This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new ec135b1aae [INLONG-8311][DataProxy] Add event handling support for FlumeEvent type (#8312)
ec135b1aae is described below

commit ec135b1aae982020626ee3f8a02c7641f4a6c08a
Author: Goson Zhang <[email protected]>
AuthorDate: Sun Jun 25 18:24:42 2023 +0800

    [INLONG-8311][DataProxy] Add event handling support for FlumeEvent type (#8312)
---
 .../channel/FailoverChannelProcessor.java          |  4 ++-
 .../inlong/dataproxy/config/ConfigManager.java     |  7 ++--
 .../inlong/dataproxy/consts/StatConstants.java     |  3 ++
 .../dataproxy/sink/mq/MessageQueueZoneSink.java    | 31 +++++++++++++++-
 .../inlong/sdk/commons/protocol/ProxyEvent.java    | 41 +++++++++++++++++-----
 5 files changed, 73 insertions(+), 13 deletions(-)

diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/channel/FailoverChannelProcessor.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/channel/FailoverChannelProcessor.java
index 2162019317..ce60078b67 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/channel/FailoverChannelProcessor.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/channel/FailoverChannelProcessor.java
@@ -21,6 +21,7 @@ import org.apache.inlong.common.monitor.LogCounter;
 import org.apache.inlong.dataproxy.consts.ConfigConstants;
 import org.apache.inlong.dataproxy.exception.MainChannelFullException;
 import org.apache.inlong.dataproxy.utils.MessageUtils;
+import org.apache.inlong.sdk.commons.protocol.ProxyPackEvent;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
@@ -277,7 +278,8 @@ public class FailoverChannelProcessor
             }
         }
         if (!success) {
-            if (MessageUtils.isSyncSendForOrder(event)) {
+            if (MessageUtils.isSyncSendForOrder(event)
+                    || event instanceof ProxyPackEvent) {
                 throw new MainChannelFullException(errMsg);
             }
             List<Channel> optionalChannels = selector.getOptionalChannels(event);
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigManager.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigManager.java
index ecd212fc09..e37a1e8925 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigManager.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigManager.java
@@ -347,12 +347,13 @@ public class ConfigManager {
                 }
                 httpPost.setEntity(HttpUtils.getEntity(request));
                 // request with post
-                LOG.info("Start to request {} to get config info, with params {}", url, request);
+                LOG.info("Start to request {} to get config info, with params: {}, headers: {}",
+                        url, request, httpPost.getAllHeaders());
                 CloseableHttpResponse response = httpClient.execute(httpPost);
                 String returnStr = EntityUtils.toString(response.getEntity());
                 if (response.getStatusLine().getStatusCode() != 200) {
-                    LOG.warn("Failed to request {}, with params {}, the response is {}",
-                            url, request, returnStr);
+                    LOG.warn("Failed to request {}, with params: {}, headers: {}, the response is {}",
+                            url, request, httpPost.getAllHeaders(), returnStr);
                     return false;
                 }
                 LOG.info("End to request {} to get config info:{}", url, returnStr);
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/StatConstants.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/StatConstants.java
index 45126cc55b..9ffe88ffff 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/StatConstants.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/StatConstants.java
@@ -78,6 +78,9 @@ public class StatConstants {
     public static final java.lang.String EVENT_MSG_V1_POST_SUCCESS = "msg.post.v1.success";
     public static final java.lang.String EVENT_MSG_V1_POST_DROPPED = "msg.post.v1.dropped";
     // sink
+    public static final java.lang.String EVENT_SINK_EVENT_V1_MALFORMED = "sink.event.v1.malformed";
+    public static final java.lang.String EVENT_SINK_EVENT_V1_FILE = "sink.event.v1.file";
+    public static final java.lang.String EVENT_SINK_EVENT_V0_FILE = "sink.event.v1.file";
     public static final java.lang.String EVENT_SINK_CONFIG_TOPIC_MISSING = "sink.topic.missing";
     public static final java.lang.String EVENT_SINK_DEFAULT_TOPIC_MISSING = "default.topic.empty";
     public static final java.lang.String EVENT_SINK_DEFAULT_TOPIC_USED = "default.topic.used";
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneSink.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneSink.java
index aee7d09c8f..6edb9f5bc0 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneSink.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneSink.java
@@ -21,10 +21,14 @@ import org.apache.inlong.common.monitor.LogCounter;
 import org.apache.inlong.dataproxy.config.CommonConfigHolder;
 import org.apache.inlong.dataproxy.config.ConfigManager;
 import org.apache.inlong.dataproxy.config.holder.ConfigUpdateCallback;
+import org.apache.inlong.dataproxy.consts.ConfigConstants;
+import org.apache.inlong.dataproxy.consts.StatConstants;
 import org.apache.inlong.dataproxy.utils.BufferQueue;
+import org.apache.inlong.sdk.commons.protocol.EventConstants;
 import org.apache.inlong.sdk.commons.protocol.ProxyEvent;
 import org.apache.inlong.sdk.commons.protocol.ProxyPackEvent;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.flume.Channel;
 import org.apache.flume.Context;
 import org.apache.flume.Event;
@@ -209,8 +213,33 @@ public class MessageQueueZoneSink extends AbstractSink implements Configurable,
                 tx.commit();
                 return Status.READY;
             }
+            // file event
+            if (StringUtils.isEmpty(event.getHeaders().get(ConfigConstants.MSG_ENCODE_VER))) {
+                String groupId = event.getHeaders().get(EventConstants.INLONG_GROUP_ID);
+                String streamId = event.getHeaders().get(EventConstants.INLONG_STREAM_ID);
+                String msgTimeStr = event.getHeaders().get(EventConstants.HEADER_KEY_MSG_TIME);
+                String sourceIp = event.getHeaders().get(EventConstants.HEADER_KEY_SOURCE_IP);
+                String sourceTimeStr = event.getHeaders().get(EventConstants.HEADER_KEY_SOURCE_TIME);
+                if (groupId != null
+                        && streamId != null
+                        && msgTimeStr != null
+                        && sourceIp != null
+                        && sourceTimeStr != null) {
+                    ProxyEvent proxyEvent = new ProxyEvent(groupId, streamId, msgTimeStr,
+                            sourceIp, sourceTimeStr, event.getHeaders(), event.getBody());
+                    this.dispatchManager.addEvent(proxyEvent);
+                    context.fileMetricIncSumStats(StatConstants.EVENT_SINK_EVENT_V1_FILE);
+                } else {
+                    context.fileMetricIncSumStats(StatConstants.EVENT_SINK_EVENT_V1_MALFORMED);
+                }
+            } else {
+                SimpleEvent simpleEvent = new SimpleEvent();
+                simpleEvent.setBody(event.getBody());
+                simpleEvent.setHeaders(event.getHeaders());
+                this.dispatchManager.addSimpleEvent(simpleEvent);
+                context.fileMetricIncSumStats(StatConstants.EVENT_SINK_EVENT_V0_FILE);
+            }
             tx.commit();
-            this.context.addSendFailMetric();
             return Status.READY;
         } catch (Throwable t) {
             if (logCounter.shouldPrint()) {
diff --git a/inlong-sdk/sdk-common/src/main/java/org/apache/inlong/sdk/commons/protocol/ProxyEvent.java b/inlong-sdk/sdk-common/src/main/java/org/apache/inlong/sdk/commons/protocol/ProxyEvent.java
index f235c40293..c76ab8974c 100644
--- a/inlong-sdk/sdk-common/src/main/java/org/apache/inlong/sdk/commons/protocol/ProxyEvent.java
+++ b/inlong-sdk/sdk-common/src/main/java/org/apache/inlong/sdk/commons/protocol/ProxyEvent.java
@@ -19,6 +19,8 @@ package org.apache.inlong.sdk.commons.protocol;
 
 import org.apache.inlong.sdk.commons.protocol.ProxySdk.MessageObj;
 
+import org.apache.commons.lang3.math.NumberUtils;
+
 import java.util.Map;
 
 /**
@@ -39,11 +41,11 @@ public class ProxyEvent extends SdkEvent {
     /**
      * Constructor
      * 
-     * @param inlongGroupId
-     * @param inlongStreamId
-     * @param body
-     * @param msgTime
-     * @param sourceIp
+     * @param inlongGroupId the group id
+     * @param inlongStreamId the stream id
+     * @param body the body content
+     * @param msgTime the message time
+     * @param sourceIp the source ip
      */
     public ProxyEvent(String inlongGroupId, String inlongStreamId, byte[] body, long msgTime, String sourceIp) {
         this.inlongGroupId = inlongGroupId;
@@ -65,9 +67,9 @@ public class ProxyEvent extends SdkEvent {
     /**
      * Constructor
      * 
-     * @param inlongGroupId
-     * @param inlongStreamId
-     * @param obj
+     * @param inlongGroupId the group id
+     * @param inlongStreamId the stream id
+     * @param obj  the pb message object
      */
     public ProxyEvent(String inlongGroupId, String inlongStreamId, MessageObj obj) {
         this.inlongGroupId = inlongGroupId;
@@ -86,6 +88,29 @@ public class ProxyEvent extends SdkEvent {
         this.getHeaders().put(EventConstants.HEADER_KEY_SOURCE_TIME, String.valueOf(sourceTime));
     }
 
+    /**
+     * ReBuild ProxyEvent object
+     *
+     * @param groupId the group id
+     * @param streamId the stream id
+     * @param msgTimeStr the message time
+     * @param sourceIp the source ip
+     * @param sourceTimeStr the source time
+     * @param headers the rebuild headers, include required headers
+     * @param body the rebuild body
+     */
+    public ProxyEvent(String groupId, String streamId, String msgTimeStr, String sourceIp,
+            String sourceTimeStr, Map<String, String> headers, byte[] body) {
+        this.inlongGroupId = groupId;
+        this.inlongStreamId = streamId;
+        this.sourceIp = sourceIp;
+        this.uid = InlongId.generateUid(this.inlongGroupId, this.inlongStreamId);
+        this.msgTime = NumberUtils.toLong(msgTimeStr, System.currentTimeMillis());
+        this.sourceTime = NumberUtils.toLong(sourceTimeStr, System.currentTimeMillis());
+        super.setBody(body);
+        super.setHeaders(headers);
+    }
+
     /**
      * get sourceTime
      * 

Reply via email to