This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 62d832e83e [INLONG-8228][DataProxy] Optimize the implementation of the
index output to files (#8229)
62d832e83e is described below
commit 62d832e83ee9c8e4cf6000b97b14164f1f36cd46
Author: Goson Zhang <[email protected]>
AuthorDate: Wed Jun 14 11:00:38 2023 +0800
[INLONG-8228][DataProxy] Optimize the implementation of the index output to
files (#8229)
---
.../inlong/common/enums/DataProxyErrCode.java | 6 +-
inlong-dataproxy/conf/log4j2.xml | 4 +-
.../channel/FailoverChannelProcessor.java | 9 +-
.../dataproxy/config/CommonConfigHolder.java | 2 +-
.../inlong/dataproxy/consts/StatConstants.java | 57 ++++---
.../exception/MainChannelFullException.java | 33 ++++
.../dataproxy/metrics/stats/AbsStatsDaemon.java | 141 +++++++++++++++++
.../dataproxy/metrics/stats/MonitorIndex.java | 174 +++++++++++++++++++++
.../dataproxy/metrics/stats/MonitorStats.java | 148 ++++++++++++++++++
.../inlong/dataproxy/source2/BaseSource.java | 75 ++++++---
.../dataproxy/source2/InLongMessageHandler.java | 126 ++++++++-------
.../inlong/dataproxy/source2/SimpleHttpSource.java | 5 +
.../inlong/dataproxy/source2/SimpleTcpSource.java | 5 +
.../inlong/dataproxy/source2/SimpleUdpSource.java | 5 +
.../inlong/dataproxy/source2/SourceConstants.java | 10 ++
.../source2/httpMsg/InLongHttpMsgHandler.java | 77 +++++----
.../dataproxy/source2/v0msg/AbsV0MsgCodec.java | 34 +++-
.../dataproxy/source2/v0msg/CodecBinMsg.java | 57 +++----
.../dataproxy/source2/v0msg/CodecTextMsg.java | 19 ++-
19 files changed, 802 insertions(+), 185 deletions(-)
diff --git
a/inlong-common/src/main/java/org/apache/inlong/common/enums/DataProxyErrCode.java
b/inlong-common/src/main/java/org/apache/inlong/common/enums/DataProxyErrCode.java
index 5caeb2d875..c4ef15ce86 100644
---
a/inlong-common/src/main/java/org/apache/inlong/common/enums/DataProxyErrCode.java
+++
b/inlong-common/src/main/java/org/apache/inlong/common/enums/DataProxyErrCode.java
@@ -38,7 +38,8 @@ public enum DataProxyErrCode {
HTTP_UNSUPPORTED_SERVICE_URI(35, "Un-supported service uri"),
HTTP_UNSUPPORTED_CONTENT_TYPE(36, "Un-supported content type"),
- FIELD_VALUE_NOT_EQUAL(95, "Field value not equal"),
+ FIELD_MAGIC_NOT_EQUAL(94, "Magic value not equal"),
+ FIELD_LENGTH_VALUE_NOT_EQUAL(95, "Field length value not equal"),
UNCOMPRESS_DATA_ERROR(96, "Uncompress data error"),
MISS_REQUIRED_GROUPID_ARGUMENT(100, "Parameter groupId is required"),
@@ -68,6 +69,9 @@ public enum DataProxyErrCode {
GROUPID_OR_STREAMID_NOT_CONFIGURE(121, "GroupId or StreamId not found in
configure"),
GROUPID_OR_STREAMID_INCONSTANT(122, "GroupId or StreamId inconstant"),
+ ATTR_ORDER_CONTROL_CONFLICT_ERROR(150, "Require order send but isAck is
false"),
+ ATTR_PROXY_CONTROL_CONFLICT_ERROR(151, "Require proxy send but isAck is
false"),
+
UNKNOWN_ERROR(Integer.MAX_VALUE, "Unknown error");
private final int errCode;
diff --git a/inlong-dataproxy/conf/log4j2.xml b/inlong-dataproxy/conf/log4j2.xml
index f05e67a039..6a48d1b2f3 100644
--- a/inlong-dataproxy/conf/log4j2.xml
+++ b/inlong-dataproxy/conf/log4j2.xml
@@ -159,10 +159,10 @@
</appenders>
<loggers>
- <logger name="org.apache.inlong.common.monitor.MonitorIndexExt"
level="info" additivity="false">
+ <logger name="org.apache.inlong.dataproxy.metrics.stats.MonitorStats"
level="info" additivity="false">
<appender-ref ref="MonitorFile"/>
</logger>
- <logger name="org.apache.inlong.common.monitor.MonitorIndex"
level="info" additivity="false">
+ <logger name="org.apache.inlong.dataproxy.metrics.stats.MonitorIndex"
level="info" additivity="false">
<appender-ref ref="IndexFile"/>
</logger>
<logger name="org.apache.pulsar" level="info" additivity="false">
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/channel/FailoverChannelProcessor.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/channel/FailoverChannelProcessor.java
index 5940da14ce..2162019317 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/channel/FailoverChannelProcessor.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/channel/FailoverChannelProcessor.java
@@ -17,10 +17,9 @@
package org.apache.inlong.dataproxy.channel;
-import org.apache.inlong.common.enums.DataProxyErrCode;
import org.apache.inlong.common.monitor.LogCounter;
-import org.apache.inlong.dataproxy.base.SinkRspEvent;
import org.apache.inlong.dataproxy.consts.ConfigConstants;
+import org.apache.inlong.dataproxy.exception.MainChannelFullException;
import org.apache.inlong.dataproxy.utils.MessageUtils;
import com.google.common.base.Preconditions;
@@ -258,7 +257,7 @@ public class FailoverChannelProcessor
tx.commit();
} catch (Throwable t) {
- errMsg = "Unable to put event on channel" +
reqChannel.getName()
+ errMsg = "Unable to put event on channel " +
reqChannel.getName()
+ ", error message is " + t.getMessage();
if (logPrinter.shouldPrint()) {
LOG.error("FailoverChannelProcessor Unable to put event on
required channel: "
@@ -279,9 +278,7 @@ public class FailoverChannelProcessor
}
if (!success) {
if (MessageUtils.isSyncSendForOrder(event)) {
- MessageUtils.sinkReturnRspPackage((SinkRspEvent) event,
- DataProxyErrCode.PUT_EVENT_TO_CHANNEL_FAILURE, errMsg);
- return;
+ throw new MainChannelFullException(errMsg);
}
List<Channel> optionalChannels =
selector.getOptionalChannels(event);
for (Channel optChannel : optionalChannels) {
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/CommonConfigHolder.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/CommonConfigHolder.java
index 02afd1a80e..81ba91e4df 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/CommonConfigHolder.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/CommonConfigHolder.java
@@ -102,7 +102,7 @@ public class CommonConfigHolder {
public static final String VAL_DEF_FILE_METRIC_SINK_OUTPUT_NAME = "Sink";
// event metric statistic name
public static final String KEY_FILE_METRIC_EVENT_OUTPUT_NAME =
"file.metric.event.output.name";
- public static final String VAL_DEF_FILE_METRIC_EVENT_OUTPUT_NAME =
"DataProxy_monitors";
+ public static final String VAL_DEF_FILE_METRIC_EVENT_OUTPUT_NAME = "Stats";
// Audit fields
public static final String KEY_ENABLE_AUDIT = "audit.enable";
public static final boolean VAL_DEF_ENABLE_AUDIT = true;
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/StatConstants.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/StatConstants.java
index 1d262a8112..48e087c10c 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/StatConstants.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/StatConstants.java
@@ -27,9 +27,24 @@ public class StatConstants {
public static final java.lang.String EVENT_VISIT_LINKIN = "visit.linkin";
public static final java.lang.String EVENT_VISIT_LINKOUT = "visit.linkout";
public static final java.lang.String EVENT_VISIT_EXCEPTION =
"visit.exception";
+ // channel
+ public static final java.lang.String EVENT_REMOTE_UNWRITABLE =
"socket.unwritable";
// configure
public static final java.lang.String EVENT_CONFIG_TOPIC_MISSING =
"config.topic.missing";
+ public static final java.lang.String EVENT_CONFIG_IDNUM_EMPTY =
"config.idnum.empty";
+ public static final java.lang.String EVENT_CONFIG_GROUPIDNUM_MISSING =
"config.groupidnum.missing";
+ public static final java.lang.String EVENT_CONFIG_GROUP_IDNUM_INCONSTANT =
"config.group.idnum.incons";
+ public static final java.lang.String EVENT_CONFIG_STREAMIDNUM_MISSING =
"config.streamidnum.missing";
+ public static final java.lang.String EVENT_CONFIG_STREAM_IDNUM_INCONSTANT
= "config.stream.idnum.incons";
// source
+ public static final java.lang.String EVENT_PKG_READABLE_EMPTY =
"pkg.readable.empty";
+ public static final java.lang.String EVENT_PKG_READABLE_OVERMAX =
"pkg.readable.overmax";
+ public static final java.lang.String EVENT_PKG_READABLE_UNFILLED =
"pkg.readable.unfilled";
+ public static final java.lang.String EVENT_PKG_MSGTYPE_V0_INVALID =
"pkg.msgtype.v0.invalid";
+ public static final java.lang.String EVENT_PKG_MSGTYPE_V1_INVALID =
"pkg.msgtype.v1.invalid";
+ // message
+ public static final java.lang.String EVENT_MSG_BIN_TOTALLEN_BELOWMIN =
"msg.bin.totallen.belowmin";
+ public static final java.lang.String EVENT_MSG_TXT_TOTALLEN_BELOWMIN =
"msg.txt.totallen.belowmin";
public static final java.lang.String EVENT_MSG_DECODE_FAIL =
"msg.decode.failure";
public static final java.lang.String EVENT_MSG_METHOD_INVALID =
"msg.method.invalid";
public static final java.lang.String EVENT_MSG_PATH_INVALID =
"msg.path.invalid";
@@ -38,29 +53,29 @@ public class StatConstants {
public static final java.lang.String EVENT_MSG_STREAMID_MISSING =
"msg.streamid.missing";
public static final java.lang.String EVENT_MSG_BODY_MISSING =
"msg.body.missing";
public static final java.lang.String EVENT_MSG_BODY_BLANK =
"msg.body.blank";
+ public static final java.lang.String EVENT_MSG_BODY_ZERO = "msg.body.zero";
+ public static final java.lang.String EVENT_MSG_BODY_NEGATIVE =
"msg.body.negative";
+ public static final java.lang.String EVENT_MSG_BODY_UNPRESS_EXP =
"msg.body.unpress.exp";
public static final java.lang.String EVENT_MSG_BODY_OVERMAX =
"msg.body.overmax";
+ public static final java.lang.String EVENT_MSG_ATTR_NEGATIVE =
"msg.attr.negative";
+ public static final java.lang.String EVENT_MSG_MAGIC_UNEQUAL =
"msg.magic.unequal";
+ public static final java.lang.String EVENT_MSG_HB_TOTALLEN_BELOWMIN =
"msg.hb.totallen.belowmin";
+ public static final java.lang.String EVENT_MSG_HB_MAGIC_UNEQUAL =
"msg.hb.magic.unequal";
+ public static final java.lang.String EVENT_MSG_HB_LEN_MALFORMED =
"msg.hb.len.malformed";
+ public static final java.lang.String EVENT_MSG_BIN_LEN_MALFORMED =
"msg.bin.len.malformed";
+ public static final java.lang.String EVENT_MSG_TXT_LEN_MALFORMED =
"msg.txt.len.malformed";
+ public static final java.lang.String EVENT_MSG_ITEM_LEN_MALFORMED =
"msg.item.len.malformed";
+ public static final java.lang.String EVENT_MSG_ATTR_INVALID =
"msg.attr.invalid";
+ public static final java.lang.String EVENT_MSG_ORDER_ACK_INVALID =
"msg.attr.order.noack";
+ public static final java.lang.String EVENT_MSG_PROXY_ACK_INVALID =
"msg.attr.proxy.noack";
+ public static final java.lang.String EVENT_MSG_INDEXMSG_ILLEGAL =
"msg.index.illegal";
+ public static final java.lang.String EVENT_MSG_GROUPIDNUM_ZERO =
"msg.groupidnum.zero";
+ public static final java.lang.String EVENT_MSG_STREAMIDNUM_ZERO =
"msg.streamidnum.zero";
public static final java.lang.String EVENT_MSG_HB_SUCCESS =
"msg.hb.success";
- public static final java.lang.String EVENT_MSG_POST_SUCCESS =
"msg.post.success";
- public static final java.lang.String EVENT_MSG_POST_FAILURE =
"msg.post.failure";
-
- public static final java.lang.String EVENT_EMPTY = "socketmsg.empty";
- public static final java.lang.String EVENT_OVERMAXLEN =
"socketmsg.overmaxlen";
- public static final java.lang.String EVENT_NOTEQUALLEN =
"socketmsg.notequallen";
- public static final java.lang.String EVENT_MSGUNKNOWN_V0 =
"socketmsg.unknownV0";
- public static final java.lang.String EVENT_MSGUNKNOWN_V1 =
"socketmsg.unknownV1";
- public static final java.lang.String EVENT_MALFORMED =
"socketmsg.malformed";
- public static final java.lang.String EVENT_NOBODY = "socketmsg.nobody";
- public static final java.lang.String EVENT_NEGBODY = "socketmsg.negbody";
- public static final java.lang.String EVENT_NEGATTR = "socketmsg.negattr";
- public static final java.lang.String EVENT_INVALIDATTR =
"socketmsg.invattr";
- public static final java.lang.String EVENT_UNSUPMSG = "socketmsg.unsupmsg";
- public static final java.lang.String EVENT_UNPRESSEXP =
"socketmsg.upressexp";
- public static final java.lang.String EVENT_WITHOUTGROUPID =
"socketmsg.wogroupid";
- public static final java.lang.String EVENT_INCONSGROUPORSTREAMID =
"socketmsg.inconsids";
- public static final java.lang.String EVENT_CHANNEL_NOT_WRITABLE =
"socketch.notwritable";
- public static final java.lang.String EVENT_POST_SUCCESS =
"socketmsg.success";
- public static final java.lang.String EVENT_POST_DROPPED =
"socketmsg.dropped";
- // http
+ public static final java.lang.String EVENT_MSG_V0_POST_SUCCESS =
"msg.post.v0.success";
+ public static final java.lang.String EVENT_MSG_V0_POST_FAILURE =
"msg.post.v0.failure";
+ public static final java.lang.String EVENT_MSG_V1_POST_SUCCESS =
"msg.post.v1.success";
+ public static final java.lang.String EVENT_MSG_V1_POST_DROPPED =
"msg.post.v1.dropped";
public static final java.lang.String EVENT_SINK_NOUID = "sink.nouid";
public static final java.lang.String EVENT_SINK_NOTOPIC = "sink.notopic";
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/exception/MainChannelFullException.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/exception/MainChannelFullException.java
new file mode 100644
index 0000000000..6b5379ac72
--- /dev/null
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/exception/MainChannelFullException.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.dataproxy.exception;
+
+import org.apache.flume.ChannelException;
+
+/**
+ * MainChannelFullException
+ *
+ * When orderly sending and proxy sending messages, if the main channel is
full,
+ * it will no longer cache and wait, and directly throw this exception.
+ */
+public class MainChannelFullException extends ChannelException {
+
+ public MainChannelFullException(String message) {
+ super(message);
+ }
+}
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/stats/AbsStatsDaemon.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/stats/AbsStatsDaemon.java
new file mode 100644
index 0000000000..93bb0c73e2
--- /dev/null
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/stats/AbsStatsDaemon.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.dataproxy.metrics.stats;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * AbsStatsDaemon
+ *
+ *
+ * Statistics daemon that periodically outputs metrics data to a file.
+ */
+
+public abstract class AbsStatsDaemon implements Runnable {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(AbsStatsDaemon.class);
+ private static final long MAX_PRINT_TIME_MS = 10000L;
+ private final String name;
+ private final String threadName;
+ private final long intervalMs;
+ private final int maxStatsCnt;
+ private final Thread daemon;
+ private final AtomicBoolean shutdown = new AtomicBoolean(false);
+ private final AtomicInteger writeIndex = new AtomicInteger(0);
+
+ public AbsStatsDaemon(String name, long intervalMs, int maxCnt) {
+ this.name = name;
+ this.maxStatsCnt = maxCnt;
+ this.intervalMs = intervalMs;
+ this.threadName = "Daemon_Thread_" + name;
+ this.daemon = new Thread(this, this.threadName);
+ this.daemon.setDaemon(true);
+ }
+
+ public void start() {
+ this.daemon.start();
+ LOGGER.info("{} is started!", this.name);
+ }
+
+ public boolean isStopped() {
+ return this.shutdown.get();
+ }
+
+ public boolean stop() {
+ if (this.shutdown.get()) {
+ return true;
+ }
+ if (this.shutdown.compareAndSet(false, true)) {
+ LOGGER.info("{} is closing ......", this.name);
+ try {
+ if (this.daemon != null) {
+ this.daemon.interrupt();
+ this.daemon.join();
+ }
+ } catch (Throwable e) {
+ //
+ }
+ LOGGER.info("{} is stopped", this.name);
+ return false;
+ }
+ return true;
+ }
+
+ protected abstract int loopProcess(long startTime);
+
+ protected abstract int exitProcess(long startTime);
+
+ /**
+ * Get the writable index
+ *
+ * @return the writable block index
+ */
+ protected int getWriteIndex() {
+ return Math.abs(writeIndex.get() % 2);
+ }
+
+ /**
+ * Gets the read index
+ *
+ * @return the read block index
+ */
+ protected int getReadIndex() {
+ return Math.abs((writeIndex.get() - 1) % 2);
+ }
+
+ @Override
+ public void run() {
+ int printCnt;
+ long startTime;
+ LOGGER.info("{} is started", this.threadName);
+ // process daemon task
+ while (!shutdown.get()) {
+ try {
+ Thread.sleep(intervalMs);
+ writeIndex.incrementAndGet();
+ startTime = System.currentTimeMillis();
+ printCnt = loopProcess(startTime);
+ checkAndPrintStatus(printCnt, System.currentTimeMillis() -
startTime);
+ } catch (InterruptedException e) {
+ LOGGER.info("{} has been interrupted", this.threadName);
+ break;
+ } catch (Throwable t) {
+ LOGGER.info("{} throw a exception", this.threadName);
+ }
+ }
+ // process exit output
+ startTime = System.currentTimeMillis();
+ printCnt = exitProcess(startTime);
+ checkAndPrintStatus(printCnt, System.currentTimeMillis() - startTime);
+ LOGGER.info("{} is stopped", this.threadName);
+ }
+
+ private void checkAndPrintStatus(int printCnt, long outputTime) {
+ if (printCnt > maxStatsCnt) {
+ LOGGER.warn("{} print {} items, over max allowed count {}",
+ this.threadName, printCnt, this.maxStatsCnt);
+ }
+ if (outputTime > MAX_PRINT_TIME_MS) {
+ LOGGER.warn("{} print items wasts {} ms", this.threadName,
outputTime);
+ }
+ }
+}
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/stats/MonitorIndex.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/stats/MonitorIndex.java
new file mode 100644
index 0000000000..1aaa7c0ddb
--- /dev/null
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/stats/MonitorIndex.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.dataproxy.metrics.stats;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.LongAdder;
+
+/**
+ * MonitorIndex
+ *
+ *
+ * The index statistics received or sent by DataProxy nodes, and output to
file.
+ */
+public class MonitorIndex extends AbsStatsDaemon {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(MonitorIndex.class);
+
+ private static final AtomicLong RECODE_ID = new AtomicLong(0);
+ private final StatsUnit[] statsUnits = new StatsUnit[2];
+
+ public MonitorIndex(String name, long intervalMill, int maxCnt) {
+ super(name, intervalMill, maxCnt);
+ this.statsUnits[0] = new StatsUnit(name);
+ this.statsUnits[1] = new StatsUnit(name);
+ }
+
+ /**
+ * Add success statistic
+ *
+ * @param key the statistic key
+ * @param msgCnt the message count
+ * @param packCnt the package count
+ * @param packSize the package size
+ */
+ public void addSuccStats(String key, int msgCnt, int packCnt, long
packSize) {
+ if (isStopped()) {
+ return;
+ }
+ statsUnits[getWriteIndex()].addSuccCnt(key, msgCnt, packCnt, packSize);
+ }
+
+ /**
+ * Add failure statistic
+ *
+ * @param key the statistic key
+ * @param failCnt the failure count
+ */
+ public void addFailStats(String key, int failCnt) {
+ if (isStopped()) {
+ return;
+ }
+ statsUnits[getWriteIndex()].addFailCnt(key, failCnt);
+ }
+
+ @Override
+ protected int loopProcess(long startTime) {
+ return statsUnits[getReadIndex()].printAndResetStatsInfo(startTime);
+ }
+
+ @Override
+ protected int exitProcess(long startTime) {
+ int totalCnt = 0;
+ if (!statsUnits[getReadIndex()].isEmpty()) {
+ totalCnt +=
statsUnits[getReadIndex()].printAndResetStatsInfo(startTime);
+ }
+ if (!statsUnits[getWriteIndex()].isEmpty()) {
+ totalCnt +=
statsUnits[getWriteIndex()].printAndResetStatsInfo(startTime);
+ }
+ return totalCnt;
+ }
+
+ private static class StatsUnit {
+
+ private final String statsName;
+ private final ConcurrentHashMap<String, StatsItem> counterMap = new
ConcurrentHashMap<>();
+
+ public StatsUnit(String statsName) {
+ this.statsName = statsName;
+ }
+
+ public boolean isEmpty() {
+ return counterMap.isEmpty();
+ }
+
+ public void addSuccCnt(String key, int cnt, int packcnt, long
packsize) {
+ StatsItem statsItem = counterMap.get(key);
+ if (statsItem == null) {
+ StatsItem tmpItem = new StatsItem();
+ statsItem = counterMap.putIfAbsent(key, tmpItem);
+ if (statsItem == null) {
+ statsItem = tmpItem;
+ }
+ }
+ statsItem.addSuccessCnt(cnt, packcnt, packsize);
+ }
+
+ public void addFailCnt(String key, int failCnt) {
+ StatsItem statsItem = counterMap.get(key);
+ if (statsItem == null) {
+ StatsItem tmpItem = new StatsItem();
+ statsItem = counterMap.putIfAbsent(key, tmpItem);
+ if (statsItem == null) {
+ statsItem = tmpItem;
+ }
+ }
+ statsItem.addFailCnt(failCnt);
+ }
+
+ public int printAndResetStatsInfo(long startTime) {
+ int printCnt = 0;
+ // get print time (second)
+ long printTime = startTime / 1000;
+ for (Map.Entry<String, StatsItem> entry : counterMap.entrySet()) {
+ if (entry == null || entry.getKey() == null ||
entry.getValue() == null) {
+ continue;
+ }
+ LOGGER.info("{}#{}_{}#{}#{}", this.statsName, printTime,
+ RECODE_ID.incrementAndGet(), entry.getKey(),
entry.getValue().toString());
+ printCnt++;
+ }
+ counterMap.clear();
+ return printCnt;
+ }
+ }
+
+ private static class StatsItem {
+
+ private final LongAdder msgCnt = new LongAdder();
+ private final LongAdder packCnt = new LongAdder();
+ private final LongAdder packSize = new LongAdder();
+ private final LongAdder failCnt = new LongAdder();
+
+ public StatsItem() {
+
+ }
+
+ public void addSuccessCnt(int msgCnt, int packCnt, long packSize) {
+ this.msgCnt.add(msgCnt);
+ this.packCnt.add(packCnt);
+ this.packSize.add(packSize);
+ }
+
+ public void addFailCnt(int failCnt) {
+ this.failCnt.add(failCnt);
+ }
+
+ @Override
+ public String toString() {
+ return msgCnt.longValue() + "#" + packCnt.longValue() + "#"
+ + packSize.longValue() + "#" + failCnt.longValue();
+ }
+ }
+
+}
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/stats/MonitorStats.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/stats/MonitorStats.java
new file mode 100644
index 0000000000..4fd25337bc
--- /dev/null
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/stats/MonitorStats.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.dataproxy.metrics.stats;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.LongAdder;
+
+/**
+ * MonitorStats
+ *
+ *
+ * Summary statistics and detailed statistics on failure, output to file.
+ */
+public class MonitorStats extends AbsStatsDaemon {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(MonitorStats.class);
+
+ private final StatsUnit[] statsUnits = new StatsUnit[2];
+
+ public MonitorStats(String name, long intervalMill, int maxCnt) {
+ super(name, intervalMill, maxCnt);
+ this.statsUnits[0] = new StatsUnit(name);
+ this.statsUnits[1] = new StatsUnit(name);
+ }
+
+ /**
+ * add summary statistic items
+ *
+ * @param sumKey the summary key
+ */
+ public void incSumStats(String sumKey) {
+ if (isStopped()) {
+ return;
+ }
+ statsUnits[getWriteIndex()].incSumStats(sumKey);
+ }
+
+ /**
+ * add detail statistic items
+ *
+ * @param detailKey the detail key
+ */
+ public void incDetailStats(String detailKey) {
+ if (isStopped()) {
+ return;
+ }
+ statsUnits[getWriteIndex()].incDetailStats(detailKey);
+ }
+
+ @Override
+ protected int loopProcess(long startTime) {
+ return statsUnits[getReadIndex()].printAndResetStatsInfo();
+ }
+
+ @Override
+ protected int exitProcess(long startTime) {
+ int totalCnt = 0;
+ if (!statsUnits[getReadIndex()].isEmpty()) {
+ totalCnt += statsUnits[getReadIndex()].printAndResetStatsInfo();
+ }
+ if (!statsUnits[getWriteIndex()].isEmpty()) {
+ totalCnt += statsUnits[getWriteIndex()].printAndResetStatsInfo();
+ }
+ return totalCnt;
+ }
+
+ private static class StatsUnit {
+
+ private final String statsName;
+ private final ConcurrentHashMap<String, LongAdder> sumMap = new
ConcurrentHashMap<>();
+ private final ConcurrentHashMap<String, LongAdder> detailsMap = new
ConcurrentHashMap<>();
+
+ public StatsUnit(String statsName) {
+ this.statsName = statsName;
+ }
+
+ public boolean isEmpty() {
+ return sumMap.isEmpty() && detailsMap.isEmpty();
+ }
+
+ public void incSumStats(String key) {
+ LongAdder statsItem = sumMap.get(key);
+ if (statsItem == null) {
+ LongAdder tmpItem = new LongAdder();
+ statsItem = sumMap.putIfAbsent(key, tmpItem);
+ if (statsItem == null) {
+ statsItem = tmpItem;
+ }
+ }
+ statsItem.increment();
+ }
+
+ public void incDetailStats(String key) {
+ LongAdder statsItem = detailsMap.get(key);
+ if (statsItem == null) {
+ LongAdder tmpItem = new LongAdder();
+ statsItem = detailsMap.putIfAbsent(key, tmpItem);
+ if (statsItem == null) {
+ statsItem = tmpItem;
+ }
+ }
+ statsItem.increment();
+ }
+
+ public int printAndResetStatsInfo() {
+ int sumCnt = 0;
+ // print summary info
+ for (Map.Entry<String, LongAdder> entry : sumMap.entrySet()) {
+ if (entry == null || entry.getKey() == null ||
entry.getValue() == null) {
+ continue;
+ }
+ LOGGER.info("{}.summary.{}={}", this.statsName,
entry.getKey(), entry.getValue());
+ sumCnt++;
+ }
+ // print detail info
+ for (Map.Entry<String, LongAdder> entry : detailsMap.entrySet()) {
+ if (entry == null || entry.getKey() == null ||
entry.getValue() == null) {
+ continue;
+ }
+ LOGGER.info("{}.detail.{}={}", this.statsName, entry.getKey(),
entry.getValue());
+ sumCnt++;
+ }
+ sumMap.clear();
+ detailsMap.clear();
+ return sumCnt;
+ }
+ }
+
+}
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/BaseSource.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/BaseSource.java
index fd9ae39caf..0cf601495f 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/BaseSource.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/BaseSource.java
@@ -18,8 +18,6 @@
package org.apache.inlong.dataproxy.source2;
import org.apache.inlong.common.metric.MetricRegister;
-import org.apache.inlong.common.monitor.MonitorIndex;
-import org.apache.inlong.common.monitor.MonitorIndexExt;
import org.apache.inlong.dataproxy.admin.ProxyServiceMBean;
import org.apache.inlong.dataproxy.channel.FailoverChannelProcessor;
import org.apache.inlong.dataproxy.config.CommonConfigHolder;
@@ -29,6 +27,8 @@ import org.apache.inlong.dataproxy.consts.AttrConstants;
import org.apache.inlong.dataproxy.metrics.DataProxyMetricItem;
import org.apache.inlong.dataproxy.metrics.DataProxyMetricItemSet;
import org.apache.inlong.dataproxy.metrics.audit.AuditUtils;
+import org.apache.inlong.dataproxy.metrics.stats.MonitorIndex;
+import org.apache.inlong.dataproxy.metrics.stats.MonitorStats;
import org.apache.inlong.dataproxy.source2.httpMsg.InLongHttpMsgHandler;
import org.apache.inlong.dataproxy.utils.AddressUtils;
import org.apache.inlong.dataproxy.utils.ConfStringUtils;
@@ -105,6 +105,12 @@ public abstract class BaseSource
protected long maxReadIdleTimeMs;
// max connection count
protected int maxConnections;
+ // reuse address
+ protected boolean reuseAddress;
+ // connect backlog
+ protected int conBacklog;
+ // connect linger
+ protected int conLinger = -1;
// netty parameters
protected EventLoopGroup acceptorGroup;
protected EventLoopGroup workerGroup;
@@ -116,7 +122,7 @@ public abstract class BaseSource
protected int maxSendBufferSize;
// file metric statistic
protected MonitorIndex monitorIndex = null;
- private MonitorIndexExt monitorIndexExt = null;
+ private MonitorStats monitorStats = null;
// metric set
protected DataProxyMetricItemSet metricItemSet;
@@ -202,6 +208,24 @@ public abstract class BaseSource
Preconditions.checkArgument(this.maxConnections >=
SourceConstants.VAL_MIN_CONNECTION_CNT,
SourceConstants.SRCCXT_MAX_CONNECTION_CNT + " must be >= "
+ SourceConstants.VAL_MIN_CONNECTION_CNT);
+ // get connect backlog
+ this.conBacklog = ConfStringUtils.getIntValue(context,
+ SourceConstants.SRCCXT_CONN_BACKLOG,
SourceConstants.VAL_DEF_CONN_BACKLOG);
+ Preconditions.checkArgument(this.conBacklog >=
SourceConstants.VAL_MIN_CONN_BACKLOG,
+ SourceConstants.SRCCXT_CONN_BACKLOG + " must be >= "
+ + SourceConstants.VAL_MIN_CONN_BACKLOG);
+ // get connect linger
+ Integer tmpValue =
context.getInteger(SourceConstants.SRCCXT_CONN_LINGER);
+ if (tmpValue != null && tmpValue >= 0) {
+ this.conLinger = tmpValue;
+ }
+ // get whether reuse address
+ this.reuseAddress =
context.getBoolean(SourceConstants.SRCCXT_REUSE_ADDRESS,
+ SourceConstants.VAL_DEF_REUSE_ADDRESS);
+
+ // get whether custom channel processor
+ this.customProcessor =
context.getBoolean(SourceConstants.SRCCXT_CUSTOM_CHANNEL_PROCESSOR,
+ SourceConstants.VAL_DEF_CUSTOM_CH_PROCESSOR);
// get max receive buffer size
this.maxRcvBufferSize = ConfStringUtils.getIntValue(context,
SourceConstants.SRCCXT_RECEIVE_BUFFER_SIZE,
SourceConstants.VAL_DEF_RECEIVE_BUFFER_SIZE);
@@ -233,13 +257,15 @@ public abstract class BaseSource
// init monitor logic
if (CommonConfigHolder.getInstance().isEnableFileMetric()) {
this.monitorIndex = new
MonitorIndex(CommonConfigHolder.getInstance().getFileMetricSourceOutName(),
-
CommonConfigHolder.getInstance().getFileMetricStatInvlSec(),
+
CommonConfigHolder.getInstance().getFileMetricStatInvlSec() * 1000L,
CommonConfigHolder.getInstance().getFileMetricStatCacheCnt());
- this.monitorIndexExt = new MonitorIndexExt(
+ this.monitorIndex.start();
+ this.monitorStats = new MonitorStats(
CommonConfigHolder.getInstance().getFileMetricEventOutName()
+ AttrConstants.SEP_HASHTAG +
this.getProtocolName(),
-
CommonConfigHolder.getInstance().getFileMetricStatInvlSec(),
+
CommonConfigHolder.getInstance().getFileMetricStatInvlSec() * 1000L,
CommonConfigHolder.getInstance().getFileMetricStatCacheCnt());
+ this.monitorStats.start();
}
startSource();
// register
@@ -269,15 +295,6 @@ public abstract class BaseSource
}
// stop super class
super.stop();
- // stop file statistic index
- if (CommonConfigHolder.getInstance().isEnableFileMetric()) {
- if (monitorIndex != null) {
- monitorIndex.shutDown();
- }
- if (monitorIndexExt != null) {
- monitorIndexExt.shutDown();
- }
- }
// stop workers
if (this.acceptorGroup != null) {
this.acceptorGroup.shutdownGracefully();
@@ -285,6 +302,15 @@ public abstract class BaseSource
if (this.workerGroup != null) {
this.workerGroup.shutdownGracefully();
}
+ // stop file statistic index
+ if (CommonConfigHolder.getInstance().isEnableFileMetric()) {
+ if (monitorIndex != null) {
+ monitorIndex.stop();
+ }
+ if (monitorStats != null) {
+ monitorStats.stop();
+ }
+ }
logger.info("[STOP {} SOURCE]{} stopped", this.getProtocolName(),
this.getName());
}
@@ -385,18 +411,29 @@ public abstract class BaseSource
return maxWorkerThreads;
}
- public void fileMetricEventInc(String eventKey) {
+ public void fileMetricIncSumStats(String eventKey) {
+ if (CommonConfigHolder.getInstance().isEnableFileMetric()) {
+ monitorStats.incSumStats(eventKey);
+ }
+ }
+
+ public void fileMetricIncDetailStats(String eventKey) {
if (CommonConfigHolder.getInstance().isEnableFileMetric()) {
- monitorIndexExt.incrementAndGet(eventKey);
+ monitorStats.incDetailStats(eventKey);
}
}
- public void fileMetricRecordAdd(String key, int cnt, int packCnt, long
packSize, int failCnt) {
+ public void fileMetricAddSuccCnt(String key, int cnt, int packCnt, long
packSize) {
if (CommonConfigHolder.getInstance().isEnableFileMetric()) {
- monitorIndex.addAndGet(key, cnt, packCnt, packSize, failCnt);
+ monitorIndex.addSuccStats(key, cnt, packCnt, packSize);
}
}
+ public void fileMetricAddFailCnt(String key, int failCnt) {
+ if (CommonConfigHolder.getInstance().isEnableFileMetric()) {
+ monitorIndex.addFailStats(key, failCnt);
+ }
+ }
/**
* addMetric
*
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/InLongMessageHandler.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/InLongMessageHandler.java
index 0594a8682b..9fb5144aa2 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/InLongMessageHandler.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/InLongMessageHandler.java
@@ -44,7 +44,6 @@ import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import org.apache.commons.lang3.StringUtils;
-import org.apache.flume.ChannelException;
import org.apache.flume.Event;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -93,8 +92,7 @@ public class InLongMessageHandler extends
ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws
Exception {
if (msg == null) {
- source.fileMetricEventInc(StatConstants.EVENT_EMPTY);
- logger.debug("Get null msg, just skip!");
+
source.fileMetricIncSumStats(StatConstants.EVENT_PKG_READABLE_EMPTY);
return;
}
ByteBuf cb = (ByteBuf) msg;
@@ -102,12 +100,11 @@ public class InLongMessageHandler extends
ChannelInboundHandlerAdapter {
int readableLength = cb.readableBytes();
if (readableLength == 0 && source.isFilterEmptyMsg()) {
cb.clear();
- source.fileMetricEventInc(StatConstants.EVENT_EMPTY);
- logger.debug("skip empty msg.");
+
source.fileMetricIncSumStats(StatConstants.EVENT_PKG_READABLE_EMPTY);
return;
}
if (readableLength > source.getMaxMsgLength()) {
- source.fileMetricEventInc(StatConstants.EVENT_OVERMAXLEN);
+
source.fileMetricIncSumStats(StatConstants.EVENT_PKG_READABLE_OVERMAX);
throw new Exception("Error msg, readableLength(" +
readableLength +
") > max allowed message length (" +
source.getMaxMsgLength() + ")");
}
@@ -118,9 +115,9 @@ public class InLongMessageHandler extends
ChannelInboundHandlerAdapter {
if (readableLength < totalDataLen + INLONG_LENGTH_FIELD_LENGTH) {
// reset index when buffer is not satisfied.
cb.resetReaderIndex();
- source.fileMetricEventInc(StatConstants.EVENT_NOTEQUALLEN);
- throw new Exception("Error msg, channel buffer is not
satisfied, and readableLength="
- + readableLength + ", and totalPackLength=" +
totalDataLen + " + 4");
+
source.fileMetricIncSumStats(StatConstants.EVENT_PKG_READABLE_UNFILLED);
+ throw new Exception("Error msg, buffer is unfilled,
readableLength="
+ + readableLength + ", totalPackLength=" + totalDataLen
+ " + 4");
}
// read type
int msgTypeValue = cb.readByte();
@@ -133,7 +130,7 @@ public class InLongMessageHandler extends
ChannelInboundHandlerAdapter {
processV1Msg(ctx, cb, bodyLength);
} else {
// unknown message type
-
source.fileMetricEventInc(StatConstants.EVENT_MSGUNKNOWN_V1);
+
source.fileMetricIncSumStats(StatConstants.EVENT_PKG_MSGTYPE_V1_INVALID);
throw new Exception("Unknown V1 message version, version =
" + msgTypeValue);
}
} else {
@@ -142,7 +139,7 @@ public class InLongMessageHandler extends
ChannelInboundHandlerAdapter {
MsgType msgType = MsgType.valueOf(msgTypeValue);
final long msgRcvTime = System.currentTimeMillis();
if (MsgType.MSG_UNKNOWN == msgType) {
-
source.fileMetricEventInc(StatConstants.EVENT_MSGUNKNOWN_V0);
+
source.fileMetricIncSumStats(StatConstants.EVENT_PKG_MSGTYPE_V0_INVALID);
if (logger.isDebugEnabled()) {
logger.debug("Received unknown message, channel {}",
channel);
}
@@ -162,7 +159,7 @@ public class InLongMessageHandler extends
ChannelInboundHandlerAdapter {
// check whether totalDataLen is valid.
if (MsgType.MSG_BIN_MULTI_BODY == msgType) {
if (totalDataLen < BIN_MSG_FIXED_CONTENT_SIZE) {
-
source.fileMetricEventInc(StatConstants.EVENT_MALFORMED);
+
source.fileMetricIncSumStats(StatConstants.EVENT_MSG_BIN_TOTALLEN_BELOWMIN);
String errMsg = String.format("Malformed msg,
totalDataLen(%d) < min bin7-msg length(%d)",
totalDataLen, BIN_MSG_FIXED_CONTENT_SIZE);
if (logger.isDebugEnabled()) {
@@ -173,7 +170,7 @@ public class InLongMessageHandler extends
ChannelInboundHandlerAdapter {
msgCodec = new CodecBinMsg(totalDataLen, msgTypeValue,
msgRcvTime, strRemoteIP);
} else {
if (totalDataLen < TXT_MSG_FIXED_CONTENT_SIZE) {
-
source.fileMetricEventInc(StatConstants.EVENT_MALFORMED);
+
source.fileMetricIncSumStats(StatConstants.EVENT_MSG_TXT_TOTALLEN_BELOWMIN);
String errMsg = String.format("Malformed msg,
totalDataLen(%d) < min txt-msg length(%d)",
totalDataLen, TXT_MSG_FIXED_CONTENT_SIZE);
if (logger.isDebugEnabled()) {
@@ -198,7 +195,7 @@ public class InLongMessageHandler extends
ChannelInboundHandlerAdapter {
String strRemoteIp =
AddressUtils.getChannelRemoteIP(ctx.channel());
if (strRemoteIp != null
&& ConfigManager.getInstance().isIllegalIP(strRemoteIp)) {
- source.fileMetricEventInc(StatConstants.EVENT_VISIT_ILLEGAL);
+
source.fileMetricIncSumStats(StatConstants.EVENT_VISIT_ILLEGAL);
ctx.channel().disconnect();
ctx.channel().close();
logger.error(strRemoteIp + " is Illegal IP, so refuse it !");
@@ -207,7 +204,7 @@ public class InLongMessageHandler extends
ChannelInboundHandlerAdapter {
}
// check max allowed connection count
if (source.getAllChannels().size() >= source.getMaxConnections()) {
- source.fileMetricEventInc(StatConstants.EVENT_VISIT_OVERMAX);
+ source.fileMetricIncSumStats(StatConstants.EVENT_VISIT_OVERMAX);
ctx.channel().disconnect();
ctx.channel().close();
logger.warn("{} refuse to connect = {} , connections = {},
maxConnections = {}",
@@ -217,32 +214,33 @@ public class InLongMessageHandler extends
ChannelInboundHandlerAdapter {
// add legal channel
source.getAllChannels().add(ctx.channel());
ctx.fireChannelActive();
- source.fileMetricEventInc(StatConstants.EVENT_VISIT_LINKIN);
+ source.fileMetricIncSumStats(StatConstants.EVENT_VISIT_LINKIN);
logger.info("{} added new channel {}, current connections = {},
maxConnections = {}",
source.getName(), ctx.channel(),
source.getAllChannels().size(), source.getMaxConnections());
}
@Override
public void channelInactive(ChannelHandlerContext ctx) {
- logger.error("{} channel {} inactive", source.getName(),
ctx.channel());
+ source.fileMetricIncSumStats(StatConstants.EVENT_VISIT_LINKOUT);
ctx.fireChannelInactive();
source.getAllChannels().remove(ctx.channel());
- source.fileMetricEventInc(StatConstants.EVENT_VISIT_LINKOUT);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
- source.fileMetricEventInc(StatConstants.EVENT_VISIT_EXCEPTION);
- logger.error("{} channel {} throw exception", source.getName(),
ctx.channel(), cause);
- ctx.fireExceptionCaught(cause);
+ source.fileMetricIncSumStats(StatConstants.EVENT_VISIT_EXCEPTION);
+ if (logCounter.shouldPrint()) {
+ logger.warn("{} received an exception from channel {}",
+ source.getName(), ctx.channel(), cause);
+ }
if (ctx.channel() != null) {
+ source.getAllChannels().remove(ctx.channel());
try {
ctx.channel().disconnect();
ctx.channel().close();
} catch (Exception ex) {
//
}
- source.getAllChannels().remove(ctx.channel());
}
ctx.close();
}
@@ -256,14 +254,14 @@ public class InLongMessageHandler extends
ChannelInboundHandlerAdapter {
}
// check service status.
if (source.isRejectService()) {
- source.fileMetricEventInc(StatConstants.EVENT_SERVICE_CLOSED);
+ source.fileMetricIncSumStats(StatConstants.EVENT_SERVICE_CLOSED);
msgCodec.setFailureInfo(DataProxyErrCode.SERVICE_CLOSED);
responseV0Msg(channel, msgCodec, strBuff);
return;
}
// check if the node is linked to the Manager.
if (!ConfigManager.getInstance().isMqClusterReady()) {
-
source.fileMetricEventInc(StatConstants.EVENT_SERVICE_SINK_UNREADY);
+
source.fileMetricIncSumStats(StatConstants.EVENT_SERVICE_SINK_UNREADY);
msgCodec.setFailureInfo(DataProxyErrCode.SINK_SERVICE_UNREADY);
responseV0Msg(channel, msgCodec, strBuff);
return;
@@ -278,29 +276,37 @@ public class InLongMessageHandler extends
ChannelInboundHandlerAdapter {
// build metric data item
long longDataTime = msgCodec.getDataTimeMs() / 1000 / 60 / 10;
longDataTime = longDataTime * 1000 * 60 * 10;
-
strBuff.append(source.getProtocolName()).append(AttrConstants.SEPARATOR)
-
.append(msgCodec.getTopicName()).append(AttrConstants.SEPARATOR)
- .append(msgCodec.getStreamId()).append(AttrConstants.SEPARATOR)
-
.append(msgCodec.getStrRemoteIP()).append(AttrConstants.SEPARATOR)
- .append(source.getStrPort()).append(AttrConstants.SEPARATOR)
-
.append(msgCodec.getMsgProcType()).append(AttrConstants.SEPARATOR)
- .append(DateTimeUtils.ms2yyyyMMddHHmm(longDataTime))
- .append(AttrConstants.SEPARATOR).append(
-
DateTimeUtils.ms2yyyyMMddHHmm(msgCodec.getMsgRcvTime()));
+ String statsKey =
strBuff.append(source.getProtocolName()).append(AttrConstants.SEP_HASHTAG)
+
.append(msgCodec.getGroupId()).append(AttrConstants.SEP_HASHTAG)
+
.append(msgCodec.getStreamId()).append(AttrConstants.SEP_HASHTAG)
+
.append(msgCodec.getStrRemoteIP()).append(AttrConstants.SEP_HASHTAG)
+ .append(source.getSrcHost()).append(AttrConstants.SEP_HASHTAG)
+
.append(msgCodec.getMsgProcType()).append(AttrConstants.SEP_HASHTAG)
+
.append(DateTimeUtils.ms2yyyyMMddHHmm(longDataTime)).append(AttrConstants.SEP_HASHTAG)
+
.append(DateTimeUtils.ms2yyyyMMddHHmm(msgCodec.getMsgRcvTime())).toString();
+ strBuff.delete(0, strBuff.length());
try {
source.getChannelProcessor().processEvent(event);
- source.fileMetricEventInc(StatConstants.EVENT_POST_SUCCESS);
- source.fileMetricRecordAdd(strBuff.toString(),
- msgCodec.getMsgCount(), 1, msgCodec.getBodyLength(), 0);
+
source.fileMetricIncSumStats(StatConstants.EVENT_MSG_V0_POST_SUCCESS);
+ source.fileMetricAddSuccCnt(statsKey, msgCodec.getMsgCount(), 1,
msgCodec.getBodyLength());
source.addMetric(true, event.getBody().length, event);
- strBuff.delete(0, strBuff.length());
+ if (msgCodec.isNeedResp() && !msgCodec.isOrderOrProxy()) {
+ msgCodec.setSuccessInfo();
+ responseV0Msg(channel, msgCodec, strBuff);
+ }
} catch (Throwable ex) {
- logger.error("Error writting to channel, data will discard.", ex);
- source.fileMetricEventInc(StatConstants.EVENT_POST_DROPPED);
- source.fileMetricRecordAdd(strBuff.toString(), 0, 0, 0,
msgCodec.getMsgCount());
+
source.fileMetricIncSumStats(StatConstants.EVENT_MSG_V0_POST_FAILURE);
+ source.fileMetricAddFailCnt(statsKey, msgCodec.getMsgCount());
source.addMetric(false, event.getBody().length, event);
- strBuff.delete(0, strBuff.length());
- throw new ChannelException("ProcessEvent error can't write event
to channel.");
+ if (msgCodec.isNeedResp() && !msgCodec.isOrderOrProxy()) {
+
msgCodec.setFailureInfo(DataProxyErrCode.PUT_EVENT_TO_CHANNEL_FAILURE,
+ strBuff.append("Put event to channel failure:
").append(ex.getMessage()).toString());
+ strBuff.delete(0, strBuff.length());
+ responseV0Msg(channel, msgCodec, strBuff);
+ }
+ if (logCounter.shouldPrint()) {
+ logger.error("Error writing event to channel failure.", ex);
+ }
}
}
@@ -313,7 +319,7 @@ public class InLongMessageHandler extends
ChannelInboundHandlerAdapter {
// reject service
if (source.isRejectService()) {
source.addMetric(false, 0, null);
- source.fileMetricEventInc(StatConstants.EVENT_SERVICE_CLOSED);
+ source.fileMetricIncSumStats(StatConstants.EVENT_SERVICE_CLOSED);
this.responsePackage(ctx, ProxySdk.ResultCode.ERR_REJECT,
packObject);
return;
}
@@ -380,7 +386,7 @@ public class InLongMessageHandler extends
ChannelInboundHandlerAdapter {
source.getChannelProcessor().processEvent(packEvent);
events.forEach(event -> {
source.addMetric(true, event.getBody().length, event);
- source.fileMetricEventInc(StatConstants.EVENT_POST_SUCCESS);
+
source.fileMetricIncSumStats(StatConstants.EVENT_MSG_V1_POST_SUCCESS);
});
boolean awaitResult = callback.getLatch().await(
CommonConfigHolder.getInstance().getMaxResAfterSaveTimeout(),
TimeUnit.MILLISECONDS);
@@ -393,7 +399,7 @@ public class InLongMessageHandler extends
ChannelInboundHandlerAdapter {
logger.error("Process Controller Event error can't write event to
channel.", ex);
events.forEach(event -> {
source.addMetric(false, event.getBody().length, event);
- source.fileMetricEventInc(StatConstants.EVENT_POST_DROPPED);
+
source.fileMetricIncSumStats(StatConstants.EVENT_MSG_V1_POST_DROPPED);
});
if (!callback.getHasResponsed().getAndSet(true)) {
this.responsePackage(ctx, ProxySdk.ResultCode.ERR_REJECT,
packObject);
@@ -418,7 +424,7 @@ public class InLongMessageHandler extends
ChannelInboundHandlerAdapter {
if (CommonConfigHolder.getInstance().isNoTopicAccept()) {
topic = source.getDefTopic();
} else {
-
source.fileMetricEventInc(StatConstants.EVENT_CONFIG_TOPIC_MISSING);
+
source.fileMetricIncSumStats(StatConstants.EVENT_CONFIG_TOPIC_MISSING);
source.addMetric(false, event.getBody().length, event);
this.responsePackage(ctx,
ProxySdk.ResultCode.ERR_ID_ERROR, packObject);
return;
@@ -429,12 +435,12 @@ public class InLongMessageHandler extends
ChannelInboundHandlerAdapter {
try {
source.getChannelProcessor().processEvent(event);
source.addMetric(true, event.getBody().length, event);
- source.fileMetricEventInc(StatConstants.EVENT_POST_SUCCESS);
+
source.fileMetricIncSumStats(StatConstants.EVENT_MSG_V1_POST_SUCCESS);
} catch (Throwable ex) {
logger.error("Process Controller Event error can't write event
to channel.", ex);
source.addMetric(false, event.getBody().length, event);
this.responsePackage(ctx, ProxySdk.ResultCode.ERR_REJECT,
packObject);
- source.fileMetricEventInc(StatConstants.EVENT_POST_DROPPED);
+
source.fileMetricIncSumStats(StatConstants.EVENT_MSG_V1_POST_DROPPED);
return;
}
}
@@ -447,7 +453,7 @@ public class InLongMessageHandler extends
ChannelInboundHandlerAdapter {
private void responseV0Msg(Channel channel, AbsV0MsgCodec msgObj,
StringBuilder strBuff) throws Exception {
// check channel status
if (channel == null || !channel.isWritable()) {
-
source.fileMetricEventInc(StatConstants.EVENT_CHANNEL_NOT_WRITABLE);
+
source.fileMetricIncSumStats(StatConstants.EVENT_REMOTE_UNWRITABLE);
if (logCounter.shouldPrint()) {
logger.warn("Prepare send msg but channel full, msgType={},
attr={}, channel={}",
msgObj.getMsgType(), msgObj.getAttr(), channel);
@@ -491,7 +497,7 @@ public class InLongMessageHandler extends
ChannelInboundHandlerAdapter {
ByteBuf cb, int totalDataLen) throws Exception {
// Check if the message is complete and legal
if (totalDataLen < BIN_HB_FIXED_CONTENT_SIZE) {
- source.fileMetricEventInc(StatConstants.EVENT_MALFORMED);
+
source.fileMetricIncSumStats(StatConstants.EVENT_MSG_HB_TOTALLEN_BELOWMIN);
String errMsg = String.format("Malformed msg, totalDataLen(%d) <
min hb-msg length(%d)",
totalDataLen, BIN_HB_FIXED_CONTENT_SIZE);
if (logger.isDebugEnabled()) {
@@ -505,12 +511,20 @@ public class InLongMessageHandler extends
ChannelInboundHandlerAdapter {
int attrLen = cb.getShort(msgHeadPos + BIN_HB_BODY_OFFSET + bodyLen);
int msgMagic = cb.getUnsignedShort(msgHeadPos
+ BIN_HB_BODY_OFFSET + bodyLen + BIN_HB_ATTRLEN_SIZE +
attrLen);
- if ((totalDataLen + BIN_HB_TOTALLEN_SIZE < (bodyLen + attrLen +
BIN_HB_FORMAT_SIZE))
- || (msgMagic != BIN_MSG_MAGIC)) {
- source.fileMetricEventInc(StatConstants.EVENT_MALFORMED);
+ if (msgMagic != BIN_MSG_MAGIC) {
+
source.fileMetricIncSumStats(StatConstants.EVENT_MSG_HB_MAGIC_UNEQUAL);
+ String errMsg = String.format(
+ "Malformed msg, msgMagic(%d) != %d", msgMagic,
BIN_MSG_MAGIC);
+ if (logger.isDebugEnabled()) {
+ logger.debug(errMsg + ", channel {}", channel);
+ }
+ throw new Exception(errMsg);
+ }
+ if (totalDataLen + BIN_HB_TOTALLEN_SIZE < (bodyLen + attrLen +
BIN_HB_FORMAT_SIZE)) {
+
source.fileMetricIncSumStats(StatConstants.EVENT_MSG_HB_LEN_MALFORMED);
String errMsg = String.format(
- "Malformed msg, bodyLen(%d) + attrLen(%d) >
totalDataLen(%d) or msgMagic(%d) != %d",
- bodyLen, attrLen, totalDataLen, msgMagic, BIN_MSG_MAGIC);
+ "Malformed msg, bodyLen(%d) + attrLen(%d) >
totalDataLen(%d)",
+ bodyLen, attrLen, totalDataLen);
if (logger.isDebugEnabled()) {
logger.debug(errMsg + ", channel {}", channel);
}
@@ -643,7 +657,7 @@ public class InLongMessageHandler extends
ChannelInboundHandlerAdapter {
if (channel == null || !channel.isWritable()) {
// release allocated ByteBuf
binBuffer.release();
-
source.fileMetricEventInc(StatConstants.EVENT_CHANNEL_NOT_WRITABLE);
+
source.fileMetricIncSumStats(StatConstants.EVENT_REMOTE_UNWRITABLE);
if (logCounter.shouldPrint()) {
logger.warn("Send msg but channel full, attr={}, channel={}",
orgAttr, channel);
}
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/SimpleHttpSource.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/SimpleHttpSource.java
index 74cc4435c0..7bae8c2852 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/SimpleHttpSource.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/SimpleHttpSource.java
@@ -76,6 +76,11 @@ public class SimpleHttpSource extends BaseSource implements
Configurable {
new DefaultThreadFactory(this.getName() + "-worker-group"));
// init boostrap
bootstrap = new ServerBootstrap();
+ if (conLinger >= 0) {
+ bootstrap.option(ChannelOption.SO_LINGER, conLinger);
+ }
+ bootstrap.option(ChannelOption.SO_BACKLOG, conBacklog);
+ bootstrap.option(ChannelOption.SO_REUSEADDR, reuseAddress);
bootstrap.childOption(ChannelOption.ALLOCATOR,
ByteBufAllocator.DEFAULT);
bootstrap.childOption(ChannelOption.TCP_NODELAY, tcpNoDelay);
bootstrap.childOption(ChannelOption.SO_RCVBUF, maxRcvBufferSize);
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/SimpleTcpSource.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/SimpleTcpSource.java
index acda3cf4f7..ef298ce805 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/SimpleTcpSource.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/SimpleTcpSource.java
@@ -83,6 +83,11 @@ public class SimpleTcpSource extends BaseSource implements
Configurable {
new DefaultThreadFactory(this.getName() + "-worker-group"));
// init boostrap
bootstrap = new ServerBootstrap();
+ if (conLinger >= 0) {
+ bootstrap.option(ChannelOption.SO_LINGER, conLinger);
+ }
+ bootstrap.option(ChannelOption.SO_BACKLOG, conBacklog);
+ bootstrap.option(ChannelOption.SO_REUSEADDR, reuseAddress);
bootstrap.childOption(ChannelOption.ALLOCATOR,
ByteBufAllocator.DEFAULT);
bootstrap.childOption(ChannelOption.TCP_NODELAY, tcpNoDelay);
bootstrap.childOption(ChannelOption.SO_KEEPALIVE, tcpKeepAlive);
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/SimpleUdpSource.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/SimpleUdpSource.java
index f3ea695c56..7010c4d933 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/SimpleUdpSource.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/SimpleUdpSource.java
@@ -53,6 +53,11 @@ public class SimpleUdpSource extends BaseSource implements
Configurable {
logger.info("start " + this.getName());
bootstrap = new Bootstrap();
bootstrap.channel(NioDatagramChannel.class);
+ if (conLinger >= 0) {
+ bootstrap.option(ChannelOption.SO_LINGER, conLinger);
+ }
+ bootstrap.option(ChannelOption.SO_BACKLOG, conBacklog);
+ bootstrap.option(ChannelOption.SO_REUSEADDR, reuseAddress);
bootstrap.option(ChannelOption.SO_RCVBUF, maxRcvBufferSize);
bootstrap.option(ChannelOption.SO_SNDBUF, maxSendBufferSize);
bootstrap.handler(this.getChannelInitializerFactory());
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/SourceConstants.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/SourceConstants.java
index 26f1fad6cb..8cb0e018ce 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/SourceConstants.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/SourceConstants.java
@@ -73,6 +73,16 @@ public class SourceConstants {
public static final String SRCCXT_SEND_BUFFER_SIZE = "sendBufferSize";
public static final int VAL_DEF_SEND_BUFFER_SIZE = 64 * 1024;
public static final int VAL_MIN_SEND_BUFFER_SIZE = 0;
+ // connect backlog
+ public static final String SRCCXT_CONN_BACKLOG = "con-backlog";
+ public static final int VAL_DEF_CONN_BACKLOG = 128;
+ public static final int VAL_MIN_CONN_BACKLOG = 0;
+ // connect linger
+ public static final String SRCCXT_CONN_LINGER = "con-linger";
+ public static final int VAL_MIN_CONN_LINGER = 0;
+ // connect reuse address
+ public static final String SRCCXT_REUSE_ADDRESS = "reuse-address";
+ public static final boolean VAL_DEF_REUSE_ADDRESS = true;
// tcp parameter no delay
public static final String SRCCXT_TCP_NO_DELAY = "tcpNoDelay";
public static final boolean VAL_DEF_TCP_NO_DELAY = true;
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/httpMsg/InLongHttpMsgHandler.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/httpMsg/InLongHttpMsgHandler.java
index 4f636f8173..fd016f4857 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/httpMsg/InLongHttpMsgHandler.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/httpMsg/InLongHttpMsgHandler.java
@@ -94,25 +94,25 @@ public class InLongHttpMsgHandler extends
SimpleChannelInboundHandler<FullHttpRe
final String clientIp = AddressUtils.getChannelRemoteIP(ctx.channel());
// check request decode result
if (!req.decoderResult().isSuccess()) {
- source.fileMetricEventInc(StatConstants.EVENT_MSG_DECODE_FAIL);
+ source.fileMetricIncSumStats(StatConstants.EVENT_MSG_DECODE_FAIL);
sendErrorMsg(ctx, DataProxyErrCode.HTTP_DECODE_REQ_FAILURE);
return;
}
// check service status.
if (source.isRejectService()) {
- source.fileMetricEventInc(StatConstants.EVENT_SERVICE_CLOSED);
+ source.fileMetricIncSumStats(StatConstants.EVENT_SERVICE_CLOSED);
sendErrorMsg(ctx, DataProxyErrCode.SERVICE_CLOSED);
return;
}
// check sink service status
if (!ConfigManager.getInstance().isMqClusterReady()) {
-
source.fileMetricEventInc(StatConstants.EVENT_SERVICE_SINK_UNREADY);
+
source.fileMetricIncSumStats(StatConstants.EVENT_SERVICE_SINK_UNREADY);
sendErrorMsg(ctx, DataProxyErrCode.SINK_SERVICE_UNREADY);
return;
}
// check request method
if (req.method() != HttpMethod.GET && req.method() != HttpMethod.POST)
{
- source.fileMetricEventInc(StatConstants.EVENT_MSG_METHOD_INVALID);
+
source.fileMetricIncSumStats(StatConstants.EVENT_MSG_METHOD_INVALID);
sendErrorMsg(ctx, DataProxyErrCode.HTTP_UNSUPPORTED_METHOD,
"Only support [" + HttpMethod.GET.name() + ", "
+ HttpMethod.POST.name() + "] methods");
@@ -125,7 +125,7 @@ public class InLongHttpMsgHandler extends
SimpleChannelInboundHandler<FullHttpRe
if (!HttpAttrConst.KEY_SRV_URL_HEARTBEAT.equals(uriDecoder.path())
&&
!HttpAttrConst.KEY_SRV_URL_REPORT_MSG.equals(uriDecoder.path())) {
if (!HttpAttrConst.KEY_URL_FAVICON_ICON.equals(uriDecoder.path()))
{
-
source.fileMetricEventInc(StatConstants.EVENT_MSG_PATH_INVALID);
+
source.fileMetricIncSumStats(StatConstants.EVENT_MSG_PATH_INVALID);
sendErrorMsg(ctx,
DataProxyErrCode.HTTP_UNSUPPORTED_SERVICE_URI,
"Only support [" + HttpAttrConst.KEY_SRV_URL_HEARTBEAT
+ ", "
+ HttpAttrConst.KEY_SRV_URL_REPORT_MSG + "]
paths!");
@@ -136,7 +136,7 @@ public class InLongHttpMsgHandler extends
SimpleChannelInboundHandler<FullHttpRe
boolean closeConnection = isCloseConnection(req);
// process hb service
if (HttpAttrConst.KEY_SRV_URL_HEARTBEAT.equals(uriDecoder.path())) {
- source.fileMetricEventInc(StatConstants.EVENT_MSG_HB_SUCCESS);
+ source.fileMetricIncSumStats(StatConstants.EVENT_MSG_HB_SUCCESS);
sendResponse(ctx, closeConnection);
return;
}
@@ -152,7 +152,7 @@ public class InLongHttpMsgHandler extends
SimpleChannelInboundHandler<FullHttpRe
cntType = cntType.trim();
if (!cntType.equalsIgnoreCase(
HttpHeaderValues.APPLICATION_X_WWW_FORM_URLENCODED.toString())) {
-
source.fileMetricEventInc(StatConstants.EVENT_MSG_CONTYPE_INVALID);
+
source.fileMetricIncSumStats(StatConstants.EVENT_MSG_CONTYPE_INVALID);
sendErrorMsg(ctx,
DataProxyErrCode.HTTP_UNSUPPORTED_CONTENT_TYPE,
"Only support [" +
HttpHeaderValues.APPLICATION_X_WWW_FORM_URLENCODED
+ "] content type!");
@@ -175,13 +175,13 @@ public class InLongHttpMsgHandler extends
SimpleChannelInboundHandler<FullHttpRe
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
- source.fileMetricEventInc(StatConstants.EVENT_VISIT_LINKIN);
+ source.fileMetricIncSumStats(StatConstants.EVENT_VISIT_LINKIN);
// check illegal ip
if (ConfigManager.getInstance().needChkIllegalIP()) {
String strRemoteIp =
AddressUtils.getChannelRemoteIP(ctx.channel());
if (strRemoteIp != null
&& ConfigManager.getInstance().isIllegalIP(strRemoteIp)) {
- source.fileMetricEventInc(StatConstants.EVENT_VISIT_ILLEGAL);
+
source.fileMetricIncSumStats(StatConstants.EVENT_VISIT_ILLEGAL);
ctx.channel().disconnect();
ctx.channel().close();
if (logCounter.shouldPrint()) {
@@ -192,7 +192,7 @@ public class InLongHttpMsgHandler extends
SimpleChannelInboundHandler<FullHttpRe
}
// check max allowed connection count
if (source.getAllChannels().size() >= source.getMaxConnections()) {
- source.fileMetricEventInc(StatConstants.EVENT_VISIT_OVERMAX);
+ source.fileMetricIncSumStats(StatConstants.EVENT_VISIT_OVERMAX);
ctx.channel().disconnect();
ctx.channel().close();
if (logCounter.shouldPrint()) {
@@ -208,18 +208,18 @@ public class InLongHttpMsgHandler extends
SimpleChannelInboundHandler<FullHttpRe
@Override
public void channelInactive(ChannelHandlerContext ctx) {
- source.fileMetricEventInc(StatConstants.EVENT_VISIT_LINKOUT);
+ source.fileMetricIncSumStats(StatConstants.EVENT_VISIT_LINKOUT);
ctx.fireChannelInactive();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
- source.fileMetricEventInc(StatConstants.EVENT_VISIT_EXCEPTION);
+ source.fileMetricIncSumStats(StatConstants.EVENT_VISIT_EXCEPTION);
+ if (logCounter.shouldPrint()) {
+ logger.warn("{} received an exception from channel {}",
+ source.getName(), ctx.channel(), cause);
+ }
if (cause instanceof IOException) {
- if (logCounter.shouldPrint()) {
- logger.warn("{} received an IOException from channel {}",
- source.getName(), ctx.channel(), cause);
- }
ctx.close();
} else {
sendErrorMsg(ctx, DataProxyErrCode.UNKNOWN_ERROR,
@@ -250,7 +250,7 @@ public class InLongHttpMsgHandler extends
SimpleChannelInboundHandler<FullHttpRe
StringBuilder strBuff = new StringBuilder(512);
String groupId = reqAttrs.get(HttpAttrConst.KEY_GROUP_ID);
if (StringUtils.isBlank(groupId)) {
- source.fileMetricEventInc(StatConstants.EVENT_MSG_GROUPID_MISSING);
+
source.fileMetricIncSumStats(StatConstants.EVENT_MSG_GROUPID_MISSING);
sendResponse(ctx,
DataProxyErrCode.MISS_REQUIRED_GROUPID_ARGUMENT.getErrCode(),
strBuff.append("Field ").append(HttpAttrConst.KEY_GROUP_ID)
.append(" must exist and not blank!").toString(),
@@ -260,7 +260,7 @@ public class InLongHttpMsgHandler extends
SimpleChannelInboundHandler<FullHttpRe
// get and check streamId
String streamId = reqAttrs.get(HttpAttrConst.KEY_STREAM_ID);
if (StringUtils.isBlank(streamId)) {
-
source.fileMetricEventInc(StatConstants.EVENT_MSG_STREAMID_MISSING);
+
source.fileMetricIncSumStats(StatConstants.EVENT_MSG_STREAMID_MISSING);
sendResponse(ctx,
DataProxyErrCode.MISS_REQUIRED_STREAMID_ARGUMENT.getErrCode(),
strBuff.append("Field
").append(HttpAttrConst.KEY_STREAM_ID)
.append(" must exist and not blank!").toString(),
@@ -270,9 +270,9 @@ public class InLongHttpMsgHandler extends
SimpleChannelInboundHandler<FullHttpRe
// get and check topicName
String topicName = ConfigManager.getInstance().getTopicName(groupId,
streamId);
if (StringUtils.isBlank(topicName)) {
-
source.fileMetricEventInc(StatConstants.EVENT_CONFIG_TOPIC_MISSING);
+
source.fileMetricIncSumStats(StatConstants.EVENT_CONFIG_TOPIC_MISSING);
sendResponse(ctx, DataProxyErrCode.TOPIC_IS_BLANK.getErrCode(),
- strBuff.append("Topic not configured for
").append(HttpAttrConst.KEY_STREAM_ID)
+ strBuff.append("Topic not configured for
").append(HttpAttrConst.KEY_GROUP_ID)
.append("(").append(groupId).append("),")
.append(HttpAttrConst.KEY_STREAM_ID)
.append("(,").append(streamId).append(")").toString(),
@@ -293,13 +293,13 @@ public class InLongHttpMsgHandler extends
SimpleChannelInboundHandler<FullHttpRe
String body = reqAttrs.get(HttpAttrConst.KEY_BODY);
if (StringUtils.isBlank(body)) {
if (body == null) {
-
source.fileMetricEventInc(StatConstants.EVENT_MSG_BODY_MISSING);
+
source.fileMetricIncSumStats(StatConstants.EVENT_MSG_BODY_MISSING);
sendResponse(ctx,
DataProxyErrCode.MISS_REQUIRED_BODY_ARGUMENT.getErrCode(),
strBuff.append("Field ").append(HttpAttrConst.KEY_BODY)
.append(" is not exist!").toString(),
isCloseCon);
} else {
- source.fileMetricEventInc(StatConstants.EVENT_MSG_BODY_BLANK);
+
source.fileMetricIncSumStats(StatConstants.EVENT_MSG_BODY_BLANK);
sendResponse(ctx, DataProxyErrCode.EMPTY_MSG.getErrCode(),
strBuff.append("Field ").append(HttpAttrConst.KEY_BODY)
.append(" is Blank!").toString(),
@@ -308,7 +308,7 @@ public class InLongHttpMsgHandler extends
SimpleChannelInboundHandler<FullHttpRe
return false;
}
if (body.length() > source.getMaxMsgLength()) {
- source.fileMetricEventInc(StatConstants.EVENT_MSG_BODY_OVERMAX);
+ source.fileMetricIncSumStats(StatConstants.EVENT_MSG_BODY_OVERMAX);
sendResponse(ctx,
DataProxyErrCode.BODY_EXCEED_MAX_LEN.getErrCode(),
strBuff.append("Error msg, the
").append(HttpAttrConst.KEY_BODY)
.append(" length(").append(body.length())
@@ -348,28 +348,27 @@ public class InLongHttpMsgHandler extends
SimpleChannelInboundHandler<FullHttpRe
// build metric data item
dataTime = dataTime / 1000 / 60 / 10;
dataTime = dataTime * 1000 * 60 * 10;
- strBuff.append(source.getProtocolName())
- .append(AttrConstants.SEP_HASHTAG).append(topicName)
- .append(AttrConstants.SEP_HASHTAG).append(streamId)
- .append(AttrConstants.SEP_HASHTAG).append(clientIp)
- .append(AttrConstants.SEP_HASHTAG).append(source.getSrcHost())
- .append(AttrConstants.SEP_HASHTAG).append("b2b")
-
.append(AttrConstants.SEP_HASHTAG).append(DateTimeUtils.ms2yyyyMMddHHmm(dataTime))
-
.append(AttrConstants.SEP_HASHTAG).append(DateTimeUtils.ms2yyyyMMddHHmm(msgRcvTime));
+ String statsKey =
strBuff.append(source.getProtocolName()).append(AttrConstants.SEP_HASHTAG)
+ .append(groupId).append(AttrConstants.SEP_HASHTAG)
+ .append(streamId).append(AttrConstants.SEP_HASHTAG)
+ .append(clientIp).append(AttrConstants.SEP_HASHTAG)
+ .append(source.getSrcHost()).append(AttrConstants.SEP_HASHTAG)
+ .append("b2b").append(AttrConstants.SEP_HASHTAG)
+
.append(DateTimeUtils.ms2yyyyMMddHHmm(dataTime)).append(AttrConstants.SEP_HASHTAG)
+ .append(DateTimeUtils.ms2yyyyMMddHHmm(msgRcvTime)).toString();
+ strBuff.delete(0, strBuff.length());
try {
source.getChannelProcessor().processEvent(event);
- source.fileMetricEventInc(StatConstants.EVENT_MSG_POST_SUCCESS);
- source.fileMetricRecordAdd(strBuff.toString(), intMsgCnt, 1,
event.getBody().length, 0);
- strBuff.delete(0, strBuff.length());
+
source.fileMetricIncSumStats(StatConstants.EVENT_MSG_V0_POST_SUCCESS);
+ source.fileMetricAddSuccCnt(statsKey, intMsgCnt, 1,
event.getBody().length);
source.addMetric(true, event.getBody().length, event);
sendResponse(ctx, isCloseCon);
return true;
} catch (Throwable ex) {
- source.fileMetricEventInc(StatConstants.EVENT_MSG_POST_FAILURE);
- source.fileMetricRecordAdd(strBuff.toString(), 0, 0, 0, intMsgCnt);
+
source.fileMetricIncSumStats(StatConstants.EVENT_MSG_V0_POST_FAILURE);
+ source.fileMetricAddFailCnt(statsKey, intMsgCnt);
source.addMetric(false, event.getBody().length, event);
- strBuff.delete(0, strBuff.length());
- sendErrorMsg(ctx, DataProxyErrCode.UNKNOWN_ERROR,
+ sendErrorMsg(ctx, DataProxyErrCode.PUT_EVENT_TO_CHANNEL_FAILURE,
strBuff.append("Put event to channel failure:
").append(ex.getMessage()).toString());
if (logCounter.shouldPrint()) {
logger.error("Error writing HTTP event to channel failure.",
ex);
@@ -422,7 +421,7 @@ public class InLongHttpMsgHandler extends
SimpleChannelInboundHandler<FullHttpRe
return;
}
if (!ctx.channel().isWritable()) {
-
source.fileMetricEventInc(StatConstants.EVENT_CHANNEL_NOT_WRITABLE);
+
source.fileMetricIncSumStats(StatConstants.EVENT_REMOTE_UNWRITABLE);
if (logCounter.shouldPrint()) {
logger.warn("Send msg but channel full, channel={}",
ctx.channel());
}
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/v0msg/AbsV0MsgCodec.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/v0msg/AbsV0MsgCodec.java
index b99d28cddc..69fee83565 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/v0msg/AbsV0MsgCodec.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/v0msg/AbsV0MsgCodec.java
@@ -61,6 +61,7 @@ public abstract class AbsV0MsgCodec {
protected String topicName;
protected String msgSeqId = "";
protected long uniq = -1L;
+ protected boolean isOrderOrProxy = false;
protected String msgProcType = "b2b";
protected boolean needResp = true;
@@ -90,6 +91,10 @@ public abstract class AbsV0MsgCodec {
return this.needResp;
}
+ public boolean isOrderOrProxy() {
+ return isOrderOrProxy;
+ }
+
public byte getMsgType() {
return this.msgType;
}
@@ -142,6 +147,11 @@ public abstract class AbsV0MsgCodec {
return msgRcvTime;
}
+ public void setSuccessInfo() {
+ this.errCode = DataProxyErrCode.SUCCESS;
+ this.errMsg = "";
+ }
+
public void setFailureInfo(DataProxyErrCode errCode) {
setFailureInfo(errCode, "");
}
@@ -168,7 +178,7 @@ public abstract class AbsV0MsgCodec {
try {
this.attrMap.putAll(mapSplitter.split(this.origAttr));
} catch (Exception e) {
- source.fileMetricEventInc(StatConstants.EVENT_INVALIDATTR);
+
source.fileMetricIncSumStats(StatConstants.EVENT_MSG_ATTR_INVALID);
this.errCode = DataProxyErrCode.SPLIT_ATTR_ERROR;
this.errMsg = String.format("Parse attr (%s) failure",
this.origAttr);
return false;
@@ -178,6 +188,28 @@ public abstract class AbsV0MsgCodec {
if
("false".equalsIgnoreCase(attrMap.get(AttributeConstants.MESSAGE_IS_ACK))) {
this.needResp = false;
}
+ // get whether sync send
+ if
("true".equalsIgnoreCase(attrMap.get(AttributeConstants.MESSAGE_SYNC_SEND))) {
+ if (!this.needResp) {
+ this.needResp = true;
+
source.fileMetricIncSumStats(StatConstants.EVENT_MSG_ORDER_ACK_INVALID);
+ this.errCode =
DataProxyErrCode.ATTR_ORDER_CONTROL_CONFLICT_ERROR;
+ return false;
+ }
+ this.isOrderOrProxy = true;
+ this.msgProcType = "order";
+ }
+ // get whether proxy send
+ if
("true".equalsIgnoreCase(attrMap.get(AttributeConstants.MESSAGE_PROXY_SEND))) {
+ if (!this.needResp) {
+ this.needResp = true;
+
source.fileMetricIncSumStats(StatConstants.EVENT_MSG_PROXY_ACK_INVALID);
+ this.errCode =
DataProxyErrCode.ATTR_PROXY_CONTROL_CONFLICT_ERROR;
+ return false;
+ }
+ this.isOrderOrProxy = true;
+ this.msgProcType = "proxy";
+ }
return true;
}
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/v0msg/CodecBinMsg.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/v0msg/CodecBinMsg.java
index 027e33e74f..4fe96f57ec 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/v0msg/CodecBinMsg.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/v0msg/CodecBinMsg.java
@@ -26,12 +26,10 @@ import
org.apache.inlong.dataproxy.config.CommonConfigHolder;
import org.apache.inlong.dataproxy.config.ConfigManager;
import org.apache.inlong.dataproxy.consts.StatConstants;
import org.apache.inlong.dataproxy.source2.BaseSource;
-import org.apache.inlong.dataproxy.utils.MessageUtils;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.lang3.tuple.Pair;
import org.apache.flume.Event;
import org.apache.flume.event.EventBuilder;
@@ -63,7 +61,6 @@ public class CodecBinMsg extends AbsV0MsgCodec {
private long dataTimeSec;
private boolean num2name = false;
private boolean transNum2Name = false;
- private boolean isOrderOrProxy = false;
private boolean indexMsg = false;
private boolean fileCheckMsg = false;
private boolean needTraceMsg = false;
@@ -91,28 +88,32 @@ public class CodecBinMsg extends AbsV0MsgCodec {
+ bodyLen + BIN_MSG_ATTRLEN_SIZE + attrLen);
if (bodyLen <= 0) {
if (bodyLen == 0) {
- source.fileMetricEventInc(StatConstants.EVENT_NOBODY);
+
source.fileMetricIncSumStats(StatConstants.EVENT_MSG_BODY_ZERO);
this.errCode = DataProxyErrCode.BODY_LENGTH_ZERO;
} else {
- source.fileMetricEventInc(StatConstants.EVENT_NEGBODY);
+
source.fileMetricIncSumStats(StatConstants.EVENT_MSG_BODY_NEGATIVE);
this.errCode = DataProxyErrCode.BODY_LENGTH_LESS_ZERO;
}
return false;
}
// get attribute length
if (attrLen < 0) {
- source.fileMetricEventInc(StatConstants.EVENT_NEGATTR);
+
source.fileMetricIncSumStats(StatConstants.EVENT_MSG_ATTR_NEGATIVE);
this.errCode = DataProxyErrCode.ATTR_LENGTH_LESS_ZERO;
return false;
}
// get msg magic
- if ((msgMagic != BIN_MSG_MAGIC)
- || (totalDataLen + BIN_MSG_TOTALLEN_SIZE < (bodyLen + attrLen
+ BIN_MSG_FORMAT_SIZE))) {
- source.fileMetricEventInc(StatConstants.EVENT_MALFORMED);
- this.errCode = DataProxyErrCode.FIELD_VALUE_NOT_EQUAL;
- this.errMsg = String.format(
- "fixedLen(%d) + bodyLen(%d) + attrLen(%d) >
totalDataLen(%d) + 4 or msgMagic(%d) != %d",
- BIN_MSG_FORMAT_SIZE, bodyLen, attrLen, totalDataLen,
msgMagic, BIN_MSG_MAGIC);
+ if (msgMagic != BIN_MSG_MAGIC) {
+
source.fileMetricIncSumStats(StatConstants.EVENT_MSG_MAGIC_UNEQUAL);
+ this.errCode = DataProxyErrCode.FIELD_MAGIC_NOT_EQUAL;
+ this.errMsg = String.format("magicInMsg(%d) != %d", msgMagic,
BIN_MSG_MAGIC);
+ return false;
+ }
+ if (totalDataLen + BIN_MSG_TOTALLEN_SIZE < (bodyLen + attrLen +
BIN_MSG_FORMAT_SIZE)) {
+
source.fileMetricIncSumStats(StatConstants.EVENT_MSG_BIN_LEN_MALFORMED);
+ this.errCode = DataProxyErrCode.FIELD_LENGTH_VALUE_NOT_EQUAL;
+ this.errMsg = String.format("fixedLen(%d) + bodyLen(%d) +
attrLen(%d) > totalDataLen(%d) + 4",
+ BIN_MSG_FORMAT_SIZE, bodyLen, attrLen, totalDataLen);
return false;
}
// extract attr bytes
@@ -133,19 +134,13 @@ public class CodecBinMsg extends AbsV0MsgCodec {
if (((extendField & 0x4) >> 2) == 0x0) {
this.num2name = true;
}
- // parse required fields
- Pair<Boolean, String> evenProcType =
-
MessageUtils.getEventProcType(attrMap.get(AttributeConstants.MESSAGE_SYNC_SEND),
- attrMap.get(AttributeConstants.MESSAGE_PROXY_SEND));
- this.isOrderOrProxy = evenProcType.getLeft();
- this.msgProcType = evenProcType.getRight();
return true;
}
public boolean validAndFillFields(BaseSource source, StringBuilder
strBuff) {
// reject unsupported index messages
if (indexMsg) {
- source.fileMetricEventInc(StatConstants.EVENT_UNSUPMSG);
+
source.fileMetricIncSumStats(StatConstants.EVENT_MSG_INDEXMSG_ILLEGAL);
this.errCode = DataProxyErrCode.UNSUPPORTED_EXTEND_FIELD_VALUE;
return false;
}
@@ -257,7 +252,7 @@ public class CodecBinMsg extends AbsV0MsgCodec {
this.streamId = this.attrMap.get(AttributeConstants.STREAM_ID);
if (num2name) {
if (this.groupIdNum == 0) {
- source.fileMetricEventInc(StatConstants.EVENT_WITHOUTGROUPID);
+
source.fileMetricIncSumStats(StatConstants.EVENT_MSG_GROUPIDNUM_ZERO);
this.errCode = DataProxyErrCode.MISS_REQUIRED_GROUPID_ARGUMENT;
this.errMsg = "groupIdNum is 0 in message";
return false;
@@ -268,18 +263,18 @@ public class CodecBinMsg extends AbsV0MsgCodec {
confGroupId = configManager.getGroupIdNameByNum(strGroupIdNum);
if (StringUtils.isBlank(confGroupId)) {
if (configManager.isGroupIdNumConfigEmpty()) {
-
source.fileMetricEventInc(StatConstants.EVENT_SERVICE_SINK_UNREADY);
+
source.fileMetricIncSumStats(StatConstants.EVENT_CONFIG_IDNUM_EMPTY);
this.errCode = DataProxyErrCode.CONF_SERVICE_UNREADY;
this.errMsg = "GroupId-Mapping configuration is null";
} else {
-
source.fileMetricEventInc(StatConstants.EVENT_WITHOUTGROUPID);
+
source.fileMetricIncSumStats(StatConstants.EVENT_CONFIG_GROUPIDNUM_MISSING);
this.errCode =
DataProxyErrCode.GROUPID_OR_STREAMID_NOT_CONFIGURE;
this.errMsg = String.format("Non-existing groupIdNum(%s)
configuration", strGroupIdNum);
}
return false;
}
if (StringUtils.isNotBlank(this.groupId) &&
!this.groupId.equalsIgnoreCase(confGroupId)) {
-
source.fileMetricEventInc(StatConstants.EVENT_INCONSGROUPORSTREAMID);
+
source.fileMetricIncSumStats(StatConstants.EVENT_CONFIG_GROUP_IDNUM_INCONSTANT);
this.errCode = DataProxyErrCode.GROUPID_OR_STREAMID_INCONSTANT;
this.errMsg = String.format(
"Inconstant GroupId not equal, (%s) in attr but (%s)
in configure by groupIdNum(%s)",
@@ -290,7 +285,7 @@ public class CodecBinMsg extends AbsV0MsgCodec {
// check streamId
if (this.streamIdNum == 0) {
if (StringUtils.isNotBlank(this.streamId)) {
-
source.fileMetricEventInc(StatConstants.EVENT_INCONSGROUPORSTREAMID);
+
source.fileMetricIncSumStats(StatConstants.EVENT_MSG_STREAMIDNUM_ZERO);
this.errCode =
DataProxyErrCode.GROUPID_OR_STREAMID_INCONSTANT;
this.errMsg = String.format("Inconstant streamId(%s) in
attr but streamIdNum=0", this.streamId);
return false;
@@ -300,11 +295,11 @@ public class CodecBinMsg extends AbsV0MsgCodec {
confStreamId =
configManager.getStreamIdNameByIdNum(strGroupIdNum, strStreamIdNum);
if (StringUtils.isBlank(confStreamId)) {
if (configManager.isStreamIdNumConfigEmpty()) {
-
source.fileMetricEventInc(StatConstants.EVENT_SERVICE_SINK_UNREADY);
+
source.fileMetricIncSumStats(StatConstants.EVENT_CONFIG_IDNUM_EMPTY);
this.errCode = DataProxyErrCode.CONF_SERVICE_UNREADY;
this.errMsg = "StreamId-Mapping configuration is null";
} else {
-
source.fileMetricEventInc(StatConstants.EVENT_WITHOUTGROUPID);
+
source.fileMetricIncSumStats(StatConstants.EVENT_CONFIG_STREAMIDNUM_MISSING);
this.errCode =
DataProxyErrCode.GROUPID_OR_STREAMID_NOT_CONFIGURE;
this.errMsg = String.format("Non-existing
GroupId(%s)-StreamId(%s) configuration",
strGroupIdNum, strStreamIdNum);
@@ -312,7 +307,7 @@ public class CodecBinMsg extends AbsV0MsgCodec {
return false;
}
if (StringUtils.isNotBlank(this.streamId) &&
!this.streamId.equalsIgnoreCase(confStreamId)) {
-
source.fileMetricEventInc(StatConstants.EVENT_INCONSGROUPORSTREAMID);
+
source.fileMetricIncSumStats(StatConstants.EVENT_CONFIG_STREAM_IDNUM_INCONSTANT);
this.errCode =
DataProxyErrCode.GROUPID_OR_STREAMID_INCONSTANT;
this.errMsg = String.format(
"Inconstant StreamId, (%s) in attr but (%s) in
configure by groupIdNum(%s), streamIdNum(%s)",
@@ -327,7 +322,7 @@ public class CodecBinMsg extends AbsV0MsgCodec {
}
} else {
if (StringUtils.isBlank(groupId)) {
- source.fileMetricEventInc(StatConstants.EVENT_WITHOUTGROUPID);
+
source.fileMetricIncSumStats(StatConstants.EVENT_MSG_GROUPID_MISSING);
this.errCode = DataProxyErrCode.MISS_REQUIRED_GROUPID_ARGUMENT;
return false;
}
@@ -338,9 +333,9 @@ public class CodecBinMsg extends AbsV0MsgCodec {
if (CommonConfigHolder.getInstance().isNoTopicAccept()) {
this.topicName = source.getDefTopic();
} else {
-
source.fileMetricEventInc(StatConstants.EVENT_CONFIG_TOPIC_MISSING);
+
source.fileMetricIncSumStats(StatConstants.EVENT_CONFIG_TOPIC_MISSING);
this.errCode = DataProxyErrCode.TOPIC_IS_BLANK;
- this.errMsg = String.format("Topic is null for
inlongGroupId=(%s), inlongStreamId=(%s)",
+ this.errMsg = String.format("Topic not configured for
groupId=(%s), streamId=(%s)",
this.groupId, this.streamId);
return false;
}
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/v0msg/CodecTextMsg.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/v0msg/CodecTextMsg.java
index f02752ca10..50f7552a32 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/v0msg/CodecTextMsg.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/v0msg/CodecTextMsg.java
@@ -54,15 +54,16 @@ public class CodecTextMsg extends AbsV0MsgCodec {
int bodyLen = cb.getInt(msgHeadPos + TXT_MSG_BODYLEN_OFFSET);
if (bodyLen <= 0) {
if (bodyLen == 0) {
- source.fileMetricEventInc(StatConstants.EVENT_NOBODY);
+
source.fileMetricIncSumStats(StatConstants.EVENT_MSG_BODY_ZERO);
this.errCode = DataProxyErrCode.BODY_LENGTH_ZERO;
} else {
- source.fileMetricEventInc(StatConstants.EVENT_NEGBODY);
+
source.fileMetricIncSumStats(StatConstants.EVENT_MSG_BODY_NEGATIVE);
this.errCode = DataProxyErrCode.BODY_LENGTH_LESS_ZERO;
}
return false;
}
if (bodyLen + TXT_MSG_FORMAT_SIZE > totalDataLen +
TXT_MSG_TOTALLEN_SIZE) {
+
source.fileMetricIncSumStats(StatConstants.EVENT_MSG_TXT_LEN_MALFORMED);
this.errCode = DataProxyErrCode.BODY_EXCEED_MAX_LEN;
this.errMsg = String.format("Error msg, bodyLen(%d) +
fixedLength(%d) > totalDataLen(%d) + 4",
bodyLen, TXT_MSG_FORMAT_SIZE, totalDataLen);
@@ -74,11 +75,13 @@ public class CodecTextMsg extends AbsV0MsgCodec {
// get attribute length
int attrLen = cb.getInt(msgHeadPos + TXT_MSG_BODY_OFFSET + bodyLen);
if (attrLen < 0) {
+
source.fileMetricIncSumStats(StatConstants.EVENT_MSG_ATTR_NEGATIVE);
this.errCode = DataProxyErrCode.ATTR_LENGTH_LESS_ZERO;
return false;
}
// check attribute length
if (totalDataLen + TXT_MSG_TOTALLEN_SIZE != TXT_MSG_FORMAT_SIZE +
bodyLen + attrLen) {
+
source.fileMetricIncSumStats(StatConstants.EVENT_MSG_TXT_LEN_MALFORMED);
this.errCode = DataProxyErrCode.BODY_EXCEED_MAX_LEN;
this.errMsg = String.format(
"Error msg, totalDataLen(%d) + 4 != fixedLength(%d) +
bodyLen(%d) + attrLen(%d)",
@@ -97,14 +100,14 @@ public class CodecTextMsg extends AbsV0MsgCodec {
unCompressedData = new byte[uncompressedLen];
Snappy.uncompress(bodyData, 0, bodyData.length,
unCompressedData, 0);
} catch (IOException e) {
- source.fileMetricEventInc(StatConstants.EVENT_UNPRESSEXP);
+
source.fileMetricIncSumStats(StatConstants.EVENT_MSG_BODY_UNPRESS_EXP);
this.errCode = DataProxyErrCode.UNCOMPRESS_DATA_ERROR;
this.errMsg = String.format("Error to uncompress msg, compress
type(%s), attr: (%s), error: (%s)",
attrMap.get(AttributeConstants.COMPRESS_TYPE),
origAttr, e.getCause());
return false;
}
if (unCompressedData.length == 0) {
- source.fileMetricEventInc(StatConstants.EVENT_UNPRESSEXP);
+
source.fileMetricIncSumStats(StatConstants.EVENT_MSG_BODY_UNPRESS_EXP);
this.errCode = DataProxyErrCode.UNCOMPRESS_DATA_ERROR;
this.errMsg = String.format("Error to uncompress msg, compress
type(%s), attr: (%s), error: 2",
attrMap.get(AttributeConstants.COMPRESS_TYPE),
origAttr);
@@ -120,7 +123,7 @@ public class CodecTextMsg extends AbsV0MsgCodec {
while (bodyBuffer.remaining() > 0) {
singleMsgLen = bodyBuffer.getInt(readPos);
if (singleMsgLen <= 0 || singleMsgLen >
bodyBuffer.remaining()) {
- source.fileMetricEventInc(StatConstants.EVENT_MALFORMED);
+
source.fileMetricIncSumStats(StatConstants.EVENT_MSG_ITEM_LEN_MALFORMED);
this.errCode = DataProxyErrCode.BODY_EXCEED_MAX_LEN;
this.errMsg = String.format(
"Malformed data len, singleMsgLen(%d), buffer
remaining(%d), attr: (%s)",
@@ -138,7 +141,7 @@ public class CodecTextMsg extends AbsV0MsgCodec {
String tmpGroupId = attrMap.get(AttributeConstants.GROUP_ID);
String tmpStreamId = attrMap.get(AttributeConstants.STREAM_ID);
if (StringUtils.isBlank(tmpGroupId)) {
- source.fileMetricEventInc(StatConstants.EVENT_WITHOUTGROUPID);
+
source.fileMetricIncSumStats(StatConstants.EVENT_MSG_GROUPID_MISSING);
this.errCode = DataProxyErrCode.MISS_REQUIRED_GROUPID_ARGUMENT;
return false;
}
@@ -148,10 +151,10 @@ public class CodecTextMsg extends AbsV0MsgCodec {
if (CommonConfigHolder.getInstance().isNoTopicAccept()) {
tmpTopicName = source.getDefTopic();
} else {
-
source.fileMetricEventInc(StatConstants.EVENT_CONFIG_TOPIC_MISSING);
+
source.fileMetricIncSumStats(StatConstants.EVENT_CONFIG_TOPIC_MISSING);
this.errCode = DataProxyErrCode.TOPIC_IS_BLANK;
this.errMsg = String.format(
- "Topic is null for inlongGroupId=(%s),
inlongStreamId=(%s)", tmpGroupId, tmpStreamId);
+ "Topic not configured for groupId=(%s),
streamId=(%s)", tmpGroupId, tmpStreamId);
return false;
}
}