This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch branch-1.8
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/branch-1.8 by this push:
     new a08a0b28f2 [INLONG-8576][DataProxy] Adjust handling when messages are 
incomplete (#8577)
a08a0b28f2 is described below

commit a08a0b28f2bab8c9a91c00447325b3ffd79ffe5d
Author: Goson Zhang <[email protected]>
AuthorDate: Wed Jul 19 17:41:10 2023 +0800

    [INLONG-8576][DataProxy] Adjust handling when messages are incomplete 
(#8577)
---
 .../main/java/org/apache/inlong/dataproxy/consts/StatConstants.java    | 2 ++
 .../java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneSink.java | 2 ++
 .../java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java  | 3 +--
 3 files changed, 5 insertions(+), 2 deletions(-)

diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/StatConstants.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/StatConstants.java
index f7cc229fc9..af6eb2b0d6 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/StatConstants.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/StatConstants.java
@@ -79,6 +79,8 @@ public class StatConstants {
     public static final java.lang.String EVENT_MSG_V1_POST_DROPPED = 
"msg.post.v1.dropped";
     // sink
     public static final java.lang.String EVENT_SINK_EVENT_V1_MALFORMED = 
"sink.event.v1.malformed";
+    public static final java.lang.String EVENT_SINK_EVENT_TAKE_SUCCESS = 
"sink.event.take.success";
+    public static final java.lang.String EVENT_SINK_EVENT_TAKE_FAILURE = 
"sink.event.take.failure";
     public static final java.lang.String EVENT_SINK_EVENT_V1_FILE = 
"sink.event.v1.file";
     public static final java.lang.String EVENT_SINK_EVENT_V0_FILE = 
"sink.event.v1.file";
     public static final java.lang.String EVENT_SINK_CONFIG_TOPIC_MISSING = 
"sink.topic.missing";
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneSink.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneSink.java
index d287c2df59..c6f42ef5bc 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneSink.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneSink.java
@@ -195,6 +195,7 @@ public class MessageQueueZoneSink extends AbstractSink 
implements Configurable,
                 tx.commit();
                 return Status.BACKOFF;
             }
+            
context.fileMetricIncSumStats(StatConstants.EVENT_SINK_EVENT_TAKE_SUCCESS);
             // ProxyEvent
             if (event instanceof ProxyEvent) {
                 ProxyEvent proxyEvent = (ProxyEvent) event;
@@ -245,6 +246,7 @@ public class MessageQueueZoneSink extends AbstractSink 
implements Configurable,
             tx.commit();
             return Status.READY;
         } catch (Throwable t) {
+            
context.fileMetricIncSumStats(StatConstants.EVENT_SINK_EVENT_TAKE_FAILURE);
             if (logCounter.shouldPrint()) {
                 logger.error("{} process event failed!", this.getName(), t);
             }
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
index 41690d4f69..fc8e15d048 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
@@ -115,8 +115,7 @@ public class ServerMessageHandler extends 
ChannelInboundHandlerAdapter {
                 // reset index when buffer is not satisfied.
                 cb.resetReaderIndex();
                 
source.fileMetricIncSumStats(StatConstants.EVENT_PKG_READABLE_UNFILLED);
-                throw new Exception("Error msg, buffer is unfilled, 
readableLength="
-                        + readableLength + ", totalPackLength=" + totalDataLen 
+ " + 4");
+                return;
             }
             // read type
             int msgTypeValue = cb.readByte();

Reply via email to