This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 15da36a0e [INLONG-7151][Manager] Fix failure to create node when init
sort (#7152)
15da36a0e is described below
commit 15da36a0ef58f1401bd558ee80d7814b890739fa
Author: haifxu <[email protected]>
AuthorDate: Thu Jan 5 09:49:52 2023 +0800
[INLONG-7151][Manager] Fix failure to create node when init sort (#7152)
---
.../inlong/manager/service/sink/mysql/MySQLSinkOperator.java | 10 ++++++++++
.../manager/service/source/kafka/KafkaSourceOperator.java | 6 +++++-
2 files changed, 15 insertions(+), 1 deletion(-)
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/mysql/MySQLSinkOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/mysql/MySQLSinkOperator.java
index cafa79125..a81b9ff34 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/mysql/MySQLSinkOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/mysql/MySQLSinkOperator.java
@@ -18,9 +18,11 @@
package org.apache.inlong.manager.service.sink.mysql;
import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.consts.SinkType;
import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.pojo.node.DataNodeInfo;
import org.apache.inlong.manager.pojo.sink.SinkField;
import org.apache.inlong.manager.pojo.sink.SinkRequest;
import org.apache.inlong.manager.pojo.sink.StreamSink;
@@ -81,6 +83,14 @@ public class MySQLSinkOperator extends AbstractSinkOperator {
}
MySQLSinkDTO dto = MySQLSinkDTO.getFromJson(entity.getExtParams());
+ if (StringUtils.isBlank(dto.getJdbcUrl())) {
+ String dataNodeName = entity.getDataNodeName();
+ Preconditions.checkNotEmpty(dataNodeName, "mysql jdbc url not
specified and data node is empty");
+ DataNodeInfo dataNodeInfo =
dataNodeHelper.getDataNodeInfo(dataNodeName, entity.getSinkType());
+ CommonBeanUtils.copyProperties(dataNodeInfo, dto, true);
+ dto.setJdbcUrl(dataNodeInfo.getUrl());
+ dto.setPassword(dataNodeInfo.getToken());
+ }
CommonBeanUtils.copyProperties(entity, sink, true);
CommonBeanUtils.copyProperties(dto, sink, true);
List<SinkField> sinkFields = super.getSinkFields(entity.getId());
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/kafka/KafkaSourceOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/kafka/KafkaSourceOperator.java
index 6b6dff0cb..686b81c39 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/kafka/KafkaSourceOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/kafka/KafkaSourceOperator.java
@@ -20,6 +20,7 @@ package org.apache.inlong.manager.service.source.kafka;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.common.enums.DataTypeEnum;
import org.apache.inlong.manager.common.consts.SourceType;
import org.apache.inlong.manager.common.enums.ClusterType;
@@ -118,7 +119,10 @@ public class KafkaSourceOperator extends
AbstractSourceOperator {
if (!Objects.equals(streamId, sourceInfo.getInlongStreamId()))
{
continue;
}
-
kafkaSource.setSerializationType(sourceInfo.getSerializationType());
+ if (StringUtils.isEmpty(kafkaSource.getSerializationType()) &&
StringUtils.isNotEmpty(
+ sourceInfo.getSerializationType())) {
+
kafkaSource.setSerializationType(sourceInfo.getSerializationType());
+ }
}
kafkaSource.setWrapWithInlongMsg(streamInfo.getWrapWithInlongMsg());