This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new ec135b1aae [INLONG-8311][DataProxy] Add event handling support for
FlumeEvent type (#8312)
ec135b1aae is described below
commit ec135b1aae982020626ee3f8a02c7641f4a6c08a
Author: Goson Zhang <[email protected]>
AuthorDate: Sun Jun 25 18:24:42 2023 +0800
[INLONG-8311][DataProxy] Add event handling support for FlumeEvent type
(#8312)
---
.../channel/FailoverChannelProcessor.java | 4 ++-
.../inlong/dataproxy/config/ConfigManager.java | 7 ++--
.../inlong/dataproxy/consts/StatConstants.java | 3 ++
.../dataproxy/sink/mq/MessageQueueZoneSink.java | 31 +++++++++++++++-
.../inlong/sdk/commons/protocol/ProxyEvent.java | 41 +++++++++++++++++-----
5 files changed, 73 insertions(+), 13 deletions(-)
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/channel/FailoverChannelProcessor.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/channel/FailoverChannelProcessor.java
index 2162019317..ce60078b67 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/channel/FailoverChannelProcessor.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/channel/FailoverChannelProcessor.java
@@ -21,6 +21,7 @@ import org.apache.inlong.common.monitor.LogCounter;
import org.apache.inlong.dataproxy.consts.ConfigConstants;
import org.apache.inlong.dataproxy.exception.MainChannelFullException;
import org.apache.inlong.dataproxy.utils.MessageUtils;
+import org.apache.inlong.sdk.commons.protocol.ProxyPackEvent;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
@@ -277,7 +278,8 @@ public class FailoverChannelProcessor
}
}
if (!success) {
- if (MessageUtils.isSyncSendForOrder(event)) {
+ if (MessageUtils.isSyncSendForOrder(event)
+ || event instanceof ProxyPackEvent) {
throw new MainChannelFullException(errMsg);
}
List<Channel> optionalChannels =
selector.getOptionalChannels(event);
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigManager.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigManager.java
index ecd212fc09..e37a1e8925 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigManager.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigManager.java
@@ -347,12 +347,13 @@ public class ConfigManager {
}
httpPost.setEntity(HttpUtils.getEntity(request));
// request with post
- LOG.info("Start to request {} to get config info, with params
{}", url, request);
+ LOG.info("Start to request {} to get config info, with params:
{}, headers: {}",
+ url, request, httpPost.getAllHeaders());
CloseableHttpResponse response = httpClient.execute(httpPost);
String returnStr = EntityUtils.toString(response.getEntity());
if (response.getStatusLine().getStatusCode() != 200) {
- LOG.warn("Failed to request {}, with params {}, the
response is {}",
- url, request, returnStr);
+ LOG.warn("Failed to request {}, with params: {}, headers:
{}, the response is {}",
+ url, request, httpPost.getAllHeaders(), returnStr);
return false;
}
LOG.info("End to request {} to get config info:{}", url,
returnStr);
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/StatConstants.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/StatConstants.java
index 45126cc55b..9ffe88ffff 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/StatConstants.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/StatConstants.java
@@ -78,6 +78,9 @@ public class StatConstants {
public static final java.lang.String EVENT_MSG_V1_POST_SUCCESS =
"msg.post.v1.success";
public static final java.lang.String EVENT_MSG_V1_POST_DROPPED =
"msg.post.v1.dropped";
// sink
+ public static final java.lang.String EVENT_SINK_EVENT_V1_MALFORMED =
"sink.event.v1.malformed";
+ public static final java.lang.String EVENT_SINK_EVENT_V1_FILE =
"sink.event.v1.file";
+ public static final java.lang.String EVENT_SINK_EVENT_V0_FILE =
"sink.event.v1.file";
public static final java.lang.String EVENT_SINK_CONFIG_TOPIC_MISSING =
"sink.topic.missing";
public static final java.lang.String EVENT_SINK_DEFAULT_TOPIC_MISSING =
"default.topic.empty";
public static final java.lang.String EVENT_SINK_DEFAULT_TOPIC_USED =
"default.topic.used";
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneSink.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneSink.java
index aee7d09c8f..6edb9f5bc0 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneSink.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneSink.java
@@ -21,10 +21,14 @@ import org.apache.inlong.common.monitor.LogCounter;
import org.apache.inlong.dataproxy.config.CommonConfigHolder;
import org.apache.inlong.dataproxy.config.ConfigManager;
import org.apache.inlong.dataproxy.config.holder.ConfigUpdateCallback;
+import org.apache.inlong.dataproxy.consts.ConfigConstants;
+import org.apache.inlong.dataproxy.consts.StatConstants;
import org.apache.inlong.dataproxy.utils.BufferQueue;
+import org.apache.inlong.sdk.commons.protocol.EventConstants;
import org.apache.inlong.sdk.commons.protocol.ProxyEvent;
import org.apache.inlong.sdk.commons.protocol.ProxyPackEvent;
+import org.apache.commons.lang3.StringUtils;
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
@@ -209,8 +213,33 @@ public class MessageQueueZoneSink extends AbstractSink
implements Configurable,
tx.commit();
return Status.READY;
}
+ // file event
+ if
(StringUtils.isEmpty(event.getHeaders().get(ConfigConstants.MSG_ENCODE_VER))) {
+ String groupId =
event.getHeaders().get(EventConstants.INLONG_GROUP_ID);
+ String streamId =
event.getHeaders().get(EventConstants.INLONG_STREAM_ID);
+ String msgTimeStr =
event.getHeaders().get(EventConstants.HEADER_KEY_MSG_TIME);
+ String sourceIp =
event.getHeaders().get(EventConstants.HEADER_KEY_SOURCE_IP);
+ String sourceTimeStr =
event.getHeaders().get(EventConstants.HEADER_KEY_SOURCE_TIME);
+ if (groupId != null
+ && streamId != null
+ && msgTimeStr != null
+ && sourceIp != null
+ && sourceTimeStr != null) {
+ ProxyEvent proxyEvent = new ProxyEvent(groupId, streamId,
msgTimeStr,
+ sourceIp, sourceTimeStr, event.getHeaders(),
event.getBody());
+ this.dispatchManager.addEvent(proxyEvent);
+
context.fileMetricIncSumStats(StatConstants.EVENT_SINK_EVENT_V1_FILE);
+ } else {
+
context.fileMetricIncSumStats(StatConstants.EVENT_SINK_EVENT_V1_MALFORMED);
+ }
+ } else {
+ SimpleEvent simpleEvent = new SimpleEvent();
+ simpleEvent.setBody(event.getBody());
+ simpleEvent.setHeaders(event.getHeaders());
+ this.dispatchManager.addSimpleEvent(simpleEvent);
+
context.fileMetricIncSumStats(StatConstants.EVENT_SINK_EVENT_V0_FILE);
+ }
tx.commit();
- this.context.addSendFailMetric();
return Status.READY;
} catch (Throwable t) {
if (logCounter.shouldPrint()) {
diff --git
a/inlong-sdk/sdk-common/src/main/java/org/apache/inlong/sdk/commons/protocol/ProxyEvent.java
b/inlong-sdk/sdk-common/src/main/java/org/apache/inlong/sdk/commons/protocol/ProxyEvent.java
index f235c40293..c76ab8974c 100644
---
a/inlong-sdk/sdk-common/src/main/java/org/apache/inlong/sdk/commons/protocol/ProxyEvent.java
+++
b/inlong-sdk/sdk-common/src/main/java/org/apache/inlong/sdk/commons/protocol/ProxyEvent.java
@@ -19,6 +19,8 @@ package org.apache.inlong.sdk.commons.protocol;
import org.apache.inlong.sdk.commons.protocol.ProxySdk.MessageObj;
+import org.apache.commons.lang3.math.NumberUtils;
+
import java.util.Map;
/**
@@ -39,11 +41,11 @@ public class ProxyEvent extends SdkEvent {
/**
* Constructor
*
- * @param inlongGroupId
- * @param inlongStreamId
- * @param body
- * @param msgTime
- * @param sourceIp
+ * @param inlongGroupId the group id
+ * @param inlongStreamId the stream id
+ * @param body the body content
+ * @param msgTime the message time
+ * @param sourceIp the source ip
*/
public ProxyEvent(String inlongGroupId, String inlongStreamId, byte[]
body, long msgTime, String sourceIp) {
this.inlongGroupId = inlongGroupId;
@@ -65,9 +67,9 @@ public class ProxyEvent extends SdkEvent {
/**
* Constructor
*
- * @param inlongGroupId
- * @param inlongStreamId
- * @param obj
+ * @param inlongGroupId the group id
+ * @param inlongStreamId the stream id
+ * @param obj the pb message object
*/
public ProxyEvent(String inlongGroupId, String inlongStreamId, MessageObj
obj) {
this.inlongGroupId = inlongGroupId;
@@ -86,6 +88,29 @@ public class ProxyEvent extends SdkEvent {
this.getHeaders().put(EventConstants.HEADER_KEY_SOURCE_TIME,
String.valueOf(sourceTime));
}
+ /**
+ * ReBuild ProxyEvent object
+ *
+ * @param groupId the group id
+ * @param streamId the stream id
+ * @param msgTimeStr the message time
+ * @param sourceIp the source ip
+ * @param sourceTimeStr the source time
+ * @param headers the rebuild headers, include required headers
+ * @param body the rebuild body
+ */
+ public ProxyEvent(String groupId, String streamId, String msgTimeStr,
String sourceIp,
+ String sourceTimeStr, Map<String, String> headers, byte[] body) {
+ this.inlongGroupId = groupId;
+ this.inlongStreamId = streamId;
+ this.sourceIp = sourceIp;
+ this.uid = InlongId.generateUid(this.inlongGroupId,
this.inlongStreamId);
+ this.msgTime = NumberUtils.toLong(msgTimeStr,
System.currentTimeMillis());
+ this.sourceTime = NumberUtils.toLong(sourceTimeStr,
System.currentTimeMillis());
+ super.setBody(body);
+ super.setHeaders(headers);
+ }
+
/**
* get sourceTime
*