This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch branch-1.8
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/branch-1.8 by this push:
new a08a0b28f2 [INLONG-8576][DataProxy] Adjust handling when messages are incomplete (#8577)
a08a0b28f2 is described below
commit a08a0b28f2bab8c9a91c00447325b3ffd79ffe5d
Author: Goson Zhang <[email protected]>
AuthorDate: Wed Jul 19 17:41:10 2023 +0800
[INLONG-8576][DataProxy] Adjust handling when messages are incomplete (#8577)
---
.../main/java/org/apache/inlong/dataproxy/consts/StatConstants.java | 2 ++
.../java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneSink.java | 2 ++
.../java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java | 3 +--
3 files changed, 5 insertions(+), 2 deletions(-)
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/StatConstants.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/StatConstants.java
index f7cc229fc9..af6eb2b0d6 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/StatConstants.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/StatConstants.java
@@ -79,6 +79,8 @@ public class StatConstants {
public static final java.lang.String EVENT_MSG_V1_POST_DROPPED =
"msg.post.v1.dropped";
// sink
public static final java.lang.String EVENT_SINK_EVENT_V1_MALFORMED =
"sink.event.v1.malformed";
+ public static final java.lang.String EVENT_SINK_EVENT_TAKE_SUCCESS =
"sink.event.take.success";
+ public static final java.lang.String EVENT_SINK_EVENT_TAKE_FAILURE =
"sink.event.take.failure";
public static final java.lang.String EVENT_SINK_EVENT_V1_FILE =
"sink.event.v1.file";
public static final java.lang.String EVENT_SINK_EVENT_V0_FILE =
"sink.event.v1.file";
public static final java.lang.String EVENT_SINK_CONFIG_TOPIC_MISSING =
"sink.topic.missing";
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneSink.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneSink.java
index d287c2df59..c6f42ef5bc 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneSink.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneSink.java
@@ -195,6 +195,7 @@ public class MessageQueueZoneSink extends AbstractSink
implements Configurable,
tx.commit();
return Status.BACKOFF;
}
+
context.fileMetricIncSumStats(StatConstants.EVENT_SINK_EVENT_TAKE_SUCCESS);
// ProxyEvent
if (event instanceof ProxyEvent) {
ProxyEvent proxyEvent = (ProxyEvent) event;
@@ -245,6 +246,7 @@ public class MessageQueueZoneSink extends AbstractSink
implements Configurable,
tx.commit();
return Status.READY;
} catch (Throwable t) {
+
context.fileMetricIncSumStats(StatConstants.EVENT_SINK_EVENT_TAKE_FAILURE);
if (logCounter.shouldPrint()) {
logger.error("{} process event failed!", this.getName(), t);
}
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
index 41690d4f69..fc8e15d048 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
@@ -115,8 +115,7 @@ public class ServerMessageHandler extends
ChannelInboundHandlerAdapter {
// reset index when buffer is not satisfied.
cb.resetReaderIndex();
source.fileMetricIncSumStats(StatConstants.EVENT_PKG_READABLE_UNFILLED);
- throw new Exception("Error msg, buffer is unfilled,
readableLength="
- + readableLength + ", totalPackLength=" + totalDataLen
+ " + 4");
+ return;
}
// read type
int msgTypeValue = cb.readByte();