This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 32442302c [INLONG-6848][DataProxy] Adapt original InlongMsg protocol and headers (#6849)
32442302c is described below

commit 32442302c2dc1845621d78efcb12a5cb3ba14ba0
Author: woofyzhao <[email protected]>
AuthorDate: Tue Dec 13 11:50:34 2022 +0800

    [INLONG-6848][DataProxy] Adapt original InlongMsg protocol and headers (#6849)
---
 inlong-dataproxy/dataproxy-docker/dataproxy-docker.sh | 17 +++++++++--------
 .../inlong/dataproxy/sink/mq/BatchPackManager.java    | 14 +++++++++++++-
 .../dataproxy/sink/mq/MessageQueueZoneSink.java       |  8 ++++++++
 .../dataproxy/sink/mq/SimpleBatchPackProfileV0.java   | 19 ++++++++++++++++++-
 .../inlong/dataproxy/sink/mq/kafka/KafkaHandler.java  |  8 ++++++--
 .../dataproxy/sink/mq/pulsar/PulsarHandler.java       |  8 ++++++--
 .../inlong/dataproxy/sink/mq/tube/TubeHandler.java    |  6 +++++-
 .../inlong/dataproxy/source/ServerMessageHandler.java |  5 ++---
 8 files changed, 67 insertions(+), 18 deletions(-)

diff --git a/inlong-dataproxy/dataproxy-docker/dataproxy-docker.sh 
b/inlong-dataproxy/dataproxy-docker/dataproxy-docker.sh
index 8779c0a6d..3aa477185 100644
--- a/inlong-dataproxy/dataproxy-docker/dataproxy-docker.sh
+++ b/inlong-dataproxy/dataproxy-docker/dataproxy-docker.sh
@@ -23,7 +23,14 @@ local_ip=$(ifconfig $ETH_NAME | grep inet | grep -v inet6 | 
grep -v "127.0.0.1"
 # config
 cd "${file_path}/"
 common_conf_file=./conf/common.properties
-mq_conf_file=./conf/dataproxy-${MQ_TYPE}.conf
+if [ "${MQ_TYPE}" == "pulsar" ] || [ "${MQ_TYPE}" == "kafka" ]; then
+  mq_conf_file=./conf/dataproxy.conf
+elif [ "${MQ_TYPE}" == "tubemq" ]; then
+  mq_conf_file=./conf/dataproxy-${MQ_TYPE}.conf
+else
+  echo "MQ_TYPE must be one of pulsar/kafka/tubemq !"
+  exit 1
+fi
 
 sed -i 
"s/manager.hosts=.*$/manager.hosts=${MANAGER_OPENAPI_IP}:${MANAGER_OPENAPI_PORT}/g"
 "${common_conf_file}"
 sed -i "s/audit.enable=.*$/audit.enable=${AUDIT_ENABLE}/g" 
"${common_conf_file}"
@@ -33,13 +40,7 @@ sed -i 
"s/proxy.cluster.tag=.*$/proxy.cluster.tag=${CLUSTER_TAG}/g" "${common_co
 sed -i "s/proxy.cluster.name=.*$/proxy.cluster.name=${CLUSTER_NAME}/g" 
"${common_conf_file}"
 sed -i 
"s/proxy.cluster.inCharges=.*$/proxy.cluster.inCharges=${CLUSTER_IN_CHARGES}/g" 
"${common_conf_file}"
 
-# start
-if [ "${MQ_TYPE}" = "pulsar" ]; then
-  bash +x ./bin/dataproxy-start.sh pulsar
-fi
-if [ "${MQ_TYPE}" = "tubemq" ]; then
-  bash +x ./bin/dataproxy-start.sh tubemq
-fi
+ bash +x ./bin/dataproxy-start.sh "${MQ_TYPE}"
 
 sleep 3
 # keep alive
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/BatchPackManager.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/BatchPackManager.java
index fc99b3c05..3d110a2bd 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/BatchPackManager.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/BatchPackManager.java
@@ -18,6 +18,7 @@
 package org.apache.inlong.dataproxy.sink.mq;
 
 import org.apache.flume.Context;
+import org.apache.flume.event.SimpleEvent;
 import org.apache.inlong.dataproxy.utils.BufferQueue;
 import org.apache.inlong.sdk.commons.protocol.ProxyEvent;
 import org.apache.inlong.sdk.commons.protocol.ProxyPackEvent;
@@ -70,7 +71,6 @@ public class BatchPackManager {
 
     /**
      * addEvent
-     * 
      * @param event
      */
     public void addEvent(ProxyEvent event) {
@@ -135,6 +135,18 @@ public class BatchPackManager {
         }
     }
 
+    /**
+     * addSimpleEvent
+     * @param event
+     */
+    public void addSimpleEvent(SimpleEvent event) {
+        BatchPackProfile dispatchProfile = 
SimpleBatchPackProfileV0.create(event);
+        this.dispatchQueue.acquire(dispatchProfile.getSize());
+        this.dispatchQueue.offer(dispatchProfile);
+        outCounter.addAndGet(dispatchProfile.getCount());
+        inCounter.incrementAndGet();
+    }
+
     /**
      * outputOvertimeData
      * 
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneSink.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneSink.java
index d261e7dcb..bc4915524 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneSink.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneSink.java
@@ -23,6 +23,7 @@ import org.apache.flume.Event;
 import org.apache.flume.EventDeliveryException;
 import org.apache.flume.Transaction;
 import org.apache.flume.conf.Configurable;
+import org.apache.flume.event.SimpleEvent;
 import org.apache.flume.sink.AbstractSink;
 import org.apache.inlong.dataproxy.sink.common.SinkContext;
 import org.apache.inlong.dataproxy.utils.BufferQueue;
@@ -149,6 +150,13 @@ public class MessageQueueZoneSink extends AbstractSink 
implements Configurable {
                 tx.commit();
                 return Status.READY;
             }
+            // SimpleEvent, send as is
+            if (event instanceof SimpleEvent) {
+                SimpleEvent simpleEvent = (SimpleEvent) event;
+                this.dispatchManager.addSimpleEvent(simpleEvent);
+                tx.commit();
+                return Status.READY;
+            }
             tx.commit();
             this.context.addSendFailMetric();
             return Status.READY;
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/SimpleBatchPackProfileV0.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/SimpleBatchPackProfileV0.java
index f27eef62d..1c7d61704 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/SimpleBatchPackProfileV0.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/SimpleBatchPackProfileV0.java
@@ -17,9 +17,12 @@
 
 package org.apache.inlong.dataproxy.sink.mq;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.math.NumberUtils;
 import org.apache.flume.Event;
 import org.apache.inlong.common.msg.AttributeConstants;
+import org.apache.inlong.dataproxy.consts.ConfigConstants;
+import org.apache.inlong.dataproxy.utils.MessageUtils;
 import org.apache.inlong.sdk.commons.protocol.InlongId;
 
 import java.util.Map;
@@ -31,6 +34,7 @@ import java.util.Map;
 public class SimpleBatchPackProfileV0 extends BatchPackProfile {
 
     private Event simpleProfile;
+    private Map<String, String> properties;
 
     /**
      * Constructor
@@ -50,7 +54,7 @@ public class SimpleBatchPackProfileV0 extends 
BatchPackProfile {
      */
     public static SimpleBatchPackProfileV0 create(Event event) {
         Map<String, String> headers = event.getHeaders();
-        String inlongGroupId = headers.get(AttributeConstants.GROUP_ID);;
+        String inlongGroupId = headers.get(AttributeConstants.GROUP_ID);
         String inlongStreamId = headers.get(AttributeConstants.STREAM_ID);
         String uid = InlongId.generateUid(inlongGroupId, inlongStreamId);
         long msgTime = 
NumberUtils.toLong(headers.get(AttributeConstants.DATA_TIME), 
System.currentTimeMillis());
@@ -60,6 +64,11 @@ public class SimpleBatchPackProfileV0 extends 
BatchPackProfile {
         profile.setCount(1);
         profile.setSize(event.getBody().length);
         profile.simpleProfile = event;
+
+        String pkgVersion = 
event.getHeaders().get(ConfigConstants.MSG_ENCODE_VER);
+        if (StringUtils.isNotBlank(pkgVersion)) {
+            profile.properties = MessageUtils.getXfsAttrs(headers, pkgVersion);
+        }
         return profile;
     }
 
@@ -70,4 +79,12 @@ public class SimpleBatchPackProfileV0 extends 
BatchPackProfile {
     public Event getSimpleProfile() {
         return simpleProfile;
     }
+
+    /**
+     * get properties
+     * @return the properties
+     */
+    public Map<String, String> getProperties() {
+        return properties;
+    }
 }
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/kafka/KafkaHandler.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/kafka/KafkaHandler.java
index c0cde55d8..52e9c0816 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/kafka/KafkaHandler.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/kafka/KafkaHandler.java
@@ -17,6 +17,7 @@
 
 package org.apache.inlong.dataproxy.sink.mq.kafka;
 
+import org.apache.commons.collections.MapUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.flume.Context;
 import org.apache.inlong.common.constant.Constants;
@@ -204,8 +205,11 @@ public class KafkaHandler implements MessageQueueHandler {
     private void sendSimpleProfileV0(SimpleBatchPackProfileV0 event, 
IdTopicConfig idConfig,
             String topic) throws Exception {
         // headers
-        Map<String, String> headers = event.getSimpleProfile().getHeaders();
-        // compress
+        Map<String, String> headers = event.getProperties();
+        if (MapUtils.isEmpty(headers)) {
+            headers = event.getSimpleProfile().getHeaders();
+        }
+        // body
         byte[] bodyBytes = event.getSimpleProfile().getBody();
         // sendAsync
         long sendTime = System.currentTimeMillis();
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/pulsar/PulsarHandler.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/pulsar/PulsarHandler.java
index f34e195ab..105b9502c 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/pulsar/PulsarHandler.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/pulsar/PulsarHandler.java
@@ -17,6 +17,7 @@
 
 package org.apache.inlong.dataproxy.sink.mq.pulsar;
 
+import org.apache.commons.collections.MapUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.flume.Context;
 import org.apache.inlong.dataproxy.config.pojo.CacheClusterConfig;
@@ -325,8 +326,11 @@ public class PulsarHandler implements MessageQueueHandler {
             Producer<byte[]> producer,
             String producerTopic) throws Exception {
         // headers
-        Map<String, String> headers = event.getSimpleProfile().getHeaders();
-        // compress
+        Map<String, String> headers = event.getProperties();
+        if (MapUtils.isEmpty(headers)) {
+            headers = event.getSimpleProfile().getHeaders();
+        }
+        // body
         byte[] bodyBytes = event.getSimpleProfile().getBody();
         // sendAsync
         long sendTime = System.currentTimeMillis();
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/tube/TubeHandler.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/tube/TubeHandler.java
index 225c488c8..9bb599c41 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/tube/TubeHandler.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/tube/TubeHandler.java
@@ -17,6 +17,7 @@
 
 package org.apache.inlong.dataproxy.sink.mq.tube;
 
+import org.apache.commons.collections.MapUtils;
 import org.apache.flume.Context;
 import org.apache.inlong.dataproxy.config.pojo.CacheClusterConfig;
 import org.apache.inlong.dataproxy.config.pojo.IdTopicConfig;
@@ -239,7 +240,10 @@ public class TubeHandler implements MessageQueueHandler {
     private void sendSimpleProfileV0(SimpleBatchPackProfileV0 event, 
IdTopicConfig idConfig,
             String topic) throws Exception {
         // headers
-        Map<String, String> headers = event.getSimpleProfile().getHeaders();
+        Map<String, String> headers = event.getProperties();
+        if (MapUtils.isEmpty(headers)) {
+            headers = event.getSimpleProfile().getHeaders();
+        }
         // compress
         byte[] bodyBytes = event.getSimpleProfile().getBody();
         // sendAsync
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
index 920268af1..3ab666e77 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
@@ -40,6 +40,7 @@ import org.apache.commons.lang3.tuple.Pair;
 import org.apache.flume.ChannelException;
 import org.apache.flume.Event;
 import org.apache.flume.channel.ChannelProcessor;
+import org.apache.flume.event.EventBuilder;
 import org.apache.inlong.common.monitor.MonitorIndex;
 import org.apache.inlong.common.monitor.MonitorIndexExt;
 import org.apache.inlong.common.msg.AttributeConstants;
@@ -56,7 +57,6 @@ import org.apache.inlong.dataproxy.metrics.audit.AuditUtils;
 import org.apache.inlong.dataproxy.utils.DateTimeUtils;
 import org.apache.inlong.dataproxy.utils.InLongMsgVer;
 import org.apache.inlong.dataproxy.utils.MessageUtils;
-import org.apache.inlong.sdk.commons.protocol.ProxyEvent;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -556,8 +556,7 @@ public class ServerMessageHandler extends 
ChannelInboundHandlerAdapter {
                     strBuff.delete(0, strBuff.length());
                 }
                 final byte[] data = inLongMsg.buildArray();
-                Event event = new ProxyEvent(groupId, streamIdEntry.getKey(), 
data,
-                        Long.parseLong(strDataTime), strRemoteIP);
+                Event event = EventBuilder.withBody(data, headers);
                 event.getHeaders().putAll(headers);
                 inLongMsg.reset();
                 Pair<Boolean, String> evenProcType =

Reply via email to