This is an automated email from the ASF dual-hosted git repository. healchow pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
commit 963e7ea7a201803a7b5141c987824de267047132 Author: woofyzhao <[email protected]> AuthorDate: Wed Jun 8 21:08:03 2022 +0800 [INLONG-4598][Manager] Fix the Pulsar topic not match error (#4599) --- .../inlong/manager/common/pojo/source/pulsar/PulsarSource.java | 2 +- .../inlong/manager/service/sort/CreateSortConfigListenerV2.java | 3 +++ .../apache/inlong/manager/service/sort/util/ExtractNodeUtils.java | 5 +++-- 3 files changed, 7 insertions(+), 3 deletions(-) diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/pulsar/PulsarSource.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/pulsar/PulsarSource.java index 57fdc4a05..7a839b844 100644 --- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/pulsar/PulsarSource.java +++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/pulsar/PulsarSource.java @@ -43,7 +43,7 @@ import org.apache.inlong.manager.common.util.JsonTypeDefine; public class PulsarSource extends StreamSource { @ApiModelProperty("Pulsar tenant") - private String tenant = "default"; + private String tenant = "public"; @ApiModelProperty("Pulsar namespace") private String namespace; diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/CreateSortConfigListenerV2.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/CreateSortConfigListenerV2.java index b107998bf..5276fd5b8 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/CreateSortConfigListenerV2.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/CreateSortConfigListenerV2.java @@ -144,9 +144,12 @@ public class CreateSortConfigListenerV2 implements SortOperateListener { PulsarClusterInfo pulsarCluster = (PulsarClusterInfo) clusterInfo; String adminUrl = pulsarCluster.getAdminUrl(); String serviceUrl = pulsarCluster.getUrl(); + String tenant = StringUtils.isEmpty(pulsarCluster.getTenant()) ? InlongGroupSettings.DEFAULT_PULSAR_TENANT + : pulsarCluster.getTenant(); streamInfoList.forEach(streamInfo -> { PulsarSource pulsarSource = new PulsarSource(); String streamId = streamInfo.getInlongStreamId(); + pulsarSource.setTenant(tenant); pulsarSource.setSourceName(streamId); pulsarSource.setNamespace(groupInfo.getMqResource()); pulsarSource.setTopic(streamInfo.getMqResource()); diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/ExtractNodeUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/ExtractNodeUtils.java index 3f3c44011..406376798 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/ExtractNodeUtils.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/ExtractNodeUtils.java @@ -231,7 +231,8 @@ public class ExtractNodeUtils { List<FieldInfo> fieldInfos = streamFields.stream() .map(streamFieldInfo -> FieldInfoUtils.parseStreamFieldInfo(streamFieldInfo, name)) .collect(Collectors.toList()); - String topic = pulsarSource.getTopic(); + String fullTopicName = + pulsarSource.getTenant() + "/" + pulsarSource.getNamespace() + "/" + pulsarSource.getTopic(); Format format; DataTypeEnum dataType = DataTypeEnum.forName(pulsarSource.getSerializationType()); @@ -269,7 +270,7 @@ public class ExtractNodeUtils { fieldInfos, null, Maps.newHashMap(), - topic, + fullTopicName, adminUrl, serviceUrl, format,
