This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 59c83c226 [INLONG-6890][DataProxy] Fix tubemq MQ_TYPE enum and optimize sink code and config file (#6891)
59c83c226 is described below

commit 59c83c226a5c9cfe81282b5ddb037a29282a7a7e
Author: woofyzhao <[email protected]>
AuthorDate: Thu Dec 15 10:03:39 2022 +0800

    [INLONG-6890][DataProxy] Fix tubemq MQ_TYPE enum and optimize sink code and config file (#6891)
---
 inlong-dataproxy/conf/dataproxy-tubemq.conf        | 30 ++++++++++------
 inlong-dataproxy/conf/dataproxy.conf               | 30 ++++++++--------
 .../inlong/dataproxy/sink/mq/tube/TubeHandler.java | 42 +++++++++++-----------
 .../main/resources/mappers/ClusterSetMapper.xml    |  2 +-
 .../pojo/cluster/tubemq/TubeClusterDTO.java        |  4 +++
 .../service/cluster/TubeClusterOperator.java       |  1 +
 6 files changed, 63 insertions(+), 46 deletions(-)

diff --git a/inlong-dataproxy/conf/dataproxy-tubemq.conf 
b/inlong-dataproxy/conf/dataproxy-tubemq.conf
index f9b5e83d3..67f2e7fcf 100644
--- a/inlong-dataproxy/conf/dataproxy-tubemq.conf
+++ b/inlong-dataproxy/conf/dataproxy-tubemq.conf
@@ -152,31 +152,41 @@ agent1.channels.ch-msg10.fsyncPerTransaction = false
 agent1.channels.ch-msg10.fsyncInterval = 5
 
 agent1.sinks.meta-sink-msg1.channel = ch-msg1
-agent1.sinks.meta-sink-msg1.type =org.apache.inlong.dataproxy.sink.TubeSink
+agent1.sinks.meta-sink-msg1.type = 
org.apache.inlong.dataproxy.sink.mq.MessageQueueZoneSink
+agent1.sinks.meta-sink-msg1.maxThreads = 1
 
 agent1.sinks.meta-sink-msg2.channel = ch-msg2
-agent1.sinks.meta-sink-msg2.type =org.apache.inlong.dataproxy.sink.TubeSink
+agent1.sinks.meta-sink-msg2.type = 
org.apache.inlong.dataproxy.sink.mq.MessageQueueZoneSink
+agent1.sinks.meta-sink-msg2.maxThreads = 1
 
 agent1.sinks.meta-sink-msg3.channel = ch-msg3
-agent1.sinks.meta-sink-msg3.type =org.apache.inlong.dataproxy.sink.TubeSink
+agent1.sinks.meta-sink-msg3.type = 
org.apache.inlong.dataproxy.sink.mq.MessageQueueZoneSink
+agent1.sinks.meta-sink-msg3.maxThreads = 1
 
 agent1.sinks.meta-sink-msg5.channel = ch-msg5
-agent1.sinks.meta-sink-msg5.type =org.apache.inlong.dataproxy.sink.TubeSink
+agent1.sinks.meta-sink-msg5.type = 
org.apache.inlong.dataproxy.sink.mq.MessageQueueZoneSink
+agent1.sinks.meta-sink-msg5.maxThreads = 1
 
 agent1.sinks.meta-sink-msg6.channel = ch-msg6
-agent1.sinks.meta-sink-msg6.type =org.apache.inlong.dataproxy.sink.TubeSink
+agent1.sinks.meta-sink-msg6.type = 
org.apache.inlong.dataproxy.sink.mq.MessageQueueZoneSink
+agent1.sinks.meta-sink-msg6.maxThreads = 1
 
 agent1.sinks.meta-sink-msg7.channel = ch-msg7
-agent1.sinks.meta-sink-msg7.type =org.apache.inlong.dataproxy.sink.TubeSink
+agent1.sinks.meta-sink-msg7.type = 
org.apache.inlong.dataproxy.sink.mq.MessageQueueZoneSink
+agent1.sinks.meta-sink-msg7.maxThreads = 1
 
 agent1.sinks.meta-sink-msg8.channel = ch-msg8
-agent1.sinks.meta-sink-msg8.type =org.apache.inlong.dataproxy.sink.TubeSink
+agent1.sinks.meta-sink-msg8.type = 
org.apache.inlong.dataproxy.sink.mq.MessageQueueZoneSink
+agent1.sinks.meta-sink-msg8.maxThreads = 1
 
 agent1.sinks.meta-sink-msg9.channel = ch-msg9
-agent1.sinks.meta-sink-msg9.type =org.apache.inlong.dataproxy.sink.TubeSink
+agent1.sinks.meta-sink-msg9.type = 
org.apache.inlong.dataproxy.sink.mq.MessageQueueZoneSink
+agent1.sinks.meta-sink-msg9.maxThreads = 1
 
 agent1.sinks.meta-sink-msg10.channel = ch-msg10
-agent1.sinks.meta-sink-msg10.type =org.apache.inlong.dataproxy.sink.TubeSink
+agent1.sinks.meta-sink-msg10.type = 
org.apache.inlong.dataproxy.sink.mq.MessageQueueZoneSink
+agent1.sinks.meta-sink-msg10.maxThreads = 1
 
 agent1.sinks.meta-sink-back.channel = ch-back
-agent1.sinks.meta-sink-back.type =org.apache.inlong.dataproxy.sink.TubeSink
+agent1.sinks.meta-sink-back.type = 
org.apache.inlong.dataproxy.sink.mq.MessageQueueZoneSink
+agent1.sinks.meta-sink-back.maxThreads = 1
diff --git a/inlong-dataproxy/conf/dataproxy.conf 
b/inlong-dataproxy/conf/dataproxy.conf
index 628d71ec4..d815193e3 100644
--- a/inlong-dataproxy/conf/dataproxy.conf
+++ b/inlong-dataproxy/conf/dataproxy.conf
@@ -115,23 +115,23 @@ agent1.channels.ch-msg6.dataDirs = 
./data/file/ch-msg6/data
 agent1.channels.ch-msg6.fsyncPerTransaction = false
 agent1.channels.ch-msg6.fsyncInterval = 10
 
-agent1.sinks.pulsar-sink-msg1.channel = ch-msg1
-agent1.sinks.pulsar-sink-msg1.type = 
org.apache.inlong.dataproxy.sink.mq.MessageQueueZoneSink
-agent1.sinks.pulsar-sink-msg1.maxThreads=1
+agent1.sinks.mq-sink-msg1.channel = ch-msg1
+agent1.sinks.mq-sink-msg1.type = 
org.apache.inlong.dataproxy.sink.mq.MessageQueueZoneSink
+agent1.sinks.mq-sink-msg1.maxThreads = 1
 
-agent1.sinks.pulsar-sink-msg2.channel = ch-msg2
-agent1.sinks.pulsar-sink-msg2.type = 
org.apache.inlong.dataproxy.sink.mq.MessageQueueZoneSink
-agent1.sinks.pulsar-sink-msg1.maxThreads=1
+agent1.sinks.mq-sink-msg2.channel = ch-msg2
+agent1.sinks.mq-sink-msg2.type = 
org.apache.inlong.dataproxy.sink.mq.MessageQueueZoneSink
+agent1.sinks.mq-sink-msg2.maxThreads = 1
 
 # For order message
-agent1.sinks.pulsar-sink-msg3.channel = ch-msg3
-agent1.sinks.pulsar-sink-msg3.type = 
org.apache.inlong.dataproxy.sink.mq.MessageQueueZoneSink
-agent1.sinks.pulsar-sink-msg1.maxThreads=1
+agent1.sinks.mq-sink-msg3.channel = ch-msg3
+agent1.sinks.mq-sink-msg3.type = 
org.apache.inlong.dataproxy.sink.mq.MessageQueueZoneSink
+agent1.sinks.mq-sink-msg3.maxThreads = 1
 
-agent1.sinks.pulsar-sink-msg5.channel = ch-msg5
-agent1.sinks.pulsar-sink-msg5.type = 
org.apache.inlong.dataproxy.sink.mq.MessageQueueZoneSink
-agent1.sinks.pulsar-sink-msg1.maxThreads=1
+agent1.sinks.mq-sink-msg5.channel = ch-msg5
+agent1.sinks.mq-sink-msg5.type = 
org.apache.inlong.dataproxy.sink.mq.MessageQueueZoneSink
+agent1.sinks.mq-sink-msg5.maxThreads = 1
 
-agent1.sinks.pulsar-sink-msg6.channel = ch-msg6
-agent1.sinks.pulsar-sink-msg6.type = 
org.apache.inlong.dataproxy.sink.mq.MessageQueueZoneSink
-agent1.sinks.pulsar-sink-msg1.maxThreads=1
+agent1.sinks.mq-sink-msg6.channel = ch-msg6
+agent1.sinks.mq-sink-msg6.type = 
org.apache.inlong.dataproxy.sink.mq.MessageQueueZoneSink
+agent1.sinks.mq-sink-msg6.maxThreads = 1
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/tube/TubeHandler.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/tube/TubeHandler.java
index 1675e5ae5..8d52abcaf 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/tube/TubeHandler.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/tube/TubeHandler.java
@@ -17,12 +17,13 @@
 
 package org.apache.inlong.dataproxy.sink.mq.tube;
 
-import org.apache.commons.collections.MapUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.flume.Context;
 import org.apache.inlong.dataproxy.config.pojo.CacheClusterConfig;
 import org.apache.inlong.dataproxy.config.pojo.IdTopicConfig;
 import org.apache.inlong.dataproxy.consts.ConfigConstants;
 import org.apache.inlong.dataproxy.sink.common.EventHandler;
+import org.apache.inlong.dataproxy.sink.common.TubeUtils;
 import org.apache.inlong.dataproxy.sink.mq.BatchPackProfile;
 import org.apache.inlong.dataproxy.sink.mq.MessageQueueHandler;
 import org.apache.inlong.dataproxy.sink.mq.MessageQueueZoneSinkContext;
@@ -49,6 +50,7 @@ public class TubeHandler implements MessageQueueHandler {
 
     public static final Logger LOG = 
LoggerFactory.getLogger(TubeHandler.class);
     private static String MASTER_HOST_PORT_LIST = "master-host-port-list";
+    public static final String KEY_NAMESPACE = "namespace";
 
     private CacheClusterConfig config;
     private MessageQueueZoneSinkContext sinkContext;
@@ -151,6 +153,16 @@ public class TubeHandler implements MessageQueueHandler {
         LOG.info("tube handler stopped");
     }
 
+    private String getTubeTopic(IdTopicConfig idConfig) {
+        // consider first using group mq resource as tube topic, for example, 
group id
+        String topic = idConfig.getParams().get(KEY_NAMESPACE);
+        if (StringUtils.isBlank(topic)) {
+            // use whatever user specifies in stream mq resource
+            topic = idConfig.getTopicName();
+        }
+        return topic;
+    }
+
     /**
      * send
      */
@@ -163,7 +175,7 @@ public class TubeHandler implements MessageQueueHandler {
                 sinkContext.getDispatchQueue().release(event.getSize());
                 return false;
             }
-            String topic = idConfig.getTopicName();
+            String topic = getTubeTopic(idConfig);
             if (topic == null) {
                 sinkContext.addSendResultMetric(event, event.getUid(), false, 
0);
                 sinkContext.getDispatchQueue().release(event.getSize());
@@ -171,16 +183,17 @@ public class TubeHandler implements MessageQueueHandler {
             }
             // metric
             sinkContext.addSendMetric(event, topic);
-            // publish
-            if (!this.topicSet.contains(topic)) {
-                this.producer.publish(topic);
-                this.topicSet.add(topic);
-            }
             // create producer failed
             if (producer == null) {
+                LOG.error("producer is null");
                 sinkContext.processSendFail(event, topic, 0);
                 return false;
             }
+            // publish
+            if (!this.topicSet.contains(topic)) {
+                this.producer.publish(topic);
+                this.topicSet.add(topic);
+            }
             // send
             if (event instanceof SimpleBatchPackProfileV0) {
                 this.sendSimpleProfileV0((SimpleBatchPackProfileV0) event, 
idConfig, topic);
@@ -239,19 +252,8 @@ public class TubeHandler implements MessageQueueHandler {
      */
     private void sendSimpleProfileV0(SimpleBatchPackProfileV0 event, 
IdTopicConfig idConfig,
             String topic) throws Exception {
-        // headers
-        Map<String, String> headers = event.getProperties();
-        if (MapUtils.isEmpty(headers)) {
-            headers = event.getSimpleProfile().getHeaders();
-        }
-        // compress
-        byte[] bodyBytes = event.getSimpleProfile().getBody();
-        // sendAsync
-        Message message = new Message(topic, bodyBytes);
-        // add headers
-        headers.forEach((key, value) -> {
-            message.setAttrKeyVal(key, value);
-        });
+        // build message
+        Message message = TubeUtils.buildMessage(topic, 
event.getSimpleProfile());
         // callback
         long sendTime = System.currentTimeMillis();
         MessageSentCallback callback = new MessageSentCallback() {
diff --git 
a/inlong-manager/manager-dao/src/main/resources/mappers/ClusterSetMapper.xml 
b/inlong-manager/manager-dao/src/main/resources/mappers/ClusterSetMapper.xml
index 1e93738db..05abcff7f 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/ClusterSetMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/ClusterSetMapper.xml
@@ -33,7 +33,7 @@
                ext_tag,
                ext_params
         from inlong_cluster
-        where type in ('PULSAR', 'KAFKA', 'TUBE')
+        where type in ('PULSAR', 'KAFKA', 'TUBEMQ')
           and is_deleted = '0'
     </select>
     <select id="selectInlongGroupId" 
resultType="org.apache.inlong.manager.pojo.dataproxy.InlongGroupId">
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/tubemq/TubeClusterDTO.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/tubemq/TubeClusterDTO.java
index 971b9f1d0..57d5a49bf 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/tubemq/TubeClusterDTO.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/tubemq/TubeClusterDTO.java
@@ -17,6 +17,7 @@
 
 package org.apache.inlong.manager.pojo.cluster.tubemq;
 
+import com.fasterxml.jackson.annotation.JsonProperty;
 import io.swagger.annotations.ApiModel;
 import io.swagger.annotations.ApiModelProperty;
 import lombok.AllArgsConstructor;
@@ -47,6 +48,9 @@ public class TubeClusterDTO {
     @Builder.Default
     private String messageQueueHandler = 
"org.apache.inlong.dataproxy.sink.mq.tube.TubeHandler";
 
+    @JsonProperty("master-host-port-list")
+    private String masterIpPortList;
+
     /**
      * Get the dto instance from the JSON string.
      */
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/TubeClusterOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/TubeClusterOperator.java
index 8f72e1f0a..cb24bd741 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/TubeClusterOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/TubeClusterOperator.java
@@ -62,6 +62,7 @@ public class TubeClusterOperator extends 
AbstractClusterOperator {
         CommonBeanUtils.copyProperties(tubeRequest, targetEntity, true);
         try {
             TubeClusterDTO dto = objectMapper.convertValue(tubeRequest, 
TubeClusterDTO.class);
+            dto.setMasterIpPortList(request.getUrl());
             targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
             LOGGER.info("success to set entity for tubemq cluster");
         } catch (Exception e) {

Reply via email to