This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 59c83c226 [INLONG-6890][DataProxy] Fix tubemq MQ_TYPE enum and optimize sink code and config file (#6891)
59c83c226 is described below
commit 59c83c226a5c9cfe81282b5ddb037a29282a7a7e
Author: woofyzhao <[email protected]>
AuthorDate: Thu Dec 15 10:03:39 2022 +0800
[INLONG-6890][DataProxy] Fix tubemq MQ_TYPE enum and optimize sink code and config file (#6891)
---
inlong-dataproxy/conf/dataproxy-tubemq.conf | 30 ++++++++++------
inlong-dataproxy/conf/dataproxy.conf | 30 ++++++++--------
.../inlong/dataproxy/sink/mq/tube/TubeHandler.java | 42 +++++++++++-----------
.../main/resources/mappers/ClusterSetMapper.xml | 2 +-
.../pojo/cluster/tubemq/TubeClusterDTO.java | 4 +++
.../service/cluster/TubeClusterOperator.java | 1 +
6 files changed, 63 insertions(+), 46 deletions(-)
diff --git a/inlong-dataproxy/conf/dataproxy-tubemq.conf b/inlong-dataproxy/conf/dataproxy-tubemq.conf
index f9b5e83d3..67f2e7fcf 100644
--- a/inlong-dataproxy/conf/dataproxy-tubemq.conf
+++ b/inlong-dataproxy/conf/dataproxy-tubemq.conf
@@ -152,31 +152,41 @@ agent1.channels.ch-msg10.fsyncPerTransaction = false
agent1.channels.ch-msg10.fsyncInterval = 5
agent1.sinks.meta-sink-msg1.channel = ch-msg1
-agent1.sinks.meta-sink-msg1.type =org.apache.inlong.dataproxy.sink.TubeSink
+agent1.sinks.meta-sink-msg1.type = org.apache.inlong.dataproxy.sink.mq.MessageQueueZoneSink
+agent1.sinks.meta-sink-msg1.maxThreads = 1
agent1.sinks.meta-sink-msg2.channel = ch-msg2
-agent1.sinks.meta-sink-msg2.type =org.apache.inlong.dataproxy.sink.TubeSink
+agent1.sinks.meta-sink-msg2.type = org.apache.inlong.dataproxy.sink.mq.MessageQueueZoneSink
+agent1.sinks.meta-sink-msg2.maxThreads = 1
agent1.sinks.meta-sink-msg3.channel = ch-msg3
-agent1.sinks.meta-sink-msg3.type =org.apache.inlong.dataproxy.sink.TubeSink
+agent1.sinks.meta-sink-msg3.type = org.apache.inlong.dataproxy.sink.mq.MessageQueueZoneSink
+agent1.sinks.meta-sink-msg3.maxThreads = 1
agent1.sinks.meta-sink-msg5.channel = ch-msg5
-agent1.sinks.meta-sink-msg5.type =org.apache.inlong.dataproxy.sink.TubeSink
+agent1.sinks.meta-sink-msg5.type = org.apache.inlong.dataproxy.sink.mq.MessageQueueZoneSink
+agent1.sinks.meta-sink-msg5.maxThreads = 1
agent1.sinks.meta-sink-msg6.channel = ch-msg6
-agent1.sinks.meta-sink-msg6.type =org.apache.inlong.dataproxy.sink.TubeSink
+agent1.sinks.meta-sink-msg6.type = org.apache.inlong.dataproxy.sink.mq.MessageQueueZoneSink
+agent1.sinks.meta-sink-msg6.maxThreads = 1
agent1.sinks.meta-sink-msg7.channel = ch-msg7
-agent1.sinks.meta-sink-msg7.type =org.apache.inlong.dataproxy.sink.TubeSink
+agent1.sinks.meta-sink-msg7.type = org.apache.inlong.dataproxy.sink.mq.MessageQueueZoneSink
+agent1.sinks.meta-sink-msg7.maxThreads = 1
agent1.sinks.meta-sink-msg8.channel = ch-msg8
-agent1.sinks.meta-sink-msg8.type =org.apache.inlong.dataproxy.sink.TubeSink
+agent1.sinks.meta-sink-msg8.type = org.apache.inlong.dataproxy.sink.mq.MessageQueueZoneSink
+agent1.sinks.meta-sink-msg8.maxThreads = 1
agent1.sinks.meta-sink-msg9.channel = ch-msg9
-agent1.sinks.meta-sink-msg9.type =org.apache.inlong.dataproxy.sink.TubeSink
+agent1.sinks.meta-sink-msg9.type = org.apache.inlong.dataproxy.sink.mq.MessageQueueZoneSink
+agent1.sinks.meta-sink-msg9.maxThreads = 1
agent1.sinks.meta-sink-msg10.channel = ch-msg10
-agent1.sinks.meta-sink-msg10.type =org.apache.inlong.dataproxy.sink.TubeSink
+agent1.sinks.meta-sink-msg10.type = org.apache.inlong.dataproxy.sink.mq.MessageQueueZoneSink
+agent1.sinks.meta-sink-msg10.maxThreads = 1
agent1.sinks.meta-sink-back.channel = ch-back
-agent1.sinks.meta-sink-back.type =org.apache.inlong.dataproxy.sink.TubeSink
+agent1.sinks.meta-sink-back.type = org.apache.inlong.dataproxy.sink.mq.MessageQueueZoneSink
+agent1.sinks.meta-sink-back.maxThreads = 1
diff --git a/inlong-dataproxy/conf/dataproxy.conf b/inlong-dataproxy/conf/dataproxy.conf
index 628d71ec4..d815193e3 100644
--- a/inlong-dataproxy/conf/dataproxy.conf
+++ b/inlong-dataproxy/conf/dataproxy.conf
@@ -115,23 +115,23 @@ agent1.channels.ch-msg6.dataDirs = ./data/file/ch-msg6/data
agent1.channels.ch-msg6.fsyncPerTransaction = false
agent1.channels.ch-msg6.fsyncInterval = 10
-agent1.sinks.pulsar-sink-msg1.channel = ch-msg1
-agent1.sinks.pulsar-sink-msg1.type = org.apache.inlong.dataproxy.sink.mq.MessageQueueZoneSink
-agent1.sinks.pulsar-sink-msg1.maxThreads=1
+agent1.sinks.mq-sink-msg1.channel = ch-msg1
+agent1.sinks.mq-sink-msg1.type = org.apache.inlong.dataproxy.sink.mq.MessageQueueZoneSink
+agent1.sinks.mq-sink-msg1.maxThreads = 1
-agent1.sinks.pulsar-sink-msg2.channel = ch-msg2
-agent1.sinks.pulsar-sink-msg2.type = org.apache.inlong.dataproxy.sink.mq.MessageQueueZoneSink
-agent1.sinks.pulsar-sink-msg1.maxThreads=1
+agent1.sinks.mq-sink-msg2.channel = ch-msg2
+agent1.sinks.mq-sink-msg2.type = org.apache.inlong.dataproxy.sink.mq.MessageQueueZoneSink
+agent1.sinks.mq-sink-msg2.maxThreads = 1
# For order message
-agent1.sinks.pulsar-sink-msg3.channel = ch-msg3
-agent1.sinks.pulsar-sink-msg3.type = org.apache.inlong.dataproxy.sink.mq.MessageQueueZoneSink
-agent1.sinks.pulsar-sink-msg1.maxThreads=1
+agent1.sinks.mq-sink-msg3.channel = ch-msg3
+agent1.sinks.mq-sink-msg3.type = org.apache.inlong.dataproxy.sink.mq.MessageQueueZoneSink
+agent1.sinks.mq-sink-msg3.maxThreads = 1
-agent1.sinks.pulsar-sink-msg5.channel = ch-msg5
-agent1.sinks.pulsar-sink-msg5.type = org.apache.inlong.dataproxy.sink.mq.MessageQueueZoneSink
-agent1.sinks.pulsar-sink-msg1.maxThreads=1
+agent1.sinks.mq-sink-msg5.channel = ch-msg5
+agent1.sinks.mq-sink-msg5.type = org.apache.inlong.dataproxy.sink.mq.MessageQueueZoneSink
+agent1.sinks.mq-sink-msg5.maxThreads = 1
-agent1.sinks.pulsar-sink-msg6.channel = ch-msg6
-agent1.sinks.pulsar-sink-msg6.type = org.apache.inlong.dataproxy.sink.mq.MessageQueueZoneSink
-agent1.sinks.pulsar-sink-msg1.maxThreads=1
+agent1.sinks.mq-sink-msg6.channel = ch-msg6
+agent1.sinks.mq-sink-msg6.type = org.apache.inlong.dataproxy.sink.mq.MessageQueueZoneSink
+agent1.sinks.mq-sink-msg6.maxThreads = 1
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/tube/TubeHandler.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/tube/TubeHandler.java
index 1675e5ae5..8d52abcaf 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/tube/TubeHandler.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/tube/TubeHandler.java
@@ -17,12 +17,13 @@
package org.apache.inlong.dataproxy.sink.mq.tube;
-import org.apache.commons.collections.MapUtils;
+import org.apache.commons.lang3.StringUtils;
import org.apache.flume.Context;
import org.apache.inlong.dataproxy.config.pojo.CacheClusterConfig;
import org.apache.inlong.dataproxy.config.pojo.IdTopicConfig;
import org.apache.inlong.dataproxy.consts.ConfigConstants;
import org.apache.inlong.dataproxy.sink.common.EventHandler;
+import org.apache.inlong.dataproxy.sink.common.TubeUtils;
import org.apache.inlong.dataproxy.sink.mq.BatchPackProfile;
import org.apache.inlong.dataproxy.sink.mq.MessageQueueHandler;
import org.apache.inlong.dataproxy.sink.mq.MessageQueueZoneSinkContext;
@@ -49,6 +50,7 @@ public class TubeHandler implements MessageQueueHandler {
public static final Logger LOG = LoggerFactory.getLogger(TubeHandler.class);
private static String MASTER_HOST_PORT_LIST = "master-host-port-list";
+ public static final String KEY_NAMESPACE = "namespace";
private CacheClusterConfig config;
private MessageQueueZoneSinkContext sinkContext;
@@ -151,6 +153,16 @@ public class TubeHandler implements MessageQueueHandler {
LOG.info("tube handler stopped");
}
+ private String getTubeTopic(IdTopicConfig idConfig) {
+ // consider first using group mq resource as tube topic, for example, group id
+ String topic = idConfig.getParams().get(KEY_NAMESPACE);
+ if (StringUtils.isBlank(topic)) {
+ // use whatever user specifies in stream mq resource
+ topic = idConfig.getTopicName();
+ }
+ return topic;
+ }
+
/**
* send
*/
@@ -163,7 +175,7 @@ public class TubeHandler implements MessageQueueHandler {
sinkContext.getDispatchQueue().release(event.getSize());
return false;
}
- String topic = idConfig.getTopicName();
+ String topic = getTubeTopic(idConfig);
if (topic == null) {
sinkContext.addSendResultMetric(event, event.getUid(), false, 0);
sinkContext.getDispatchQueue().release(event.getSize());
@@ -171,16 +183,17 @@ public class TubeHandler implements MessageQueueHandler {
}
// metric
sinkContext.addSendMetric(event, topic);
- // publish
- if (!this.topicSet.contains(topic)) {
- this.producer.publish(topic);
- this.topicSet.add(topic);
- }
// create producer failed
if (producer == null) {
+ LOG.error("producer is null");
sinkContext.processSendFail(event, topic, 0);
return false;
}
+ // publish
+ if (!this.topicSet.contains(topic)) {
+ this.producer.publish(topic);
+ this.topicSet.add(topic);
+ }
// send
if (event instanceof SimpleBatchPackProfileV0) {
this.sendSimpleProfileV0((SimpleBatchPackProfileV0) event, idConfig, topic);
@@ -239,19 +252,8 @@ public class TubeHandler implements MessageQueueHandler {
*/
private void sendSimpleProfileV0(SimpleBatchPackProfileV0 event, IdTopicConfig idConfig,
String topic) throws Exception {
- // headers
- Map<String, String> headers = event.getProperties();
- if (MapUtils.isEmpty(headers)) {
- headers = event.getSimpleProfile().getHeaders();
- }
- // compress
- byte[] bodyBytes = event.getSimpleProfile().getBody();
- // sendAsync
- Message message = new Message(topic, bodyBytes);
- // add headers
- headers.forEach((key, value) -> {
- message.setAttrKeyVal(key, value);
- });
+ // build message
+ Message message = TubeUtils.buildMessage(topic, event.getSimpleProfile());
// callback
long sendTime = System.currentTimeMillis();
MessageSentCallback callback = new MessageSentCallback() {
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/ClusterSetMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/ClusterSetMapper.xml
index 1e93738db..05abcff7f 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/ClusterSetMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/ClusterSetMapper.xml
@@ -33,7 +33,7 @@
ext_tag,
ext_params
from inlong_cluster
- where type in ('PULSAR', 'KAFKA', 'TUBE')
+ where type in ('PULSAR', 'KAFKA', 'TUBEMQ')
and is_deleted = '0'
</select>
<select id="selectInlongGroupId" resultType="org.apache.inlong.manager.pojo.dataproxy.InlongGroupId">
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/tubemq/TubeClusterDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/tubemq/TubeClusterDTO.java
index 971b9f1d0..57d5a49bf 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/tubemq/TubeClusterDTO.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/tubemq/TubeClusterDTO.java
@@ -17,6 +17,7 @@
package org.apache.inlong.manager.pojo.cluster.tubemq;
+import com.fasterxml.jackson.annotation.JsonProperty;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.AllArgsConstructor;
@@ -47,6 +48,9 @@ public class TubeClusterDTO {
@Builder.Default
private String messageQueueHandler = "org.apache.inlong.dataproxy.sink.mq.tube.TubeHandler";
+ @JsonProperty("master-host-port-list")
+ private String masterIpPortList;
+
/**
* Get the dto instance from the JSON string.
*/
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/TubeClusterOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/TubeClusterOperator.java
index 8f72e1f0a..cb24bd741 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/TubeClusterOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/TubeClusterOperator.java
@@ -62,6 +62,7 @@ public class TubeClusterOperator extends AbstractClusterOperator {
CommonBeanUtils.copyProperties(tubeRequest, targetEntity, true);
try {
TubeClusterDTO dto = objectMapper.convertValue(tubeRequest, TubeClusterDTO.class);
+ dto.setMasterIpPortList(request.getUrl());
targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
LOGGER.info("success to set entity for tubemq cluster");
} catch (Exception e) {