This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 2e70a12a2f [Manager] Fix sort standalone get kafka config error (#10106)
2e70a12a2f is described below
commit 2e70a12a2f95e9c8eacdc6ce91afa6f4297e2df3
Author: castor <[email protected]>
AuthorDate: Sat May 4 14:25:39 2024 +0800
[Manager] Fix sort standalone get kafka config error (#10106)
Co-authored-by: castorqin <[email protected]>
---
.../service/node/kafka/KafkaDataNodeOperator.java | 14 ++++++++++++++
.../service/sink/kafka/KafkaSinkOperator.java | 22 ++++++++++++++++++++++
2 files changed, 36 insertions(+)
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/kafka/KafkaDataNodeOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/kafka/KafkaDataNodeOperator.java
index ae91b2f394..4fbc740d36 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/kafka/KafkaDataNodeOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/kafka/KafkaDataNodeOperator.java
@@ -40,6 +40,8 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.web.client.RestTemplate;
+import java.util.Map;
+
/**
* Kafka data node operator
*/
@@ -48,6 +50,9 @@ public class KafkaDataNodeOperator extends AbstractDataNodeOperator {
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaDataNodeOperator.class);
+ private static final String bootstrapServers = "bootstrap.servers";
+ private static final String clientId = "client.id";
+
@Autowired
private ObjectMapper objectMapper;
@@ -79,6 +84,15 @@ public class KafkaDataNodeOperator extends AbstractDataNodeOperator {
return kafkaDataNodeInfo;
}
+ @Override
+ public Map<String, String> parse2SinkParams(DataNodeInfo info) {
+ Map<String, String> params = super.parse2SinkParams(info);
+ KafkaDataNodeInfo kafkaDataNodeInfo = (KafkaDataNodeInfo) info;
+ params.put(bootstrapServers, kafkaDataNodeInfo.getBootstrapServers());
+ params.put(clientId, kafkaDataNodeInfo.getClientId());
+ return params;
+ }
+
@Override
protected void setTargetEntity(DataNodeRequest request, DataNodeEntity targetEntity) {
KafkaDataNodeRequest nodeRequest = (KafkaDataNodeRequest) request;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/kafka/KafkaSinkOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/kafka/KafkaSinkOperator.java
index 4357556732..d7fa197c9f 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/kafka/KafkaSinkOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/kafka/KafkaSinkOperator.java
@@ -22,6 +22,7 @@ import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
+import org.apache.inlong.manager.pojo.node.DataNodeInfo;
import org.apache.inlong.manager.pojo.sink.SinkField;
import org.apache.inlong.manager.pojo.sink.SinkRequest;
import org.apache.inlong.manager.pojo.sink.StreamSink;
@@ -30,6 +31,7 @@ import org.apache.inlong.manager.pojo.sink.kafka.KafkaSinkDTO;
import org.apache.inlong.manager.pojo.sink.kafka.KafkaSinkRequest;
import org.apache.inlong.manager.service.sink.AbstractSinkOperator;
+import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -37,6 +39,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.List;
+import java.util.Map;
/**
* Kafka sink operator
@@ -46,6 +49,8 @@ public class KafkaSinkOperator extends AbstractSinkOperator {
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaSinkOperator.class);
+ private static final String topic = "topic";
+
@Autowired
private ObjectMapper objectMapper;
@@ -75,6 +80,23 @@ public class KafkaSinkOperator extends AbstractSinkOperator {
}
}
+ @Override
+ public Map<String, String> parse2IdParams(StreamSinkEntity streamSink, List<String> fields,
+ DataNodeInfo dataNodeInfo) {
+
+ Map<String, String> params = super.parse2IdParams(streamSink, fields, dataNodeInfo);
+
+ KafkaSinkDTO kafkaSinkDTO;
+ try {
+ kafkaSinkDTO = objectMapper.readValue(streamSink.getExtParams(), KafkaSinkDTO.class);
+ } catch (JsonProcessingException e) {
+ LOGGER.error("parse kafka sink dto error", e);
+ return params;
+ }
+ params.put(topic, kafkaSinkDTO.getTopicName());
+ return params;
+ }
+
@Override
public StreamSink getFromEntity(StreamSinkEntity entity) {
KafkaSink sink = new KafkaSink();