This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new ab74bf549 [INLONG-7325][Manager] Format the topic name and data
separator for Kafka (#7326)
ab74bf549 is described below
commit ab74bf549dcf4e1837a38e28979875f769473c04
Author: haifxu <[email protected]>
AuthorDate: Tue Feb 7 15:54:34 2023 +0800
[INLONG-7325][Manager] Format the topic name and data separator for Kafka
(#7326)
---
.../resource/queue/kafka/KafkaResourceOperators.java | 9 ++++++++-
.../service/source/kafka/KafkaSourceOperator.java | 19 +++++++++++++++++++
2 files changed, 27 insertions(+), 1 deletion(-)
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaResourceOperators.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaResourceOperators.java
index 1f7180d3b..f8d43f423 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaResourceOperators.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaResourceOperators.java
@@ -18,6 +18,7 @@
package org.apache.inlong.manager.service.resource.queue.kafka;
import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.common.constant.Constants;
import org.apache.inlong.common.constant.MQType;
import org.apache.inlong.manager.common.enums.ClusterType;
@@ -134,7 +135,13 @@ public class KafkaResourceOperators implements
QueueResourceOperator {
log.info("begin to delete kafka resource for groupId={} streamId={}",
groupId, streamId);
try {
- this.deleteKafkaTopic(groupInfo, streamInfo.getMqResource());
+ String topicName = streamInfo.getMqResource();
+ if (StringUtils.isBlank(topicName) || topicName.equals(streamId)) {
+ // the default mq resource (stream id) is not sufficient to
discriminate different kafka topics
+ topicName = String.format(Constants.DEFAULT_KAFKA_TOPIC_FORMAT,
+ groupInfo.getMqResource(), streamInfo.getMqResource());
+ }
+ this.deleteKafkaTopic(groupInfo, topicName);
log.info("success to delete kafka topic for groupId={},
streamId={}", groupId, streamId);
} catch (Exception e) {
String msg = String.format("failed to delete kafka topic for
groupId=%s, streamId=%s", groupId, streamId);
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/kafka/KafkaSourceOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/kafka/KafkaSourceOperator.java
index 97cedf6a2..4338a588f 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/kafka/KafkaSourceOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/kafka/KafkaSourceOperator.java
@@ -21,6 +21,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.common.constant.Constants;
import org.apache.inlong.common.enums.DataTypeEnum;
import org.apache.inlong.manager.common.consts.SourceType;
import org.apache.inlong.manager.common.enums.ClusterType;
@@ -113,6 +114,13 @@ public class KafkaSourceOperator extends
AbstractSourceOperator {
kafkaSource.setBootstrapServers(bootstrapServers);
kafkaSource.setTopic(streamInfo.getMqResource());
String serializationType =
DataTypeEnum.forType(streamInfo.getDataType()).getType();
+ String topicName = streamInfo.getMqResource();
+ if (StringUtils.isBlank(topicName) || topicName.equals(streamId)) {
+ // the default mq resource (stream id) is not sufficient to
discriminate different kafka topics
+ topicName = String.format(Constants.DEFAULT_KAFKA_TOPIC_FORMAT,
+ groupInfo.getMqResource(), streamInfo.getMqResource());
+ }
+ kafkaSource.setTopic(topicName);
kafkaSource.setSerializationType(serializationType);
kafkaSource.setIgnoreParseError(streamInfo.getIgnoreParseError());
@@ -126,6 +134,17 @@ public class KafkaSourceOperator extends
AbstractSourceOperator {
}
}
+ // if the SerializationType is still null, set it to the CSV
+ if (StringUtils.isBlank(kafkaSource.getSerializationType())) {
+ kafkaSource.setSerializationType(DataTypeEnum.CSV.getType());
+ }
+ if
(DataTypeEnum.CSV.getType().equalsIgnoreCase(kafkaSource.getSerializationType()))
{
+ kafkaSource.setDataSeparator(streamInfo.getDataSeparator());
+ if (StringUtils.isBlank(kafkaSource.getDataSeparator())) {
+ kafkaSource.setDataSeparator(String.valueOf((int) ','));
+ }
+ }
+
kafkaSource.setWrapWithInlongMsg(streamInfo.getWrapWithInlongMsg());
kafkaSource.setAutoOffsetReset(KafkaOffset.EARLIEST.getName());