This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new d2c510ee5 [INLONG-6802][DataProxy][Manager] Adapt Apache Kafka as
cache cluster (#6803)
d2c510ee5 is described below
commit d2c510ee507a85352a07b69a6c78a1709852fd84
Author: woofyzhao <[email protected]>
AuthorDate: Fri Dec 9 16:59:34 2022 +0800
[INLONG-6802][DataProxy][Manager] Adapt Apache Kafka as cache cluster
(#6803)
---
.../apache/inlong/common/constant/Constants.java | 3 +++
inlong-dataproxy/bin/dataproxy-start.sh | 7 ++++++-
.../conf/{dataproxy-pulsar.conf => dataproxy.conf} | 0
.../inlong/dataproxy/sink/common/SinkContext.java | 1 +
.../dataproxy/sink/mq/MessageQueueZoneProducer.java | 4 +---
.../dataproxy/sink/mq/kafka/KafkaHandler.java | 21 +++++++++++++++++++--
.../dataproxy/sink/mq/pulsar/PulsarHandler.java | 2 ++
.../inlong/dataproxy/sink/mq/tube/TubeHandler.java | 1 +
.../dataproxy/source/ServerMessageHandler.java | 2 +-
.../inlong/manager/pojo/cluster/ClusterRequest.java | 4 ++--
.../manager/pojo/cluster/kafka/KafkaClusterDTO.java | 9 +++++++++
.../pojo/cluster/pulsar/PulsarClusterDTO.java | 5 +++++
.../manager/pojo/cluster/tubemq/TubeClusterDTO.java | 3 +++
.../service/cluster/InlongClusterServiceImpl.java | 18 +++++++++++++++++-
.../service/cluster/KafkaClusterOperator.java | 1 +
.../service/cluster/PulsarClusterOperator.java | 1 +
.../queue/kafka/KafkaResourceOperators.java | 9 ++++++++-
17 files changed, 80 insertions(+), 11 deletions(-)
diff --git
a/inlong-common/src/main/java/org/apache/inlong/common/constant/Constants.java
b/inlong-common/src/main/java/org/apache/inlong/common/constant/Constants.java
index 0c2714ad0..983ac3988 100644
---
a/inlong-common/src/main/java/org/apache/inlong/common/constant/Constants.java
+++
b/inlong-common/src/main/java/org/apache/inlong/common/constant/Constants.java
@@ -26,4 +26,7 @@ public class Constants {
public static final int RESULT_FAIL = 1;
+ // default kafka topic is {groupId}.{streamId}
+ public static final String DEFAULT_KAFKA_TOPIC_FORMAT = "%s.%s";
+
}
diff --git a/inlong-dataproxy/bin/dataproxy-start.sh
b/inlong-dataproxy/bin/dataproxy-start.sh
index a0de3894c..9bc2cfc30 100755
--- a/inlong-dataproxy/bin/dataproxy-start.sh
+++ b/inlong-dataproxy/bin/dataproxy-start.sh
@@ -47,6 +47,10 @@ if [ -n "$1" ]; then
fi
CONFIG_FILE="dataproxy-${MQ_TYPE}.conf"
+if [ "${MQ_TYPE}" == "pulsar" ] || [ "${MQ_TYPE}" == "kafka" ]; then
+ CONFIG_FILE="dataproxy.conf"
+fi
+
CONFIG_FILE_WITH_COFING_PATH="conf/${CONFIG_FILE}"
CONFIG_FILE_WITH_PATH="${basedir}/${CONFIG_FILE}"
@@ -54,4 +58,5 @@ if [ -f "$CONFIG_FILE_WITH_PATH" ]; then
nohup bash +x bin/dataproxy-ng agent --conf conf/ -f
"${CONFIG_FILE_WITH_COFING_PATH}" -n agent1 --no-reload-conf > /dev/null 2>&1 &
else
error "${CONFIG_FILE_WITH_PATH} is not exist! start failed!" 1
-fi
\ No newline at end of file
+fi
+
diff --git a/inlong-dataproxy/conf/dataproxy-pulsar.conf
b/inlong-dataproxy/conf/dataproxy.conf
similarity index 100%
rename from inlong-dataproxy/conf/dataproxy-pulsar.conf
rename to inlong-dataproxy/conf/dataproxy.conf
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/SinkContext.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/SinkContext.java
index 11298296e..a28493bad 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/SinkContext.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/SinkContext.java
@@ -221,6 +221,7 @@ public class SinkContext {
public MessageQueueHandler createMessageQueueHandler(CacheClusterConfig
config) {
String strHandlerClass =
config.getParams().getOrDefault(KEY_MESSAGE_QUEUE_HANDLER,
PulsarHandler.class.getName());
+ LOG.info("mq handler class = {}", strHandlerClass);
try {
Class<?> handlerClass = ClassUtils.getClass(strHandlerClass);
Object handlerObject =
handlerClass.getDeclaredConstructor().newInstance();
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneProducer.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneProducer.java
index 5c221f892..e8bf16a8e 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneProducer.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneProducer.java
@@ -108,9 +108,7 @@ public class MessageQueueZoneProducer {
public void reload() {
try {
// stop deleted cluster
- deletingClusterList.forEach(item -> {
- item.stop();
- });
+ deletingClusterList.forEach(MessageQueueClusterProducer::stop);
deletingClusterList.clear();
// update cluster list
List<CacheClusterConfig> configList =
this.context.getCacheHolder().getConfigList();
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/kafka/KafkaHandler.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/kafka/KafkaHandler.java
index 7d030393d..c0cde55d8 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/kafka/KafkaHandler.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/kafka/KafkaHandler.java
@@ -17,7 +17,9 @@
package org.apache.inlong.dataproxy.sink.mq.kafka;
+import org.apache.commons.lang3.StringUtils;
import org.apache.flume.Context;
+import org.apache.inlong.common.constant.Constants;
import org.apache.inlong.dataproxy.config.pojo.CacheClusterConfig;
import org.apache.inlong.dataproxy.config.pojo.IdTopicConfig;
import org.apache.inlong.dataproxy.sink.common.EventHandler;
@@ -45,6 +47,7 @@ import java.util.Properties;
public class KafkaHandler implements MessageQueueHandler {
public static final Logger LOG =
LoggerFactory.getLogger(KafkaHandler.class);
+ public static final String KEY_NAMESPACE = "namespace";
private CacheClusterConfig config;
private MessageQueueZoneSinkContext sinkContext;
@@ -92,6 +95,7 @@ public class KafkaHandler implements MessageQueueHandler {
public void stop() {
// kafka producer
this.producer.close();
+ LOG.info("kafka handler stopped");
}
/**
@@ -109,12 +113,14 @@ public class KafkaHandler implements MessageQueueHandler {
sinkContext.getDispatchQueue().release(event.getSize());
return false;
}
- String topic = idConfig.getTopicName();
- if (topic == null) {
+ String baseTopic = idConfig.getTopicName();
+ if (baseTopic == null) {
sinkContext.addSendResultMetric(event, event.getUid(), false,
0);
sinkContext.getDispatchQueue().release(event.getSize());
return false;
}
+ String topic = getProducerTopic(baseTopic, idConfig);
+
// metric
sinkContext.addSendMetric(event, topic);
// create producer failed
@@ -138,6 +144,17 @@ public class KafkaHandler implements MessageQueueHandler {
}
}
+ /**
+ * getProducerTopic
+ */
+ private String getProducerTopic(String baseTopic, IdTopicConfig config) {
+ String namespace = config.getParams().get(KEY_NAMESPACE);
+ if (StringUtils.isNotEmpty(namespace)) {
+ return String.format(Constants.DEFAULT_KAFKA_TOPIC_FORMAT,
namespace, baseTopic);
+ }
+ return baseTopic;
+ }
+
/**
* sendProfileV1
*/
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/pulsar/PulsarHandler.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/pulsar/PulsarHandler.java
index 49eccce4f..f34e195ab 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/pulsar/PulsarHandler.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/pulsar/PulsarHandler.java
@@ -150,6 +150,7 @@ public class PulsarHandler implements MessageQueueHandler {
} catch (Throwable e) {
LOG.error(e.getMessage(), e);
}
+ LOG.info("pulsar handler started");
}
/**
@@ -169,6 +170,7 @@ public class PulsarHandler implements MessageQueueHandler {
} catch (PulsarClientException e) {
LOG.error(e.getMessage(), e);
}
+ LOG.info("pulsar handler stopped");
}
/**
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/tube/TubeHandler.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/tube/TubeHandler.java
index 1b1471dbb..225c488c8 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/tube/TubeHandler.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/tube/TubeHandler.java
@@ -147,6 +147,7 @@ public class TubeHandler implements MessageQueueHandler {
LOG.error(e.getMessage(), e);
}
}
+ LOG.info("tube handler stopped");
}
/**
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
index 31c0b4ee8..920268af1 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
@@ -437,7 +437,7 @@ public class ServerMessageHandler extends
ChannelInboundHandlerAdapter {
} else {
commonAttrMap.put(AttributeConstants.MESSAGE_PROCESS_ERRCODE,
DataProxyErrCode.UNCONFIGURED_GROUPID_OR_STREAMID.getErrCodeStr());
- logger.debug("Topic for message is null , inlongGroupId =
{}, inlongStreamId = {}",
+ logger.error("Topic for message is null , inlongGroupId =
{}, inlongStreamId = {}",
groupId, streamId);
return false;
}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/ClusterRequest.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/ClusterRequest.java
index 37cf3378f..90f07a315 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/ClusterRequest.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/ClusterRequest.java
@@ -47,7 +47,7 @@ public abstract class ClusterRequest {
private String name;
@NotBlank(message = "cluster type cannot be blank")
- @ApiModelProperty(value = "Cluster type, including TUBEMQ, PULSAR,
DATAPROXY, etc.")
+ @ApiModelProperty(value = "Cluster type, including TUBEMQ, PULSAR, KAFKA,
DATAPROXY, etc.")
private String type;
@ApiModelProperty(value = "Cluster url")
@@ -58,7 +58,7 @@ public abstract class ClusterRequest {
private String clusterTags;
@ApiModelProperty(value = "Extension tag")
- private String extTag;
+ private String extTag = "default=true";
@ApiModelProperty(value = "Cluster token")
private String token;
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/kafka/KafkaClusterDTO.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/kafka/KafkaClusterDTO.java
index 620385a1d..9f5e25ca4 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/kafka/KafkaClusterDTO.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/kafka/KafkaClusterDTO.java
@@ -17,7 +17,9 @@
package org.apache.inlong.manager.pojo.cluster.kafka;
+import com.fasterxml.jackson.annotation.JsonProperty;
import io.swagger.annotations.ApiModel;
+import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
@@ -33,9 +35,16 @@ import javax.validation.constraints.NotNull;
@Data
@Builder
@NoArgsConstructor
+@AllArgsConstructor
@ApiModel("Kafka cluster info")
public class KafkaClusterDTO {
+ @Builder.Default
+ private String messageQueueHandler =
"org.apache.inlong.dataproxy.sink.mq.kafka.KafkaHandler";
+
+ @JsonProperty("bootstrap.servers")
+ private String bootstrapServers;
+
/**
* Get the dto instance from the request
*/
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/pulsar/PulsarClusterDTO.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/pulsar/PulsarClusterDTO.java
index d1c93a629..d80d3da2b 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/pulsar/PulsarClusterDTO.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/pulsar/PulsarClusterDTO.java
@@ -46,6 +46,11 @@ public class PulsarClusterDTO {
@Builder.Default
private String tenant = "public";
+ @Builder.Default
+ private String messageQueueHandler =
"org.apache.inlong.dataproxy.sink.mq.pulsar.PulsarHandler";
+
+ private String serviceUrl;
+
/**
* Get the dto instance from the request
*/
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/tubemq/TubeClusterDTO.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/tubemq/TubeClusterDTO.java
index 811f850df..3006e8302 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/tubemq/TubeClusterDTO.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/tubemq/TubeClusterDTO.java
@@ -45,6 +45,9 @@ public class TubeClusterDTO {
@ApiModelProperty(value = "Master Web URL http://120.0.0.1:8080", notes =
"TubeMQ master RPC URL is the 'url' field of the cluster")
private String masterWebUrl;
+ @Builder.Default
+ private String messageQueueHandler =
"org.apache.inlong.dataproxy.sink.mq.tube.TubeHandler";
+
/**
* Get the dto instance from the JSON string.
*/
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java
index 5cafede6f..aa6aeb9e7 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java
@@ -24,6 +24,7 @@ import com.google.common.collect.Sets;
import com.google.gson.Gson;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.common.constant.Constants;
import org.apache.inlong.common.pojo.dataproxy.DataProxyCluster;
import org.apache.inlong.common.pojo.dataproxy.DataProxyConfig;
import org.apache.inlong.common.pojo.dataproxy.DataProxyConfigResponse;
@@ -843,13 +844,28 @@ public class InlongClusterServiceImpl implements
InlongClusterService {
topicConfig.setInlongGroupId(groupId);
topicConfig.setTopic(mqResource);
topicList.add(topicConfig);
+ } else if (MQType.KAFKA.equals(mqType)) {
+ List<InlongStreamBriefInfo> streamList =
streamMapper.selectBriefList(groupId);
+ for (InlongStreamBriefInfo streamInfo : streamList) {
+ String streamId = streamInfo.getInlongStreamId();
+ String topic = streamInfo.getMqResource();
+ if (topic.equals(streamId)) {
+ // the default mq resource (stream id) is not
sufficient to discriminate different kafka topics
+ topic =
String.format(Constants.DEFAULT_KAFKA_TOPIC_FORMAT,
+ mqResource, streamInfo.getMqResource());
+ }
+ DataProxyTopicInfo topicConfig = new DataProxyTopicInfo();
+ topicConfig.setInlongGroupId(groupId + "/" + streamId);
+ topicConfig.setTopic(topic);
+ topicList.add(topicConfig);
+ }
}
}
// get mq cluster info
LOGGER.debug("GetDPConfig: begin to get mq clusters by tags={}",
clusterTagList);
List<MQClusterInfo> mqSet = new ArrayList<>();
- List<String> typeList = Arrays.asList(ClusterType.TUBEMQ,
ClusterType.PULSAR);
+ List<String> typeList = Arrays.asList(ClusterType.TUBEMQ,
ClusterType.PULSAR, ClusterType.KAFKA);
ClusterPageRequest pageRequest = ClusterPageRequest.builder()
.typeList(typeList)
.clusterTagList(clusterTagList)
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/KafkaClusterOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/KafkaClusterOperator.java
index 4578f9e39..8c4eb37b7 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/KafkaClusterOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/KafkaClusterOperator.java
@@ -78,6 +78,7 @@ public class KafkaClusterOperator extends
AbstractClusterOperator {
CommonBeanUtils.copyProperties(kafkaRequest, targetEntity, true);
try {
KafkaClusterDTO dto = KafkaClusterDTO.getFromRequest(kafkaRequest);
+ dto.setBootstrapServers(kafkaRequest.getUrl());
targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
LOGGER.info("success to set entity for kafka cluster");
} catch (Exception e) {
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/PulsarClusterOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/PulsarClusterOperator.java
index c9ef2d562..3f7770d40 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/PulsarClusterOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/PulsarClusterOperator.java
@@ -78,6 +78,7 @@ public class PulsarClusterOperator extends
AbstractClusterOperator {
CommonBeanUtils.copyProperties(pulsarRequest, targetEntity, true);
try {
PulsarClusterDTO dto =
PulsarClusterDTO.getFromRequest(pulsarRequest);
+ dto.setServiceUrl(request.getUrl());
targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
LOGGER.info("success to set entity for pulsar cluster");
} catch (Exception e) {
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaResourceOperators.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaResourceOperators.java
index 058694a05..21b0c5bdc 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaResourceOperators.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaResourceOperators.java
@@ -18,6 +18,7 @@
package org.apache.inlong.manager.service.resource.queue.kafka;
import lombok.extern.slf4j.Slf4j;
+import org.apache.inlong.common.constant.Constants;
import org.apache.inlong.common.constant.MQType;
import org.apache.inlong.manager.common.enums.ClusterType;
import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
@@ -107,7 +108,13 @@ public class KafkaResourceOperators implements
QueueResourceOperator {
try {
InlongKafkaInfo inlongKafkaInfo = (InlongKafkaInfo) groupInfo;
// create kafka topic
- this.createKafkaTopic(inlongKafkaInfo, streamInfo.getMqResource());
+ String topicName = streamInfo.getMqResource();
+ if (topicName.equals(streamId)) {
+ // the default mq resource (stream id) is not sufficient to
discriminate different kafka topics
+ topicName = String.format(Constants.DEFAULT_KAFKA_TOPIC_FORMAT,
+ inlongKafkaInfo.getMqResource(),
streamInfo.getMqResource());
+ }
+ this.createKafkaTopic(inlongKafkaInfo, topicName);
} catch (Exception e) {
String msg = String.format("failed to create kafka topic for
groupId=%s, streamId=%s", groupId, streamId);
log.error(msg, e);