This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 32442302c [INLONG-6848][DataProxy] Adapt original InlongMsg protocol and headers (#6849)
32442302c is described below
commit 32442302c2dc1845621d78efcb12a5cb3ba14ba0
Author: woofyzhao <[email protected]>
AuthorDate: Tue Dec 13 11:50:34 2022 +0800
[INLONG-6848][DataProxy] Adapt original InlongMsg protocol and headers (#6849)
---
inlong-dataproxy/dataproxy-docker/dataproxy-docker.sh | 17 +++++++++--------
.../inlong/dataproxy/sink/mq/BatchPackManager.java | 14 +++++++++++++-
.../dataproxy/sink/mq/MessageQueueZoneSink.java | 8 ++++++++
.../dataproxy/sink/mq/SimpleBatchPackProfileV0.java | 19 ++++++++++++++++++-
.../inlong/dataproxy/sink/mq/kafka/KafkaHandler.java | 8 ++++++--
.../dataproxy/sink/mq/pulsar/PulsarHandler.java | 8 ++++++--
.../inlong/dataproxy/sink/mq/tube/TubeHandler.java | 6 +++++-
.../inlong/dataproxy/source/ServerMessageHandler.java | 5 ++---
8 files changed, 67 insertions(+), 18 deletions(-)
diff --git a/inlong-dataproxy/dataproxy-docker/dataproxy-docker.sh b/inlong-dataproxy/dataproxy-docker/dataproxy-docker.sh
index 8779c0a6d..3aa477185 100644
--- a/inlong-dataproxy/dataproxy-docker/dataproxy-docker.sh
+++ b/inlong-dataproxy/dataproxy-docker/dataproxy-docker.sh
@@ -23,7 +23,14 @@ local_ip=$(ifconfig $ETH_NAME | grep inet | grep -v inet6 | grep -v "127.0.0.1"
# config
cd "${file_path}/"
common_conf_file=./conf/common.properties
-mq_conf_file=./conf/dataproxy-${MQ_TYPE}.conf
+if [ "${MQ_TYPE}" == "pulsar" ] || [ "${MQ_TYPE}" == "kafka" ]; then
+ mq_conf_file=./conf/dataproxy.conf
+elif [ "${MQ_TYPE}" == "tubemq" ]; then
+ mq_conf_file=./conf/dataproxy-${MQ_TYPE}.conf
+else
+ echo "MQ_TYPE must be one of pulsar/kafka/tubemq !"
+ exit 1
+fi
sed -i "s/manager.hosts=.*$/manager.hosts=${MANAGER_OPENAPI_IP}:${MANAGER_OPENAPI_PORT}/g" "${common_conf_file}"
sed -i "s/audit.enable=.*$/audit.enable=${AUDIT_ENABLE}/g" "${common_conf_file}"
@@ -33,13 +40,7 @@ sed -i "s/proxy.cluster.tag=.*$/proxy.cluster.tag=${CLUSTER_TAG}/g" "${common_co
sed -i "s/proxy.cluster.name=.*$/proxy.cluster.name=${CLUSTER_NAME}/g" "${common_conf_file}"
sed -i "s/proxy.cluster.inCharges=.*$/proxy.cluster.inCharges=${CLUSTER_IN_CHARGES}/g" "${common_conf_file}"
-# start
-if [ "${MQ_TYPE}" = "pulsar" ]; then
- bash +x ./bin/dataproxy-start.sh pulsar
-fi
-if [ "${MQ_TYPE}" = "tubemq" ]; then
- bash +x ./bin/dataproxy-start.sh tubemq
-fi
+ bash +x ./bin/dataproxy-start.sh "${MQ_TYPE}"
sleep 3
# keep alive
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/BatchPackManager.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/BatchPackManager.java
index fc99b3c05..3d110a2bd 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/BatchPackManager.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/BatchPackManager.java
@@ -18,6 +18,7 @@
package org.apache.inlong.dataproxy.sink.mq;
import org.apache.flume.Context;
+import org.apache.flume.event.SimpleEvent;
import org.apache.inlong.dataproxy.utils.BufferQueue;
import org.apache.inlong.sdk.commons.protocol.ProxyEvent;
import org.apache.inlong.sdk.commons.protocol.ProxyPackEvent;
@@ -70,7 +71,6 @@ public class BatchPackManager {
/**
* addEvent
- *
* @param event
*/
public void addEvent(ProxyEvent event) {
@@ -135,6 +135,18 @@ public class BatchPackManager {
}
}
+ /**
+ * Wrap a SimpleEvent in a SimpleBatchPackProfileV0 and put it on the dispatch queue.
+ * @param event the SimpleEvent to dispatch
+ */
+ public void addSimpleEvent(SimpleEvent event) {
+ BatchPackProfile dispatchProfile = SimpleBatchPackProfileV0.create(event);
+ this.dispatchQueue.acquire(dispatchProfile.getSize());
+ this.dispatchQueue.offer(dispatchProfile);
+ outCounter.addAndGet(dispatchProfile.getCount());
+ inCounter.incrementAndGet();
+ }
+
/**
* outputOvertimeData
*
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneSink.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneSink.java
index d261e7dcb..bc4915524 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneSink.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneSink.java
@@ -23,6 +23,7 @@ import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
+import org.apache.flume.event.SimpleEvent;
import org.apache.flume.sink.AbstractSink;
import org.apache.inlong.dataproxy.sink.common.SinkContext;
import org.apache.inlong.dataproxy.utils.BufferQueue;
@@ -149,6 +150,13 @@ public class MessageQueueZoneSink extends AbstractSink implements Configurable {
tx.commit();
return Status.READY;
}
+ // SimpleEvent, send as is
+ if (event instanceof SimpleEvent) {
+ SimpleEvent simpleEvent = (SimpleEvent) event;
+ this.dispatchManager.addSimpleEvent(simpleEvent);
+ tx.commit();
+ return Status.READY;
+ }
tx.commit();
this.context.addSendFailMetric();
return Status.READY;
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/SimpleBatchPackProfileV0.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/SimpleBatchPackProfileV0.java
index f27eef62d..1c7d61704 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/SimpleBatchPackProfileV0.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/SimpleBatchPackProfileV0.java
@@ -17,9 +17,12 @@
package org.apache.inlong.dataproxy.sink.mq;
+import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.math.NumberUtils;
import org.apache.flume.Event;
import org.apache.inlong.common.msg.AttributeConstants;
+import org.apache.inlong.dataproxy.consts.ConfigConstants;
+import org.apache.inlong.dataproxy.utils.MessageUtils;
import org.apache.inlong.sdk.commons.protocol.InlongId;
import java.util.Map;
@@ -31,6 +34,7 @@ import java.util.Map;
public class SimpleBatchPackProfileV0 extends BatchPackProfile {
private Event simpleProfile;
+ private Map<String, String> properties;
/**
* Constructor
@@ -50,7 +54,7 @@ public class SimpleBatchPackProfileV0 extends BatchPackProfile {
*/
public static SimpleBatchPackProfileV0 create(Event event) {
Map<String, String> headers = event.getHeaders();
- String inlongGroupId = headers.get(AttributeConstants.GROUP_ID);;
+ String inlongGroupId = headers.get(AttributeConstants.GROUP_ID);
String inlongStreamId = headers.get(AttributeConstants.STREAM_ID);
String uid = InlongId.generateUid(inlongGroupId, inlongStreamId);
long msgTime = NumberUtils.toLong(headers.get(AttributeConstants.DATA_TIME), System.currentTimeMillis());
@@ -60,6 +64,11 @@ public class SimpleBatchPackProfileV0 extends BatchPackProfile {
profile.setCount(1);
profile.setSize(event.getBody().length);
profile.simpleProfile = event;
+
+ String pkgVersion = event.getHeaders().get(ConfigConstants.MSG_ENCODE_VER);
+ if (StringUtils.isNotBlank(pkgVersion)) {
+ profile.properties = MessageUtils.getXfsAttrs(headers, pkgVersion);
+ }
return profile;
}
@@ -70,4 +79,12 @@ public class SimpleBatchPackProfileV0 extends BatchPackProfile {
public Event getSimpleProfile() {
return simpleProfile;
}
+
+ /**
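+ * Get the properties that create() built from the event headers via MessageUtils.getXfsAttrs.
+ * @return the properties; may be null when the event has no MSG_ENCODE_VER header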
+ */
+ public Map<String, String> getProperties() {
+ return properties;
+ }
}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/kafka/KafkaHandler.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/kafka/KafkaHandler.java
index c0cde55d8..52e9c0816 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/kafka/KafkaHandler.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/kafka/KafkaHandler.java
@@ -17,6 +17,7 @@
package org.apache.inlong.dataproxy.sink.mq.kafka;
+import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.flume.Context;
import org.apache.inlong.common.constant.Constants;
@@ -204,8 +205,11 @@ public class KafkaHandler implements MessageQueueHandler {
private void sendSimpleProfileV0(SimpleBatchPackProfileV0 event,
IdTopicConfig idConfig,
String topic) throws Exception {
// headers
- Map<String, String> headers = event.getSimpleProfile().getHeaders();
- // compress
+ Map<String, String> headers = event.getProperties();
+ if (MapUtils.isEmpty(headers)) {
+ headers = event.getSimpleProfile().getHeaders();
+ }
+ // body
byte[] bodyBytes = event.getSimpleProfile().getBody();
// sendAsync
long sendTime = System.currentTimeMillis();
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/pulsar/PulsarHandler.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/pulsar/PulsarHandler.java
index f34e195ab..105b9502c 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/pulsar/PulsarHandler.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/pulsar/PulsarHandler.java
@@ -17,6 +17,7 @@
package org.apache.inlong.dataproxy.sink.mq.pulsar;
+import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.flume.Context;
import org.apache.inlong.dataproxy.config.pojo.CacheClusterConfig;
@@ -325,8 +326,11 @@ public class PulsarHandler implements MessageQueueHandler {
Producer<byte[]> producer,
String producerTopic) throws Exception {
// headers
- Map<String, String> headers = event.getSimpleProfile().getHeaders();
- // compress
+ Map<String, String> headers = event.getProperties();
+ if (MapUtils.isEmpty(headers)) {
+ headers = event.getSimpleProfile().getHeaders();
+ }
+ // body
byte[] bodyBytes = event.getSimpleProfile().getBody();
// sendAsync
long sendTime = System.currentTimeMillis();
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/tube/TubeHandler.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/tube/TubeHandler.java
index 225c488c8..9bb599c41 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/tube/TubeHandler.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/tube/TubeHandler.java
@@ -17,6 +17,7 @@
package org.apache.inlong.dataproxy.sink.mq.tube;
+import org.apache.commons.collections.MapUtils;
import org.apache.flume.Context;
import org.apache.inlong.dataproxy.config.pojo.CacheClusterConfig;
import org.apache.inlong.dataproxy.config.pojo.IdTopicConfig;
@@ -239,7 +240,10 @@ public class TubeHandler implements MessageQueueHandler {
private void sendSimpleProfileV0(SimpleBatchPackProfileV0 event,
IdTopicConfig idConfig,
String topic) throws Exception {
// headers
- Map<String, String> headers = event.getSimpleProfile().getHeaders();
+ Map<String, String> headers = event.getProperties();
+ if (MapUtils.isEmpty(headers)) {
+ headers = event.getSimpleProfile().getHeaders();
+ }
// compress
byte[] bodyBytes = event.getSimpleProfile().getBody();
// sendAsync
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
index 920268af1..3ab666e77 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
@@ -40,6 +40,7 @@ import org.apache.commons.lang3.tuple.Pair;
import org.apache.flume.ChannelException;
import org.apache.flume.Event;
import org.apache.flume.channel.ChannelProcessor;
+import org.apache.flume.event.EventBuilder;
import org.apache.inlong.common.monitor.MonitorIndex;
import org.apache.inlong.common.monitor.MonitorIndexExt;
import org.apache.inlong.common.msg.AttributeConstants;
@@ -56,7 +57,6 @@ import org.apache.inlong.dataproxy.metrics.audit.AuditUtils;
import org.apache.inlong.dataproxy.utils.DateTimeUtils;
import org.apache.inlong.dataproxy.utils.InLongMsgVer;
import org.apache.inlong.dataproxy.utils.MessageUtils;
-import org.apache.inlong.sdk.commons.protocol.ProxyEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -556,8 +556,7 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter {
strBuff.delete(0, strBuff.length());
}
final byte[] data = inLongMsg.buildArray();
- Event event = new ProxyEvent(groupId, streamIdEntry.getKey(), data,
- Long.parseLong(strDataTime), strRemoteIP);
+ Event event = EventBuilder.withBody(data, headers);
event.getHeaders().putAll(headers);
inLongMsg.reset();
Pair<Boolean, String> evenProcType =