This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new a629f1cf6f [INLONG-8252][DataProxy] Adjust default Topic settings from
Source to Sink (#8259)
a629f1cf6f is described below
commit a629f1cf6feedbe548bffe3b3cfab3660c8ec763
Author: Goson Zhang <[email protected]>
AuthorDate: Thu Jun 15 19:46:04 2023 +0800
[INLONG-8252][DataProxy] Adjust default Topic settings from Source to Sink
(#8259)
---
.../dataproxy/config/CommonConfigHolder.java | 52 ++++++++++++++---
.../dataproxy/config/holder/MetaConfigHolder.java | 7 ++-
.../inlong/dataproxy/consts/StatConstants.java | 23 +++-----
.../inlong/dataproxy/http/MessageFilter.java | 2 +-
.../dataproxy/http/SimpleMessageHandler.java | 13 ++---
.../inlong/dataproxy/sink/common/SinkContext.java | 41 +++++++++-----
.../sink/mq/MessageQueueZoneSinkContext.java | 4 +-
.../dataproxy/sink/mq/kafka/KafkaHandler.java | 65 ++++++++++++++--------
.../dataproxy/sink/mq/pulsar/PulsarHandler.java | 61 ++++++++++++--------
.../inlong/dataproxy/sink/mq/tube/TubeHandler.java | 49 +++++++++-------
.../dataproxy/source/ServerMessageHandler.java | 4 +-
.../inlong/dataproxy/source2/BaseSource.java | 9 ---
.../dataproxy/source2/InLongMessageFactory.java | 2 +-
.../dataproxy/source2/InLongMessageHandler.java | 13 ++---
.../inlong/dataproxy/source2/SourceConstants.java | 2 -
.../dataproxy/source2/v0msg/CodecBinMsg.java | 17 ++----
.../dataproxy/source2/v0msg/CodecTextMsg.java | 38 ++++++++-----
.../config/holder/TestCommonConfigHolder.java | 8 +--
.../src/test/resources/common.properties | 5 ++
19 files changed, 243 insertions(+), 172 deletions(-)
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/CommonConfigHolder.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/CommonConfigHolder.java
index 81ba91e4df..d6e246978a 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/CommonConfigHolder.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/CommonConfigHolder.java
@@ -30,6 +30,9 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
+import java.security.SecureRandom;
+import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -78,8 +81,11 @@ public class CommonConfigHolder {
"startup.using.local.meta.file.enable";
public static final boolean VAL_DEF_ENABLE_STARTUP_USING_LOCAL_META_FILE =
false;
// whether to accept messages without mapping between groupId/streamId and
topic
- public static final String KEY_NOTFOUND_TOPIC_ACCEPT =
"source.topic.notfound.accept";
- public static final boolean VAL_DEF_NOTFOUND_TOPIC_ACCEPT = false;
+ public static final String KEY_ENABLE_UNCONFIGURED_TOPIC_ACCEPT =
"id2topic.unconfigured.accept.enable";
+ public static final boolean VAL_DEF_ENABLE_UNCONFIGURED_TOPIC_ACCEPT =
false;
+ // default topics configure key, multiple topic settings are separated by
"\\s+".
+ public static final String KEY_UNCONFIGURED_TOPIC_DEFAULT_TOPICS =
"id2topic.unconfigured.default.topics";
+ public static final String VAL_DEFAULT_TOPIC = "test";
// whether enable whitelist, optional field.
public static final String KEY_ENABLE_WHITELIST = "proxy.enable.whitelist";
public static final boolean VAL_DEF_ENABLE_WHITELIST = false;
@@ -161,7 +167,8 @@ public class CommonConfigHolder {
private long auditFormatInvlMs = VAL_DEF_AUDIT_FORMAT_INTERVAL_MS;
private boolean responseAfterSave = VAL_DEF_RESPONSE_AFTER_SAVE;
private long maxResAfterSaveTimeout = VAL_DEF_MAX_RAS_TIMEOUT_MS;
- private boolean noTopicAccept = VAL_DEF_NOTFOUND_TOPIC_ACCEPT;
+ private boolean enableUnConfigTopicAccept =
VAL_DEF_ENABLE_UNCONFIGURED_TOPIC_ACCEPT;
+ private List<String> defaultTopics = Arrays.asList(VAL_DEFAULT_TOPIC);
private boolean enableWhiteList = VAL_DEF_ENABLE_WHITELIST;
private int maxBufferQueueSizeKb = VAL_DEF_MAX_BUFFERQUEUE_SIZE_KB;
private String eventHandler = VAL_DEF_EVENT_HANDLER;
@@ -239,8 +246,20 @@ public class CommonConfigHolder {
return metaConfigSyncInvlMs;
}
- public boolean isNoTopicAccept() {
- return noTopicAccept;
+ public boolean isEnableUnConfigTopicAccept() {
+ return enableUnConfigTopicAccept;
+ }
+
+ public List<String> getDefTopics() {
+ return defaultTopics;
+ }
+
+ public String getRandDefTopics() {
+ if (defaultTopics.isEmpty()) {
+ return null;
+ }
+ SecureRandom rand = new SecureRandom();
+ return defaultTopics.get(rand.nextInt(defaultTopics.size()));
}
public boolean isEnableWhiteList() {
@@ -380,10 +399,27 @@ public class CommonConfigHolder {
if (StringUtils.isNotEmpty(tmpValue)) {
this.enableStartupUsingLocalMetaFile =
"TRUE".equalsIgnoreCase(tmpValue.trim());
}
- // read whether accept msg without topic
- tmpValue = this.props.get(KEY_NOTFOUND_TOPIC_ACCEPT);
+ // read whether accept msg without id2topic configure
+ tmpValue = this.props.get(KEY_ENABLE_UNCONFIGURED_TOPIC_ACCEPT);
if (StringUtils.isNotEmpty(tmpValue)) {
- this.noTopicAccept = "TRUE".equalsIgnoreCase(tmpValue.trim());
+ this.enableUnConfigTopicAccept =
"TRUE".equalsIgnoreCase(tmpValue.trim());
+ if (enableUnConfigTopicAccept) {
+ // read default topics
+ tmpValue =
this.props.get(KEY_UNCONFIGURED_TOPIC_DEFAULT_TOPICS);
+ if (StringUtils.isNotBlank(tmpValue)) {
+ List<String> tmpList = new ArrayList<>();
+ String[] topicItems = tmpValue.split("\\s+");
+ for (String item : topicItems) {
+ if (StringUtils.isBlank(item)) {
+ continue;
+ }
+ tmpList.add(item.trim());
+ }
+ if (tmpList.size() > 0) {
+ defaultTopics = tmpList;
+ }
+ }
+ }
}
// read enable whitelist
tmpValue = this.props.get(KEY_ENABLE_WHITELIST);
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/MetaConfigHolder.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/MetaConfigHolder.java
index 70a07bde2c..6d0da8d5ea 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/MetaConfigHolder.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/MetaConfigHolder.java
@@ -177,7 +177,12 @@ public class MetaConfigHolder extends ConfigHolder {
}
public Set<String> getAllTopicName() {
- Set<String> result = new HashSet<>(defTopics);
+ Set<String> result = new HashSet<>();
+ // add default topics first
+ if (CommonConfigHolder.getInstance().isEnableUnConfigTopicAccept()) {
+ result.addAll(CommonConfigHolder.getInstance().getDefTopics());
+ }
+ // add configured topics
for (IdTopicConfig topicConfig : id2TopicMap.values()) {
if (topicConfig == null) {
continue;
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/StatConstants.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/StatConstants.java
index 48e087c10c..973f01451f 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/StatConstants.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/StatConstants.java
@@ -57,6 +57,7 @@ public class StatConstants {
public static final java.lang.String EVENT_MSG_BODY_NEGATIVE =
"msg.body.negative";
public static final java.lang.String EVENT_MSG_BODY_UNPRESS_EXP =
"msg.body.unpress.exp";
public static final java.lang.String EVENT_MSG_BODY_OVERMAX =
"msg.body.overmax";
+ public static final java.lang.String EVENT_MSG_BODY_TRIP = "msg.body.trip";
public static final java.lang.String EVENT_MSG_ATTR_NEGATIVE =
"msg.attr.negative";
public static final java.lang.String EVENT_MSG_MAGIC_UNEQUAL =
"msg.magic.unequal";
public static final java.lang.String EVENT_MSG_HB_TOTALLEN_BELOWMIN =
"msg.hb.totallen.belowmin";
@@ -76,26 +77,16 @@ public class StatConstants {
public static final java.lang.String EVENT_MSG_V0_POST_FAILURE =
"msg.post.v0.failure";
public static final java.lang.String EVENT_MSG_V1_POST_SUCCESS =
"msg.post.v1.success";
public static final java.lang.String EVENT_MSG_V1_POST_DROPPED =
"msg.post.v1.dropped";
+ // sink
+ public static final java.lang.String EVENT_SINK_CONFIG_TOPIC_MISSING =
"sink.topic.missing";
+ public static final java.lang.String EVENT_SINK_DEFAULT_TOPIC_MISSING =
"default.topic.empty";
+ public static final java.lang.String EVENT_SINK_DEFAULT_TOPIC_USED =
"default.topic.used";
+ public static final java.lang.String EVENT_SINK_PRODUCER_NULL =
"sink.producer.null";
+ public static final java.lang.String EVENT_SINK_SEND_EXCEPTION =
"sink.send.exception";
- public static final java.lang.String EVENT_SINK_NOUID = "sink.nouid";
- public static final java.lang.String EVENT_SINK_NOTOPIC = "sink.notopic";
- public static final java.lang.String EVENT_SINK_NOPRODUCER =
"sink.noproducer";
- public static final java.lang.String EVENT_SINK_SENDEXCEPT =
"sink.sendexcept";
public static final java.lang.String EVENT_SINK_FAILRETRY = "sink.retry";
public static final java.lang.String EVENT_SINK_FAILDROPPED =
"sink.dropped";
public static final java.lang.String EVENT_SINK_SUCCESS = "sink.success";
public static final java.lang.String EVENT_SINK_RECEIVEEXCEPT =
"sink.rcvexcept";
- public static final java.lang.String METASINK_OTHEREXP =
"metasink.otherexp";
- public static final java.lang.String METASINK_NOSLAVE = "metasink.noslave";
- public static final java.lang.String METASINK_MSG_NOTOPIC =
"metasink.msgnotopic";
- public static final java.lang.String METASINK_PROCESS_SPEED =
"metasink.process.speed";
- public static final java.lang.String EVENT_OTHEREXP = "socketmsg.otherexp";
- public static final java.lang.String EVENT_INVALID = "socketmsg.invalid";
-
- public static final java.lang.String AGENT_MESSAGES_SENT_SUCCESS =
"agent.messages.success";
- public static final java.lang.String AGENT_PACKAGES_SENT_SUCCESS =
"agent.packages.success";
- public static final java.lang.String MSG_COUNTER_KEY = "msgcnt";
- public static final java.lang.String MSG_PKG_TIME_KEY = "msg.pkg.time";
- public static final java.lang.String AGENT_MESSAGES_COUNT_PREFIX =
"agent.messages.count.";
}
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/MessageFilter.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/MessageFilter.java
index 8c65690faa..e2272cd5c0 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/MessageFilter.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/MessageFilter.java
@@ -106,7 +106,7 @@ public class MessageFilter implements Filter {
// get and check topicName
String topicName = ConfigManager.getInstance().getTopicName(groupId,
streamId);
if (StringUtils.isBlank(topicName)
- && !CommonConfigHolder.getInstance().isNoTopicAccept()) {
+ &&
!CommonConfigHolder.getInstance().isEnableUnConfigTopicAccept()) {
returnRspPackage(resp, req.getCharacterEncoding(),
DataProxyErrCode.TOPIC_IS_BLANK.getErrCode(),
DataProxyErrCode.TOPIC_IS_BLANK.getErrMsg());
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/SimpleMessageHandler.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/SimpleMessageHandler.java
index 59a7655338..e2eba723a3 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/SimpleMessageHandler.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/SimpleMessageHandler.java
@@ -22,7 +22,6 @@ import org.apache.inlong.common.monitor.MonitorIndexExt;
import org.apache.inlong.common.msg.AttributeConstants;
import org.apache.inlong.common.msg.InLongMsg;
import org.apache.inlong.common.util.NetworkUtils;
-import org.apache.inlong.dataproxy.config.CommonConfigHolder;
import org.apache.inlong.dataproxy.config.ConfigManager;
import org.apache.inlong.dataproxy.consts.AttrConstants;
import org.apache.inlong.dataproxy.consts.ConfigConstants;
@@ -103,13 +102,9 @@ public class SimpleMessageHandler implements
MessageHandler {
// get topicName
String topicName = configManager.getTopicName(groupId, streamId);
if (StringUtils.isBlank(topicName)) {
- if (CommonConfigHolder.getInstance().isNoTopicAccept()) {
- topicName = "test";
- } else {
- throw new MessageProcessException(strBuff
- .append("Topic for message is null, inlongGroupId = ")
- .append(groupId).append(", inlongStreamId =
").append(streamId).toString());
- }
+ throw new MessageProcessException(strBuff
+ .append("Topic for message is null, inlongGroupId = ")
+ .append(groupId).append(", inlongStreamId =
").append(streamId).toString());
}
// get message data time
final long msgRcvTime = System.currentTimeMillis();
@@ -171,7 +166,7 @@ public class SimpleMessageHandler implements MessageHandler
{
// build metric data item
longDataTime = longDataTime / 1000 / 60 / 10;
longDataTime = longDataTime * 1000 * 60 * 10;
-
strBuff.append("http").append(SEP_HASHTAG).append(topicName).append(SEP_HASHTAG)
+
strBuff.append("http").append(SEP_HASHTAG).append(groupId).append(SEP_HASHTAG)
.append(streamId).append(SEP_HASHTAG).append(strRemoteIP).append(SEP_HASHTAG)
.append(NetworkUtils.getLocalIp()).append(SEP_HASHTAG)
.append(evenProcType.getRight()).append(SEP_HASHTAG)
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/SinkContext.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/SinkContext.java
index 4ebe570975..102c88a37d 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/SinkContext.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/SinkContext.java
@@ -18,12 +18,12 @@
package org.apache.inlong.dataproxy.sink.common;
import org.apache.inlong.common.metric.MetricRegister;
-import org.apache.inlong.common.monitor.MonitorIndex;
-import org.apache.inlong.common.monitor.MonitorIndexExt;
import org.apache.inlong.dataproxy.config.CommonConfigHolder;
import org.apache.inlong.dataproxy.config.pojo.CacheClusterConfig;
import org.apache.inlong.dataproxy.consts.AttrConstants;
import org.apache.inlong.dataproxy.metrics.DataProxyMetricItemSet;
+import org.apache.inlong.dataproxy.metrics.stats.MonitorIndex;
+import org.apache.inlong.dataproxy.metrics.stats.MonitorStats;
import org.apache.inlong.dataproxy.sink.mq.MessageQueueHandler;
import org.apache.inlong.dataproxy.sink.mq.PackProfile;
import org.apache.inlong.dataproxy.sink.mq.pulsar.PulsarHandler;
@@ -65,7 +65,7 @@ public class SinkContext {
protected Timer reloadTimer;
// file metric statistic
protected MonitorIndex monitorIndex = null;
- private MonitorIndexExt monitorIndexExt = null;
+ private MonitorStats monitorStats = null;
/**
* Constructor
@@ -90,13 +90,15 @@ public class SinkContext {
// init monitor logic
if (CommonConfigHolder.getInstance().isEnableFileMetric()) {
this.monitorIndex = new
MonitorIndex(CommonConfigHolder.getInstance().getFileMetricSinkOutName(),
-
CommonConfigHolder.getInstance().getFileMetricStatInvlSec(),
+
CommonConfigHolder.getInstance().getFileMetricStatInvlSec() * 1000L,
CommonConfigHolder.getInstance().getFileMetricStatCacheCnt());
- this.monitorIndexExt = new MonitorIndexExt(
+ this.monitorStats = new MonitorStats(
CommonConfigHolder.getInstance().getFileMetricEventOutName()
+ AttrConstants.SEP_HASHTAG + this.getSinkName(),
-
CommonConfigHolder.getInstance().getFileMetricStatInvlSec(),
+
CommonConfigHolder.getInstance().getFileMetricStatInvlSec() * 1000L,
CommonConfigHolder.getInstance().getFileMetricStatCacheCnt());
+ this.monitorIndex.start();
+ this.monitorStats.start();
}
try {
this.reload();
@@ -118,23 +120,36 @@ public class SinkContext {
// stop file statistic index
if (CommonConfigHolder.getInstance().isEnableFileMetric()) {
if (monitorIndex != null) {
- monitorIndex.shutDown();
+ monitorIndex.stop();
}
- if (monitorIndexExt != null) {
- monitorIndexExt.shutDown();
+ if (monitorStats != null) {
+ monitorStats.stop();
}
}
}
- public void fileMetricEventInc(String eventKey) {
+ public void fileMetricIncSumStats(String eventKey) {
if (CommonConfigHolder.getInstance().isEnableFileMetric()) {
- monitorIndexExt.incrementAndGet(eventKey);
+ monitorStats.incSumStats(eventKey);
}
}
- public void fileMetricRecordAdd(String key, int cnt, int packCnt, long
packSize, int failCnt) {
+ public void fileMetricIncWithDetailStats(String eventKey, String
detailInfoKey) {
if (CommonConfigHolder.getInstance().isEnableFileMetric()) {
- monitorIndex.addAndGet(key, cnt, packCnt, packSize, failCnt);
+ monitorStats.incSumStats(eventKey);
+ monitorStats.incDetailStats(eventKey + "#" + detailInfoKey);
+ }
+ }
+
+ public void fileMetricAddSuccCnt(String key, int cnt, int packCnt, long
packSize) {
+ if (CommonConfigHolder.getInstance().isEnableFileMetric()) {
+ monitorIndex.addSuccStats(key, cnt, packCnt, packSize);
+ }
+ }
+
+ public void fileMetricAddFailCnt(String key, int failCnt) {
+ if (CommonConfigHolder.getInstance().isEnableFileMetric()) {
+ monitorIndex.addFailStats(key, failCnt);
}
}
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneSinkContext.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneSinkContext.java
index 151b812ff0..c395af814d 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneSinkContext.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneSinkContext.java
@@ -242,11 +242,11 @@ public class MessageQueueZoneSinkContext extends
SinkContext {
DataProxyErrCode errCode, String errMsg) {
if (currentRecord.isResend()) {
dispatchQueue.offer(currentRecord);
- fileMetricEventInc(StatConstants.EVENT_SINK_FAILRETRY);
+ fileMetricIncSumStats(StatConstants.EVENT_SINK_FAILRETRY);
this.addSendResultMetric(currentRecord, mqName, topic, false,
sendTime);
} else {
currentRecord.fail(errCode, errMsg);
- fileMetricEventInc(StatConstants.EVENT_SINK_FAILDROPPED);
+ fileMetricIncSumStats(StatConstants.EVENT_SINK_FAILDROPPED);
}
}
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/kafka/KafkaHandler.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/kafka/KafkaHandler.java
index b8eef4f7ca..86e0fddefc 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/kafka/KafkaHandler.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/kafka/KafkaHandler.java
@@ -18,6 +18,8 @@
package org.apache.inlong.dataproxy.sink.mq.kafka;
import org.apache.inlong.common.enums.DataProxyErrCode;
+import org.apache.inlong.common.monitor.LogCounter;
+import org.apache.inlong.dataproxy.config.CommonConfigHolder;
import org.apache.inlong.dataproxy.config.ConfigManager;
import org.apache.inlong.dataproxy.config.pojo.CacheClusterConfig;
import org.apache.inlong.dataproxy.config.pojo.IdTopicConfig;
@@ -29,6 +31,7 @@ import
org.apache.inlong.dataproxy.sink.mq.MessageQueueZoneSinkContext;
import org.apache.inlong.dataproxy.sink.mq.PackProfile;
import org.apache.inlong.dataproxy.sink.mq.SimplePackProfile;
+import org.apache.commons.lang3.StringUtils;
import org.apache.flume.Context;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
@@ -50,7 +53,8 @@ import java.util.Set;
public class KafkaHandler implements MessageQueueHandler {
public static final Logger LOG =
LoggerFactory.getLogger(KafkaHandler.class);
- public static final String KEY_NAMESPACE = "namespace";
+ // log print count
+ private static final LogCounter logCounter = new LogCounter(10, 100000, 30
* 1000);
private CacheClusterConfig config;
private String clusterName;
@@ -115,27 +119,34 @@ public class KafkaHandler implements MessageQueueHandler {
@Override
public boolean send(PackProfile profile) {
try {
- // idConfig
+ String topic;
+ // get idConfig
IdTopicConfig idConfig =
ConfigManager.getInstance().getIdTopicConfig(
profile.getInlongGroupId(), profile.getInlongStreamId());
if (idConfig == null) {
- sinkContext.fileMetricEventInc(StatConstants.EVENT_SINK_NOUID);
- sinkContext.addSendResultMetric(profile, clusterName,
profile.getUid(), false, 0);
- sinkContext.getDispatchQueue().release(profile.getSize());
-
profile.fail(DataProxyErrCode.GROUPID_OR_STREAMID_NOT_CONFIGURE, "");
- return false;
- }
- String topic = idConfig.getTopicName();
- if (topic == null) {
-
sinkContext.fileMetricEventInc(StatConstants.EVENT_SINK_NOTOPIC);
- sinkContext.addSendResultMetric(profile, clusterName,
profile.getUid(), false, 0);
- sinkContext.getDispatchQueue().release(profile.getSize());
- profile.fail(DataProxyErrCode.TOPIC_IS_BLANK, "");
- return false;
+ if
(!CommonConfigHolder.getInstance().isEnableUnConfigTopicAccept()) {
+ sinkContext.fileMetricIncWithDetailStats(
+ StatConstants.EVENT_SINK_CONFIG_TOPIC_MISSING,
profile.getUid());
+ sinkContext.addSendResultMetric(profile, clusterName,
profile.getUid(), false, 0);
+ sinkContext.getDispatchQueue().release(profile.getSize());
+
profile.fail(DataProxyErrCode.GROUPID_OR_STREAMID_NOT_CONFIGURE, "");
+ return false;
+ }
+ topic = CommonConfigHolder.getInstance().getRandDefTopics();
+ if (StringUtils.isEmpty(topic)) {
+
sinkContext.fileMetricIncSumStats(StatConstants.EVENT_SINK_DEFAULT_TOPIC_MISSING);
+ sinkContext.addSendResultMetric(profile, clusterName,
profile.getUid(), false, 0);
+ sinkContext.getDispatchQueue().release(profile.getSize());
+
profile.fail(DataProxyErrCode.GROUPID_OR_STREAMID_NOT_CONFIGURE, "");
+ return false;
+ }
+
sinkContext.fileMetricIncSumStats(StatConstants.EVENT_SINK_DEFAULT_TOPIC_USED);
+ } else {
+ topic = idConfig.getTopicName();
}
// create producer failed
if (producer == null) {
-
sinkContext.fileMetricEventInc(StatConstants.EVENT_SINK_NOPRODUCER);
+
sinkContext.fileMetricIncSumStats(StatConstants.EVENT_SINK_PRODUCER_NULL);
sinkContext.processSendFail(profile, clusterName, topic, 0,
DataProxyErrCode.PRODUCER_IS_NULL, "");
return false;
}
@@ -147,10 +158,12 @@ public class KafkaHandler implements MessageQueueHandler {
}
return true;
} catch (Exception ex) {
-
sinkContext.fileMetricEventInc(StatConstants.EVENT_SINK_SENDEXCEPT);
+
sinkContext.fileMetricIncSumStats(StatConstants.EVENT_SINK_SEND_EXCEPTION);
sinkContext.processSendFail(profile, clusterName,
profile.getUid(), 0,
DataProxyErrCode.SEND_REQUEST_TO_MQ_FAILURE,
ex.getMessage());
- LOG.error(ex.getMessage(), ex);
+ if (logCounter.shouldPrint()) {
+ LOG.error("Send Message to Kafka failure", ex);
+ }
return false;
}
}
@@ -188,12 +201,14 @@ public class KafkaHandler implements MessageQueueHandler {
@Override
public void onCompletion(RecordMetadata arg0, Exception ex) {
if (ex != null) {
-
sinkContext.fileMetricEventInc(StatConstants.EVENT_SINK_RECEIVEEXCEPT);
+
sinkContext.fileMetricIncSumStats(StatConstants.EVENT_SINK_RECEIVEEXCEPT);
sinkContext.processSendFail(batchProfile, clusterName,
topic, sendTime,
DataProxyErrCode.MQ_RETURN_ERROR, ex.getMessage());
- LOG.error("Send BatchPackProfile to Kafka failure", ex);
+ if (logCounter.shouldPrint()) {
+ LOG.error("Send BatchPackProfile to Kafka failure",
ex);
+ }
} else {
-
sinkContext.fileMetricEventInc(StatConstants.EVENT_SINK_SUCCESS);
+
sinkContext.fileMetricIncSumStats(StatConstants.EVENT_SINK_SUCCESS);
sinkContext.addSendResultMetric(batchProfile, clusterName,
topic, true, sendTime);
sinkContext.getDispatchQueue().release(batchProfile.getSize());
batchProfile.ack();
@@ -230,12 +245,14 @@ public class KafkaHandler implements MessageQueueHandler {
@Override
public void onCompletion(RecordMetadata arg0, Exception ex) {
if (ex != null) {
-
sinkContext.fileMetricEventInc(StatConstants.EVENT_SINK_RECEIVEEXCEPT);
+
sinkContext.fileMetricIncSumStats(StatConstants.EVENT_SINK_RECEIVEEXCEPT);
sinkContext.processSendFail(simpleProfile, clusterName,
topic, sendTime,
DataProxyErrCode.MQ_RETURN_ERROR, ex.getMessage());
- LOG.error("Send SimplePackProfile to Kafka failure", ex);
+ if (logCounter.shouldPrint()) {
+ LOG.error("Send SimplePackProfile to Kafka failure",
ex);
+ }
} else {
-
sinkContext.fileMetricEventInc(StatConstants.EVENT_SINK_SUCCESS);
+
sinkContext.fileMetricIncSumStats(StatConstants.EVENT_SINK_SUCCESS);
sinkContext.addSendResultMetric(simpleProfile,
clusterName, topic, true, sendTime);
sinkContext.getDispatchQueue().release(simpleProfile.getSize());
simpleProfile.ack();
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/pulsar/PulsarHandler.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/pulsar/PulsarHandler.java
index 0765197431..7800e615fa 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/pulsar/PulsarHandler.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/pulsar/PulsarHandler.java
@@ -19,6 +19,7 @@ package org.apache.inlong.dataproxy.sink.mq.pulsar;
import org.apache.inlong.common.enums.DataProxyErrCode;
import org.apache.inlong.common.monitor.LogCounter;
+import org.apache.inlong.dataproxy.config.CommonConfigHolder;
import org.apache.inlong.dataproxy.config.ConfigManager;
import org.apache.inlong.dataproxy.config.pojo.CacheClusterConfig;
import org.apache.inlong.dataproxy.config.pojo.IdTopicConfig;
@@ -203,24 +204,30 @@ public class PulsarHandler implements MessageQueueHandler
{
@Override
public boolean send(PackProfile profile) {
try {
- // idConfig
+ String producerTopic;
+ // get idConfig
IdTopicConfig idConfig =
ConfigManager.getInstance().getIdTopicConfig(
profile.getInlongGroupId(), profile.getInlongStreamId());
if (idConfig == null) {
- sinkContext.fileMetricEventInc(StatConstants.EVENT_SINK_NOUID);
- sinkContext.addSendResultMetric(profile, clusterName,
profile.getUid(), false, 0);
- sinkContext.getDispatchQueue().release(profile.getSize());
-
profile.fail(DataProxyErrCode.GROUPID_OR_STREAMID_NOT_CONFIGURE, "");
- return false;
- }
- // topic
- String producerTopic = idConfig.getPulsarTopicName(tenant,
namespace);
- if (producerTopic == null) {
-
sinkContext.fileMetricEventInc(StatConstants.EVENT_SINK_NOTOPIC);
- sinkContext.addSendResultMetric(profile, clusterName,
profile.getUid(), false, 0);
- sinkContext.getDispatchQueue().release(profile.getSize());
- profile.fail(DataProxyErrCode.TOPIC_IS_BLANK, "");
- return false;
+ if
(!CommonConfigHolder.getInstance().isEnableUnConfigTopicAccept()) {
+ sinkContext.fileMetricIncWithDetailStats(
+ StatConstants.EVENT_SINK_CONFIG_TOPIC_MISSING,
profile.getUid());
+ sinkContext.addSendResultMetric(profile, clusterName,
profile.getUid(), false, 0);
+ sinkContext.getDispatchQueue().release(profile.getSize());
+
profile.fail(DataProxyErrCode.GROUPID_OR_STREAMID_NOT_CONFIGURE, "");
+ return false;
+ }
+ producerTopic =
CommonConfigHolder.getInstance().getRandDefTopics();
+ if (StringUtils.isEmpty(producerTopic)) {
+
sinkContext.fileMetricIncSumStats(StatConstants.EVENT_SINK_DEFAULT_TOPIC_MISSING);
+ sinkContext.addSendResultMetric(profile, clusterName,
profile.getUid(), false, 0);
+ sinkContext.getDispatchQueue().release(profile.getSize());
+
profile.fail(DataProxyErrCode.GROUPID_OR_STREAMID_NOT_CONFIGURE, "");
+ return false;
+ }
+
sinkContext.fileMetricIncSumStats(StatConstants.EVENT_SINK_DEFAULT_TOPIC_USED);
+ } else {
+ producerTopic = idConfig.getPulsarTopicName(tenant, namespace);
}
// get producer
Producer<byte[]> producer = this.producerMap.get(producerTopic);
@@ -246,7 +253,7 @@ public class PulsarHandler implements MessageQueueHandler {
}
// create producer failed
if (producer == null) {
-
sinkContext.fileMetricEventInc(StatConstants.EVENT_SINK_NOPRODUCER);
+
sinkContext.fileMetricIncSumStats(StatConstants.EVENT_SINK_PRODUCER_NULL);
sinkContext.processSendFail(profile, clusterName,
producerTopic, 0,
DataProxyErrCode.PRODUCER_IS_NULL, "");
return false;
@@ -259,10 +266,12 @@ public class PulsarHandler implements MessageQueueHandler
{
}
return true;
} catch (Exception ex) {
-
sinkContext.fileMetricEventInc(StatConstants.EVENT_SINK_SENDEXCEPT);
+
sinkContext.fileMetricIncSumStats(StatConstants.EVENT_SINK_SEND_EXCEPTION);
sinkContext.processSendFail(profile, clusterName,
profile.getUid(), 0,
DataProxyErrCode.SEND_REQUEST_TO_MQ_FAILURE,
ex.getMessage());
- LOG.error(ex.getMessage(), ex);
+ if (logCounter.shouldPrint()) {
+ LOG.error("Send Message to Pulsar failure", ex);
+ }
return false;
}
}
@@ -314,12 +323,14 @@ public class PulsarHandler implements MessageQueueHandler
{
// callback
future.whenCompleteAsync((msgId, ex) -> {
if (ex != null) {
-
sinkContext.fileMetricEventInc(StatConstants.EVENT_SINK_RECEIVEEXCEPT);
+
sinkContext.fileMetricIncSumStats(StatConstants.EVENT_SINK_RECEIVEEXCEPT);
sinkContext.processSendFail(batchProfile, clusterName,
producerTopic, sendTime,
DataProxyErrCode.MQ_RETURN_ERROR, ex.getMessage());
- LOG.error("Send ProfileV1 to Pulsar failure", ex);
+ if (logCounter.shouldPrint()) {
+ LOG.error("Send ProfileV1 to Pulsar failure", ex);
+ }
} else {
-
sinkContext.fileMetricEventInc(StatConstants.EVENT_SINK_SUCCESS);
+
sinkContext.fileMetricIncSumStats(StatConstants.EVENT_SINK_SUCCESS);
sinkContext.addSendResultMetric(batchProfile, clusterName,
producerTopic, true, sendTime);
sinkContext.getDispatchQueue().release(batchProfile.getSize());
batchProfile.ack();
@@ -346,12 +357,14 @@ public class PulsarHandler implements MessageQueueHandler
{
// callback
future.whenCompleteAsync((msgId, ex) -> {
if (ex != null) {
-
sinkContext.fileMetricEventInc(StatConstants.EVENT_SINK_RECEIVEEXCEPT);
+
sinkContext.fileMetricIncSumStats(StatConstants.EVENT_SINK_RECEIVEEXCEPT);
sinkContext.processSendFail(simpleProfile, clusterName,
producerTopic, sendTime,
DataProxyErrCode.MQ_RETURN_ERROR, ex.getMessage());
- LOG.error("Send SimpleProfileV0 to Pulsar failure", ex);
+ if (logCounter.shouldPrint()) {
+ LOG.error("Send SimpleProfileV0 to Pulsar failure", ex);
+ }
} else {
-
sinkContext.fileMetricEventInc(StatConstants.EVENT_SINK_SUCCESS);
+
sinkContext.fileMetricIncSumStats(StatConstants.EVENT_SINK_SUCCESS);
sinkContext.addSendResultMetric(simpleProfile, clusterName,
producerTopic, true, sendTime);
sinkContext.getDispatchQueue().release(simpleProfile.getSize());
simpleProfile.ack();
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/tube/TubeHandler.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/tube/TubeHandler.java
index 0f6b4fcce1..0b5ee8cffa 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/tube/TubeHandler.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/tube/TubeHandler.java
@@ -19,6 +19,7 @@ package org.apache.inlong.dataproxy.sink.mq.tube;
import org.apache.inlong.common.enums.DataProxyErrCode;
import org.apache.inlong.common.monitor.LogCounter;
+import org.apache.inlong.dataproxy.config.CommonConfigHolder;
import org.apache.inlong.dataproxy.config.ConfigManager;
import org.apache.inlong.dataproxy.config.pojo.CacheClusterConfig;
import org.apache.inlong.dataproxy.config.pojo.IdTopicConfig;
@@ -39,6 +40,7 @@ import
org.apache.inlong.tubemq.client.producer.MessageSentCallback;
import org.apache.inlong.tubemq.client.producer.MessageSentResult;
import org.apache.inlong.tubemq.corebase.Message;
+import org.apache.commons.lang3.StringUtils;
import org.apache.flume.Context;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -57,7 +59,6 @@ public class TubeHandler implements MessageQueueHandler {
private static final LogCounter logCounter = new LogCounter(10, 100000, 30
* 1000);
private static String MASTER_HOST_PORT_LIST = "master-host-port-list";
- public static final String KEY_NAMESPACE = "namespace";
private CacheClusterConfig config;
private String clusterName;
@@ -183,29 +184,35 @@ public class TubeHandler implements MessageQueueHandler {
public boolean send(PackProfile profile) {
try {
// idConfig
+ String topic;
IdTopicConfig idConfig =
ConfigManager.getInstance().getIdTopicConfig(
profile.getInlongGroupId(), profile.getInlongStreamId());
if (idConfig == null) {
- sinkContext.fileMetricEventInc(StatConstants.EVENT_SINK_NOUID);
- sinkContext.addSendResultMetric(profile, clusterName,
profile.getUid(), false, 0);
- sinkContext.getDispatchQueue().release(profile.getSize());
-
profile.fail(DataProxyErrCode.GROUPID_OR_STREAMID_NOT_CONFIGURE, "");
- return false;
- }
- String topic = idConfig.getTopicName();
- if (topic == null) {
-
sinkContext.fileMetricEventInc(StatConstants.EVENT_SINK_NOTOPIC);
- sinkContext.addSendResultMetric(profile, clusterName,
profile.getUid(), false, 0);
- sinkContext.getDispatchQueue().release(profile.getSize());
- profile.fail(DataProxyErrCode.TOPIC_IS_BLANK, "");
- return false;
+ if
(!CommonConfigHolder.getInstance().isEnableUnConfigTopicAccept()) {
+ sinkContext.fileMetricIncWithDetailStats(
+ StatConstants.EVENT_SINK_CONFIG_TOPIC_MISSING,
profile.getUid());
+ sinkContext.addSendResultMetric(profile, clusterName,
profile.getUid(), false, 0);
+ sinkContext.getDispatchQueue().release(profile.getSize());
+
profile.fail(DataProxyErrCode.GROUPID_OR_STREAMID_NOT_CONFIGURE, "");
+ return false;
+ }
+ topic = CommonConfigHolder.getInstance().getRandDefTopics();
+ if (StringUtils.isEmpty(topic)) {
+
sinkContext.fileMetricIncSumStats(StatConstants.EVENT_SINK_DEFAULT_TOPIC_MISSING);
+ sinkContext.addSendResultMetric(profile, clusterName,
profile.getUid(), false, 0);
+ sinkContext.getDispatchQueue().release(profile.getSize());
+
profile.fail(DataProxyErrCode.GROUPID_OR_STREAMID_NOT_CONFIGURE, "");
+ return false;
+ }
+
sinkContext.fileMetricIncSumStats(StatConstants.EVENT_SINK_DEFAULT_TOPIC_USED);
+ } else {
+ topic = idConfig.getTopicName();
}
// create producer failed
if (producer == null) {
-
sinkContext.fileMetricEventInc(StatConstants.EVENT_SINK_NOPRODUCER);
+
sinkContext.fileMetricIncSumStats(StatConstants.EVENT_SINK_PRODUCER_NULL);
sinkContext.processSendFail(profile, clusterName, topic, 0,
DataProxyErrCode.PRODUCER_IS_NULL, "");
- LOG.error("producer is null");
return false;
}
// publish
@@ -221,7 +228,7 @@ public class TubeHandler implements MessageQueueHandler {
}
return true;
} catch (Exception ex) {
-
sinkContext.fileMetricEventInc(StatConstants.EVENT_SINK_SENDEXCEPT);
+
sinkContext.fileMetricIncSumStats(StatConstants.EVENT_SINK_SEND_EXCEPTION);
sinkContext.processSendFail(profile, clusterName,
profile.getUid(), 0,
DataProxyErrCode.SEND_REQUEST_TO_MQ_FAILURE,
ex.getMessage());
LOG.error(ex.getMessage(), ex);
@@ -258,7 +265,7 @@ public class TubeHandler implements MessageQueueHandler {
@Override
public void onMessageSent(MessageSentResult result) {
-
sinkContext.fileMetricEventInc(StatConstants.EVENT_SINK_SUCCESS);
+
sinkContext.fileMetricIncSumStats(StatConstants.EVENT_SINK_SUCCESS);
sinkContext.addSendResultMetric(batchProfile, clusterName,
topic, true, sendTime);
sinkContext.getDispatchQueue().release(batchProfile.getSize());
batchProfile.ack();
@@ -266,7 +273,7 @@ public class TubeHandler implements MessageQueueHandler {
@Override
public void onException(Throwable ex) {
-
sinkContext.fileMetricEventInc(StatConstants.EVENT_SINK_RECEIVEEXCEPT);
+
sinkContext.fileMetricIncSumStats(StatConstants.EVENT_SINK_RECEIVEEXCEPT);
sinkContext.processSendFail(batchProfile, clusterName, topic,
sendTime,
DataProxyErrCode.MQ_RETURN_ERROR, ex.getMessage());
if (logCounter.shouldPrint()) {
@@ -292,7 +299,7 @@ public class TubeHandler implements MessageQueueHandler {
@Override
public void onMessageSent(MessageSentResult result) {
-
sinkContext.fileMetricEventInc(StatConstants.EVENT_SINK_SUCCESS);
+
sinkContext.fileMetricIncSumStats(StatConstants.EVENT_SINK_SUCCESS);
sinkContext.addSendResultMetric(simpleProfile, clusterName,
topic, true, sendTime);
sinkContext.getDispatchQueue().release(simpleProfile.getSize());
simpleProfile.ack();
@@ -300,7 +307,7 @@ public class TubeHandler implements MessageQueueHandler {
@Override
public void onException(Throwable ex) {
-
sinkContext.fileMetricEventInc(StatConstants.EVENT_SINK_RECEIVEEXCEPT);
+
sinkContext.fileMetricIncSumStats(StatConstants.EVENT_SINK_RECEIVEEXCEPT);
sinkContext.processSendFail(simpleProfile, clusterName, topic,
sendTime,
DataProxyErrCode.MQ_RETURN_ERROR, ex.getMessage());
if (logCounter.shouldPrint()) {
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
index ab1d77b53e..5669e8de08 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
@@ -381,7 +381,7 @@ public class ServerMessageHandler extends
ChannelInboundHandlerAdapter {
}
// check topic configure
if (StringUtils.isEmpty(configTopic)) {
- if (CommonConfigHolder.getInstance().isNoTopicAccept()) {
+ if
(CommonConfigHolder.getInstance().isEnableUnConfigTopicAccept()) {
configTopic = this.defaultTopic;
} else {
commonAttrMap.put(AttributeConstants.MESSAGE_PROCESS_ERRCODE,
@@ -518,7 +518,7 @@ public class ServerMessageHandler extends
ChannelInboundHandlerAdapter {
longDataTime = longDataTime / 1000 / 60 / 10;
longDataTime = longDataTime * 1000 * 60 * 10;
strBuff.append(protocolType).append(AttrConstants.SEPARATOR)
-
.append(topicEntry.getKey()).append(AttrConstants.SEPARATOR)
+ .append(groupId).append(AttrConstants.SEPARATOR)
.append(streamIdEntry.getKey()).append(AttrConstants.SEPARATOR)
.append(strRemoteIP).append(AttrConstants.SEPARATOR)
.append(getLocalIp()).append(AttrConstants.SEPARATOR)
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/BaseSource.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/BaseSource.java
index 0cf601495f..6800fd01cf 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/BaseSource.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/BaseSource.java
@@ -153,11 +153,6 @@ public abstract class BaseSource
Preconditions.checkArgument(StringUtils.isNotBlank(tmpVal),
SourceConstants.SRCCXT_MESSAGE_HANDLER_NAME + " config is
blank");
this.messageHandlerName = tmpVal;
- // get default topic
- tmpVal = context.getString(SourceConstants.SRCCXT_DEF_TOPIC);
- if (StringUtils.isNotBlank(tmpVal)) {
- this.defTopic = tmpVal.trim();
- }
// get default attributes
tmpVal = context.getString(SourceConstants.SRCCXT_DEF_ATTR);
if (StringUtils.isNotBlank(tmpVal)) {
@@ -367,10 +362,6 @@ public abstract class BaseSource
return strPort;
}
- public String getDefTopic() {
- return defTopic;
- }
-
public String getDefAttr() {
return defAttr;
}
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/InLongMessageFactory.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/InLongMessageFactory.java
index 0dc9b326d7..605dfd8872 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/InLongMessageFactory.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/InLongMessageFactory.java
@@ -34,7 +34,7 @@ public class InLongMessageFactory extends
ChannelInitializer<SocketChannel> {
public static final int INLONG_LENGTH_FIELD_OFFSET = 0;
public static final int INLONG_LENGTH_FIELD_LENGTH = 4;
- public static final int INLONG_LENGTH_ADJUSTMENT = -4;
+ public static final int INLONG_LENGTH_ADJUSTMENT = 0;
public static final int INLONG_INITIAL_BYTES_TO_STRIP = 0;
public static final boolean DEFAULT_FAIL_FAST = true;
private static final Logger LOG =
LoggerFactory.getLogger(InLongMessageFactory.class);
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/InLongMessageHandler.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/InLongMessageHandler.java
index 9fb5144aa2..a4d094872b 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/InLongMessageHandler.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/InLongMessageHandler.java
@@ -75,7 +75,6 @@ public class InLongMessageHandler extends
ChannelInboundHandlerAdapter {
private static final LogCounter logCounter = new LogCounter(10, 100000, 30
* 1000);
private static final int INLONG_MSG_V1 = 1;
- private static final String DEFAULT_REMOTE_IDC_VALUE = "0";
private static final ConfigManager configManager =
ConfigManager.getInstance();
private final BaseSource source;
@@ -421,14 +420,10 @@ public class InLongMessageHandler extends
ChannelInboundHandlerAdapter {
// get configured topic name
String topic =
configManager.getTopicName(event.getInlongGroupId(), event.getInlongStreamId());
if (StringUtils.isBlank(topic)) {
- if (CommonConfigHolder.getInstance().isNoTopicAccept()) {
- topic = source.getDefTopic();
- } else {
-
source.fileMetricIncSumStats(StatConstants.EVENT_CONFIG_TOPIC_MISSING);
- source.addMetric(false, event.getBody().length, event);
- this.responsePackage(ctx,
ProxySdk.ResultCode.ERR_ID_ERROR, packObject);
- return;
- }
+
source.fileMetricIncSumStats(StatConstants.EVENT_CONFIG_TOPIC_MISSING);
+ source.addMetric(false, event.getBody().length, event);
+ this.responsePackage(ctx, ProxySdk.ResultCode.ERR_ID_ERROR,
packObject);
+ return;
}
event.setTopic(topic);
// put to channel
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/SourceConstants.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/SourceConstants.java
index 8cb0e018ce..c47e6d5cac 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/SourceConstants.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/SourceConstants.java
@@ -33,8 +33,6 @@ public class SourceConstants {
public static final String SRCCXT_MSG_FACTORY_NAME = "msg-factory-name";
// message handler name
public static final String SRCCXT_MESSAGE_HANDLER_NAME =
"message-handler-name";
- // default topic name
- public static final String SRCCXT_DEF_TOPIC = "topic";
// default attributes
public static final String SRCCXT_DEF_ATTR = "attr";
// max message length
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/v0msg/CodecBinMsg.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/v0msg/CodecBinMsg.java
index 4fe96f57ec..d5dd70c7a4 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/v0msg/CodecBinMsg.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/v0msg/CodecBinMsg.java
@@ -22,7 +22,6 @@ import org.apache.inlong.common.msg.AttributeConstants;
import org.apache.inlong.common.msg.InLongMsg;
import org.apache.inlong.common.msg.MsgType;
import org.apache.inlong.dataproxy.base.SinkRspEvent;
-import org.apache.inlong.dataproxy.config.CommonConfigHolder;
import org.apache.inlong.dataproxy.config.ConfigManager;
import org.apache.inlong.dataproxy.consts.StatConstants;
import org.apache.inlong.dataproxy.source2.BaseSource;
@@ -149,7 +148,7 @@ public class CodecBinMsg extends AbsV0MsgCodec {
return false;
}
// build message seqId
- this.msgSeqId = strBuff.append(this.topicName)
+ this.msgSeqId = strBuff.append(this.groupId)
.append(AttributeConstants.SEPARATOR).append(this.streamId)
.append(AttributeConstants.SEPARATOR).append(strRemoteIP)
.append("#").append(dataTimeMs).append("#").append(uniq).toString();
@@ -330,15 +329,11 @@ public class CodecBinMsg extends AbsV0MsgCodec {
// get and check topic configure
this.topicName = configManager.getTopicName(this.groupId,
this.streamId);
if (StringUtils.isBlank(this.topicName)) {
- if (CommonConfigHolder.getInstance().isNoTopicAccept()) {
- this.topicName = source.getDefTopic();
- } else {
-
source.fileMetricIncSumStats(StatConstants.EVENT_CONFIG_TOPIC_MISSING);
- this.errCode = DataProxyErrCode.TOPIC_IS_BLANK;
- this.errMsg = String.format("Topic not configured for
groupId=(%s), streamId=(%s)",
- this.groupId, this.streamId);
- return false;
- }
+
source.fileMetricIncSumStats(StatConstants.EVENT_CONFIG_TOPIC_MISSING);
+ this.errCode = DataProxyErrCode.TOPIC_IS_BLANK;
+ this.errMsg = String.format("Topic not configured for
groupId=(%s), streamId=(%s)",
+ this.groupId, this.streamId);
+ return false;
}
if (StringUtils.isBlank(this.streamId)) {
this.streamId = "";
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/v0msg/CodecTextMsg.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/v0msg/CodecTextMsg.java
index 50f7552a32..07a8f1aac5 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/v0msg/CodecTextMsg.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/v0msg/CodecTextMsg.java
@@ -21,7 +21,6 @@ import org.apache.inlong.common.enums.DataProxyErrCode;
import org.apache.inlong.common.msg.AttributeConstants;
import org.apache.inlong.common.msg.InLongMsg;
import org.apache.inlong.common.msg.MsgType;
-import org.apache.inlong.dataproxy.config.CommonConfigHolder;
import org.apache.inlong.dataproxy.config.ConfigManager;
import org.apache.inlong.dataproxy.consts.StatConstants;
import org.apache.inlong.dataproxy.source2.BaseSource;
@@ -120,17 +119,19 @@ public class CodecTextMsg extends AbsV0MsgCodec {
int readPos = 0;
int singleMsgLen;
ByteBuffer bodyBuffer = ByteBuffer.wrap(bodyData);
- while (bodyBuffer.remaining() > 0) {
+ int remaining = bodyBuffer.remaining();
+ while (remaining > 0) {
singleMsgLen = bodyBuffer.getInt(readPos);
- if (singleMsgLen <= 0 || singleMsgLen >
bodyBuffer.remaining()) {
+ if (singleMsgLen <= 0 || singleMsgLen > remaining) {
source.fileMetricIncSumStats(StatConstants.EVENT_MSG_ITEM_LEN_MALFORMED);
this.errCode = DataProxyErrCode.BODY_EXCEED_MAX_LEN;
this.errMsg = String.format(
"Malformed data len, singleMsgLen(%d), buffer
remaining(%d), attr: (%s)",
- singleMsgLen, bodyBuffer.remaining(), origAttr);
+ singleMsgLen, remaining, origAttr);
return false;
}
readPos += 4 + singleMsgLen;
+ remaining -= 4 + singleMsgLen;
}
}
return true;
@@ -148,15 +149,11 @@ public class CodecTextMsg extends AbsV0MsgCodec {
// get and check topic configure
String tmpTopicName =
ConfigManager.getInstance().getTopicName(tmpGroupId, tmpStreamId);
if (StringUtils.isBlank(tmpTopicName)) {
- if (CommonConfigHolder.getInstance().isNoTopicAccept()) {
- tmpTopicName = source.getDefTopic();
- } else {
-
source.fileMetricIncSumStats(StatConstants.EVENT_CONFIG_TOPIC_MISSING);
- this.errCode = DataProxyErrCode.TOPIC_IS_BLANK;
- this.errMsg = String.format(
- "Topic not configured for groupId=(%s),
streamId=(%s)", tmpGroupId, tmpStreamId);
- return false;
- }
+
source.fileMetricIncSumStats(StatConstants.EVENT_CONFIG_TOPIC_MISSING);
+ this.errCode = DataProxyErrCode.TOPIC_IS_BLANK;
+ this.errMsg = String.format(
+ "Topic not configured for groupId=(%s), streamId=(%s)",
tmpGroupId, tmpStreamId);
+ return false;
}
this.groupId = tmpGroupId;
this.topicName = tmpTopicName;
@@ -190,7 +187,7 @@ public class CodecTextMsg extends AbsV0MsgCodec {
// process sequence id
String sequenceId = attrMap.get(AttributeConstants.SEQUENCE_ID);
if (StringUtils.isNotBlank(sequenceId)) {
-
strBuff.append(topicName).append(AttributeConstants.SEPARATOR).append(streamId)
+
strBuff.append(groupId).append(AttributeConstants.SEPARATOR).append(streamId)
.append(AttributeConstants.SEPARATOR).append(sequenceId)
.append("#").append(strRemoteIP);
msgSeqId = strBuff.toString();
@@ -251,6 +248,18 @@ public class CodecTextMsg extends AbsV0MsgCodec {
inLongMsg.addMsg(mapJoiner.join(attrMap), bodyData);
attrMap.put(AttributeConstants.MESSAGE_COUNT,
String.valueOf(this.msgCount));
} else {
+ if (!"pb".equals(attrMap.get(AttributeConstants.MESSAGE_TYPE))) {
+ if (bodyData[bodyData.length - 1] == '\n') {
+ int tripDataLen = bodyData.length - 1;
+ if (bodyData[bodyData.length - 2] == '\r') {
+ tripDataLen = bodyData.length - 2;
+ }
+ byte[] tripData = new byte[tripDataLen];
+ System.arraycopy(bodyData, 0, tripData, 0, tripDataLen);
+ bodyData = tripData;
+
source.fileMetricIncSumStats(StatConstants.EVENT_MSG_BODY_TRIP);
+ }
+ }
inLongMsg.addMsg(mapJoiner.join(attrMap), bodyData);
}
long pkgTime = inLongMsg.getCreatetime();
@@ -258,5 +267,4 @@ public class CodecTextMsg extends AbsV0MsgCodec {
inLongMsg.reset();
return event;
}
-
}
diff --git
a/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/config/holder/TestCommonConfigHolder.java
b/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/config/holder/TestCommonConfigHolder.java
index 930eccc700..9f0111e21e 100644
---
a/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/config/holder/TestCommonConfigHolder.java
+++
b/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/config/holder/TestCommonConfigHolder.java
@@ -23,8 +23,6 @@ import org.apache.inlong.dataproxy.config.CommonConfigHolder;
import org.junit.Assert;
import org.junit.Test;
-import static org.junit.Assert.assertEquals;
-
/**
* Test for {@link CommonConfigHolder}
*/
@@ -35,8 +33,10 @@ public class TestCommonConfigHolder {
Assert.assertEquals("proxy_inlong5th_sz",
CommonConfigHolder.getInstance().getClusterName());
Assert.assertTrue(CommonConfigHolder.getInstance().isEnableWhiteList());
- assertEquals("DataProxy",
+ Assert.assertEquals("DataProxy",
CommonConfigHolder.getInstance().getProperties().get(MetricListener.KEY_METRIC_DOMAINS));
- assertEquals(10000,
CommonConfigHolder.getInstance().getMetaConfigSyncInvlMs());
+ Assert.assertEquals(10000,
CommonConfigHolder.getInstance().getMetaConfigSyncInvlMs());
+
Assert.assertTrue(CommonConfigHolder.getInstance().isEnableUnConfigTopicAccept());
+
Assert.assertTrue(CommonConfigHolder.getInstance().getDefTopics().contains("test2"));
}
}
diff --git
a/inlong-dataproxy/dataproxy-source/src/test/resources/common.properties
b/inlong-dataproxy/dataproxy-source/src/test/resources/common.properties
index 49d0e277c0..4ee6350595 100644
--- a/inlong-dataproxy/dataproxy-source/src/test/resources/common.properties
+++ b/inlong-dataproxy/dataproxy-source/src/test/resources/common.properties
@@ -25,3 +25,8 @@ startup.using.local.meta.file.enable=true
proxy.enable.whitelist=true
meta.config.sync.interval.ms=10000
+
+# whether to accept messages of unconfigured topics
+id2topic.unconfigured.accept.enable=true
+# the default topics used when messages of unconfigured topics are accepted
+id2topic.unconfigured.default.topics= test1 test2 test3