This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new a2b288cd0d [INLONG-8159][Manager] Rename "tenant" in InlongPulsarInfo
to "pulsarTenant" (#8160)
a2b288cd0d is described below
commit a2b288cd0d0f01bfafcadfbb122caff0e6f13e14
Author: vernedeng <[email protected]>
AuthorDate: Mon Jun 5 22:14:24 2023 +0800
[INLONG-8159][Manager] Rename "tenant" in InlongPulsarInfo to
"pulsarTenant" (#8160)
---
.../dataproxy/sink/mq/pulsar/PulsarHandler.java | 8 ++++----
.../apache/inlong/manager/client/BaseExample.java | 2 +-
.../apache/inlong/manager/client/ut/BaseTest.java | 2 +-
.../pojo/cluster/pulsar/PulsarClusterDTO.java | 4 ++--
.../pojo/cluster/pulsar/PulsarClusterInfo.java | 2 +-
.../pojo/cluster/pulsar/PulsarClusterRequest.java | 2 +-
.../manager/pojo/group/pulsar/InlongPulsarDTO.java | 2 +-
.../manager/pojo/group/pulsar/InlongPulsarInfo.java | 2 +-
.../manager/pojo/queue/pulsar/PulsarTopicInfo.java | 2 +-
.../manager/pojo/sort/util/ExtractNodeUtils.java | 2 +-
.../manager/pojo/source/pulsar/PulsarSource.java | 2 +-
.../manager/pojo/source/pulsar/PulsarSourceDTO.java | 4 ++--
.../pojo/source/pulsar/PulsarSourceRequest.java | 2 +-
.../service/cluster/InlongClusterServiceImpl.java | 4 ++--
.../service/cluster/PulsarClusterOperator.java | 2 +-
.../service/consume/ConsumePulsarOperator.java | 4 ++--
.../manager/service/core/impl/AgentServiceImpl.java | 4 ++--
.../service/core/impl/SortSourceServiceImpl.java | 2 +-
.../service/group/InlongGroupOperator4Pulsar.java | 4 ++--
.../consume/apply/ApproveConsumeProcessListener.java | 6 +++---
.../resource/queue/pulsar/PulsarOperator.java | 6 +++---
.../queue/pulsar/PulsarResourceOperator.java | 20 ++++++++++----------
.../service/source/pulsar/PulsarSourceOperator.java | 6 +++---
.../manager/service/sort/SortServiceImplTest.java | 2 +-
inlong-manager/manager-web/sql/changes-1.8.0.sql | 4 ++++
25 files changed, 52 insertions(+), 48 deletions(-)
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/pulsar/PulsarHandler.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/pulsar/PulsarHandler.java
index 1800c5550a..1b4d273d11 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/pulsar/PulsarHandler.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/pulsar/PulsarHandler.java
@@ -66,7 +66,7 @@ public class PulsarHandler implements MessageQueueHandler {
// log print count
private static final LogCounter logCounter = new LogCounter(10, 100000, 30
* 1000);
- public static final String KEY_TENANT = "tenant";
+ public static final String KEY_TENANT = "pulsarTenant";
public static final String KEY_NAMESPACE = "namespace";
public static final String KEY_SERVICE_URL = "serviceUrl";
@@ -92,7 +92,7 @@ public class PulsarHandler implements MessageQueueHandler {
private String clusterName;
private MessageQueueZoneSinkContext sinkContext;
- private String tenant;
+ private String pulsarTenant;
private String namespace;
private ThreadLocal<EventHandler> handlerLocal = new ThreadLocal<>();
@@ -113,7 +113,7 @@ public class PulsarHandler implements MessageQueueHandler {
this.config = config;
this.clusterName = config.getClusterName();
this.sinkContext = sinkContext;
- this.tenant = config.getParams().get(KEY_TENANT);
+ this.pulsarTenant = config.getParams().get(KEY_TENANT);
this.namespace = config.getParams().get(KEY_NAMESPACE);
}
@@ -214,7 +214,7 @@ public class PulsarHandler implements MessageQueueHandler {
return false;
}
// topic
- String producerTopic = idConfig.getPulsarTopicName(tenant,
namespace);
+ String producerTopic = idConfig.getPulsarTopicName(pulsarTenant,
namespace);
if (producerTopic == null) {
sinkContext.fileMetricEventInc(StatConstants.EVENT_SINK_NOTOPIC);
sinkContext.addSendResultMetric(event, clusterName,
event.getUid(), false, 0);
diff --git
a/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/BaseExample.java
b/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/BaseExample.java
index bbe7ee601e..b6afd055e7 100644
---
a/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/BaseExample.java
+++
b/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/BaseExample.java
@@ -72,7 +72,7 @@ public class BaseExample {
pulsarInfo.setInCharges("admin");
// pulsar conf
- pulsarInfo.setTenant(tenant);
+ pulsarInfo.setPulsarTenant(tenant);
pulsarInfo.setMqResource(namespace);
// set enable zk, create resource, group mode, and cluster tag
diff --git
a/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/ut/BaseTest.java
b/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/ut/BaseTest.java
index 0867270389..4d218918f5 100644
---
a/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/ut/BaseTest.java
+++
b/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/ut/BaseTest.java
@@ -104,7 +104,7 @@ public class BaseTest {
pulsarInfo.setInCharges(IN_CHARGES);
// pulsar conf
- pulsarInfo.setTenant(TENANT);
+ pulsarInfo.setPulsarTenant(TENANT);
pulsarInfo.setMqResource(NAMESPACE);
// set enable zk, create resource, group mode, and cluster tag
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/pulsar/PulsarClusterDTO.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/pulsar/PulsarClusterDTO.java
index 9512d4f88d..1eeb17cbd5 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/pulsar/PulsarClusterDTO.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/pulsar/PulsarClusterDTO.java
@@ -50,7 +50,7 @@ public class PulsarClusterDTO {
private String serviceUrl;
@ApiModelProperty(value = "Pulsar tenant, default is 'public'")
- private String tenant;
+ private String pulsarTenant;
/**
* Saved to ext_params field, it is convenient for DataProxy to obtain.
@@ -65,7 +65,7 @@ public class PulsarClusterDTO {
return PulsarClusterDTO.builder()
.adminUrl(request.getAdminUrl())
.serviceUrl(request.getUrl())
- .tenant(request.getTenant())
+ .pulsarTenant(request.getPulsarTenant())
.build();
}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/pulsar/PulsarClusterInfo.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/pulsar/PulsarClusterInfo.java
index 935db0f787..e67ed426b9 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/pulsar/PulsarClusterInfo.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/pulsar/PulsarClusterInfo.java
@@ -44,7 +44,7 @@ public class PulsarClusterInfo extends ClusterInfo {
private String adminUrl;
@ApiModelProperty(value = "Pulsar tenant, default is 'public'")
- private String tenant;
+ private String pulsarTenant;
public PulsarClusterInfo() {
this.setType(ClusterType.PULSAR);
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/pulsar/PulsarClusterRequest.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/pulsar/PulsarClusterRequest.java
index aaf7a4da4b..594eab56a3 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/pulsar/PulsarClusterRequest.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/pulsar/PulsarClusterRequest.java
@@ -45,7 +45,7 @@ public class PulsarClusterRequest extends ClusterRequest {
private String adminUrl;
@ApiModelProperty(value = "Pulsar tenant, default is 'public'")
- private String tenant = InlongConstants.DEFAULT_PULSAR_TENANT;
+ private String pulsarTenant = InlongConstants.DEFAULT_PULSAR_TENANT;
public PulsarClusterRequest() {
this.setType(ClusterType.PULSAR);
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/pulsar/InlongPulsarDTO.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/pulsar/InlongPulsarDTO.java
index 50a327bb76..312191f172 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/pulsar/InlongPulsarDTO.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/pulsar/InlongPulsarDTO.java
@@ -45,7 +45,7 @@ import javax.validation.constraints.NotNull;
public class InlongPulsarDTO extends BaseInlongGroup {
@ApiModelProperty(value = "Pulsar tenant")
- private String tenant;
+ private String pulsarTenant;
@ApiModelProperty(value = "Queue model, parallel: multiple partitions,
high throughput, out-of-order messages;"
+ "serial: single partition, low throughput, and orderly messages")
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/pulsar/InlongPulsarInfo.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/pulsar/InlongPulsarInfo.java
index aa0fb7b2a9..1bd7780b6e 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/pulsar/InlongPulsarInfo.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/pulsar/InlongPulsarInfo.java
@@ -41,7 +41,7 @@ import lombok.ToString;
public class InlongPulsarInfo extends InlongGroupInfo {
@ApiModelProperty(value = "Pulsar tenant")
- private String tenant;
+ private String pulsarTenant;
@ApiModelProperty(value = "Queue model, parallel: multiple partitions,
high throughput, out-of-order messages;"
+ "serial: single partition, low throughput, and orderly messages")
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/queue/pulsar/PulsarTopicInfo.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/queue/pulsar/PulsarTopicInfo.java
index 29a8f4efbd..c6f74f985c 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/queue/pulsar/PulsarTopicInfo.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/queue/pulsar/PulsarTopicInfo.java
@@ -31,7 +31,7 @@ import lombok.NoArgsConstructor;
@AllArgsConstructor
public class PulsarTopicInfo {
- private String tenant;
+ private String pulsarTenant;
private String namespace;
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/ExtractNodeUtils.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/ExtractNodeUtils.java
index 5342e898c6..44ea1b4fb5 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/ExtractNodeUtils.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/ExtractNodeUtils.java
@@ -226,7 +226,7 @@ public class ExtractNodeUtils {
public static PulsarExtractNode createExtractNode(PulsarSource
pulsarSource) {
List<FieldInfo> fieldInfos =
parseFieldInfos(pulsarSource.getFieldList(), pulsarSource.getSourceName());
String fullTopicName =
- pulsarSource.getTenant() + "/" + pulsarSource.getNamespace() +
"/" + pulsarSource.getTopic();
+ pulsarSource.getPulsarTenant() + "/" +
pulsarSource.getNamespace() + "/" + pulsarSource.getTopic();
Format format = parsingFormat(pulsarSource.getSerializationType(),
pulsarSource.isWrapWithInlongMsg(),
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/pulsar/PulsarSource.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/pulsar/PulsarSource.java
index 05228f0df8..25d23f2e16 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/pulsar/PulsarSource.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/pulsar/PulsarSource.java
@@ -46,7 +46,7 @@ public class PulsarSource extends StreamSource {
@ApiModelProperty("Pulsar tenant")
@Builder.Default
- private String tenant = "public";
+ private String pulsarTenant = "public";
@ApiModelProperty("Pulsar namespace")
private String namespace;
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/pulsar/PulsarSourceDTO.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/pulsar/PulsarSourceDTO.java
index b53ca295d0..70e58d5115 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/pulsar/PulsarSourceDTO.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/pulsar/PulsarSourceDTO.java
@@ -41,7 +41,7 @@ import java.util.Map;
public class PulsarSourceDTO {
@ApiModelProperty("Pulsar tenant")
- private String tenant;
+ private String pulsarTenant;
@ApiModelProperty("Pulsar namespace")
private String namespace;
@@ -85,7 +85,7 @@ public class PulsarSourceDTO {
return PulsarSourceDTO.builder()
.adminUrl(request.getAdminUrl())
.serviceUrl(request.getServiceUrl())
- .tenant(request.getTenant())
+ .pulsarTenant(request.getPulsarTenant())
.namespace(request.getNamespace())
.topic(request.getTopic())
.subscription(request.getSubscription())
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/pulsar/PulsarSourceRequest.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/pulsar/PulsarSourceRequest.java
index fd44e5fee3..a7321f06e5 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/pulsar/PulsarSourceRequest.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/pulsar/PulsarSourceRequest.java
@@ -40,7 +40,7 @@ import java.nio.charset.StandardCharsets;
public class PulsarSourceRequest extends SourceRequest {
@ApiModelProperty("Pulsar tenant")
- private String tenant = "default";
+ private String pulsarTenant = "default";
@ApiModelProperty("Pulsar namespace")
private String namespace;
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java
index 586ff40848..d964f7f489 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java
@@ -1263,7 +1263,7 @@ public class InlongClusterServiceImpl implements
InlongClusterService {
if (MQType.PULSAR.equals(mqType) ||
MQType.TDMQ_PULSAR.equals(mqType)) {
InlongPulsarDTO pulsarDTO =
InlongPulsarDTO.getFromJson(groupInfo.getExtParams());
// First get the tenant from the InlongGroup, and then get it
from the PulsarCluster.
- String tenant = pulsarDTO.getTenant();
+ String tenant = pulsarDTO.getPulsarTenant();
if (StringUtils.isBlank(tenant)) {
// If there are multiple Pulsar clusters, take the first
one.
// Note that the tenants in multiple Pulsar clusters must
be identical.
@@ -1274,7 +1274,7 @@ public class InlongClusterServiceImpl implements
InlongClusterService {
continue;
}
PulsarClusterDTO cluster =
PulsarClusterDTO.getFromJson(pulsarClusters.get(0).getExtParams());
- tenant = cluster.getTenant();
+ tenant = cluster.getPulsarTenant();
}
List<InlongStreamBriefInfo> streamList =
streamMapper.selectBriefList(groupId);
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/PulsarClusterOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/PulsarClusterOperator.java
index ef11d9b3d3..ed2a273929 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/PulsarClusterOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/PulsarClusterOperator.java
@@ -116,7 +116,7 @@ public class PulsarClusterOperator extends
AbstractClusterOperator {
Map<String, String> map = new HashMap<>();
map.put("serverUrl", pulsarClusterInfo.getUrl());
map.put("adminUrl", pulsarClusterInfo.getAdminUrl());
- map.put("defaultTenant", pulsarClusterInfo.getTenant());
+ map.put("defaultTenant", pulsarClusterInfo.getPulsarTenant());
return map;
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/ConsumePulsarOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/ConsumePulsarOperator.java
index bb6a0266e0..e5d7cd9a17 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/ConsumePulsarOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/ConsumePulsarOperator.java
@@ -119,12 +119,12 @@ public class ConsumePulsarOperator extends
AbstractConsumeOperator {
consumeInfo.setClusterInfos(clusterInfos);
// First get the tenant from the InlongGroup, and then get it from the
PulsarCluster.
- String tenant = ((InlongPulsarInfo) groupInfo).getTenant();
+ String tenant = ((InlongPulsarInfo) groupInfo).getPulsarTenant();
if (StringUtils.isBlank(tenant)) {
// If there are multiple Pulsar clusters, take the first one.
// Note that the tenants in multiple Pulsar clusters must be
identical.
PulsarClusterInfo pulsarCluster = (PulsarClusterInfo)
clusterInfos.get(0);
- tenant = pulsarCluster.getTenant();
+ tenant = pulsarCluster.getPulsarTenant();
}
consumeInfo.setTopic(getFullPulsarTopic(groupInfo, tenant,
entity.getTopic()));
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
index ed873ff91f..ec9d6c8968 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
@@ -585,13 +585,13 @@ public class AgentServiceImpl implements AgentService {
if (MQType.PULSAR.equals(mqType) ||
MQType.TDMQ_PULSAR.equals(mqType)) {
// first get the tenant from the InlongGroup, and then get
it from the PulsarCluster.
InlongPulsarDTO pulsarDTO =
InlongPulsarDTO.getFromJson(groupEntity.getExtParams());
- String tenant = pulsarDTO.getTenant();
+ String tenant = pulsarDTO.getPulsarTenant();
if (StringUtils.isBlank(tenant)) {
// If there are multiple Pulsar clusters, take the
first one.
// Note that the tenants in multiple Pulsar clusters
must be identical.
PulsarClusterDTO pulsarCluster =
PulsarClusterDTO.getFromJson(
mqClusterList.get(0).getExtParams());
- tenant = pulsarCluster.getTenant();
+ tenant = pulsarCluster.getPulsarTenant();
}
String topic =
String.format(InlongConstants.PULSAR_TOPIC_FORMAT,
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortSourceServiceImpl.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortSourceServiceImpl.java
index 0d5fa5a130..924a3815a3 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortSourceServiceImpl.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortSourceServiceImpl.java
@@ -81,7 +81,7 @@ public class SortSourceServiceImpl implements
SortSourceService {
}
};
private static final String KEY_AUTH = "authentication";
- private static final String KEY_TENANT = "tenant";
+ private static final String KEY_TENANT = "pulsarTenant";
private static final int RESPONSE_CODE_SUCCESS = 0;
private static final int RESPONSE_CODE_NO_UPDATE = 1;
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupOperator4Pulsar.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupOperator4Pulsar.java
index 996062c8da..24452cc3fb 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupOperator4Pulsar.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupOperator4Pulsar.java
@@ -124,9 +124,9 @@ public class InlongGroupOperator4Pulsar extends
AbstractGroupOperator {
groupInfo.getInlongClusterTag(), null, ClusterType.PULSAR);
// First get the tenant from the InlongGroup, and then get it from the
PulsarCluster.
- String tenant = ((InlongPulsarInfo) groupInfo).getTenant();
+ String tenant = ((InlongPulsarInfo) groupInfo).getPulsarTenant();
if (StringUtils.isBlank(tenant)) {
- tenant = pulsarCluster.getTenant();
+ tenant = pulsarCluster.getPulsarTenant();
}
InlongPulsarTopicInfo topicInfo = new InlongPulsarTopicInfo();
topicInfo.setTenant(tenant);
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/consume/apply/ApproveConsumeProcessListener.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/consume/apply/ApproveConsumeProcessListener.java
index 2fb2d3a18b..1e6114bffa 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/consume/apply/ApproveConsumeProcessListener.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/consume/apply/ApproveConsumeProcessListener.java
@@ -133,12 +133,12 @@ public class ApproveConsumeProcessListener implements
ProcessEventListener {
PulsarClusterInfo pulsarCluster = (PulsarClusterInfo) clusterInfo;
try (PulsarAdmin pulsarAdmin =
PulsarUtils.getPulsarAdmin(pulsarCluster)) {
InlongPulsarDTO pulsarDTO =
InlongPulsarDTO.getFromJson(groupEntity.getExtParams());
- String tenant = pulsarDTO.getTenant();
+ String tenant = pulsarDTO.getPulsarTenant();
if (StringUtils.isBlank(tenant)) {
- tenant = pulsarCluster.getTenant();
+ tenant = pulsarCluster.getPulsarTenant();
}
PulsarTopicInfo topicMessage = new PulsarTopicInfo();
- topicMessage.setTenant(tenant);
+ topicMessage.setPulsarTenant(tenant);
topicMessage.setNamespace(mqResource);
List<String> topics =
Arrays.asList(entity.getTopic().split(InlongConstants.COMMA));
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarOperator.java
index 899cf2a0f9..fe597f1e84 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarOperator.java
@@ -148,7 +148,7 @@ public class PulsarOperator {
*/
public void createTopic(PulsarAdmin pulsarAdmin, PulsarTopicInfo
topicInfo) throws PulsarAdminException {
Preconditions.expectNotNull(topicInfo, "pulsar topic info cannot be
empty");
- String tenant = topicInfo.getTenant();
+ String tenant = topicInfo.getPulsarTenant();
String namespace = topicInfo.getNamespace();
String topicName = topicInfo.getTopicName();
String fullTopicName = tenant + "/" + namespace + "/" + topicName;
@@ -205,7 +205,7 @@ public class PulsarOperator {
public void forceDeleteTopic(PulsarAdmin pulsarAdmin, PulsarTopicInfo
topicInfo) throws PulsarAdminException {
Preconditions.expectNotNull(topicInfo, "pulsar topic info cannot be
empty");
- String tenant = topicInfo.getTenant();
+ String tenant = topicInfo.getPulsarTenant();
String namespace = topicInfo.getNamespace();
String topic = topicInfo.getTopicName();
String fullTopicName = tenant + "/" + namespace + "/" + topic;
@@ -254,7 +254,7 @@ public class PulsarOperator {
List<String> topicList) throws PulsarAdminException {
for (String topic : topicList) {
topicInfo.setTopicName(topic);
- String fullTopicName = topicInfo.getTenant() + "/" +
topicInfo.getNamespace() + "/" + topic;
+ String fullTopicName = topicInfo.getPulsarTenant() + "/" +
topicInfo.getNamespace() + "/" + topic;
this.createSubscription(pulsarAdmin, fullTopicName,
topicInfo.getQueueModule(), subscription);
}
LOGGER.info("success to create subscription={} for multiple
topics={}", subscription, topicList);
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarResourceOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarResourceOperator.java
index 2ef2ed7228..c6b6197ae3 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarResourceOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarResourceOperator.java
@@ -91,7 +91,7 @@ public class PulsarResourceOperator implements
QueueResourceOperator {
}
InlongPulsarInfo pulsarInfo = (InlongPulsarInfo) groupInfo;
- String tenant = pulsarInfo.getTenant();
+ String tenant = pulsarInfo.getPulsarTenant();
// get pulsar cluster via the inlong cluster tag from the inlong group
List<ClusterInfo> clusterInfos =
clusterService.listByTagAndType(clusterTag, ClusterType.PULSAR);
for (ClusterInfo clusterInfo : clusterInfos) {
@@ -99,7 +99,7 @@ public class PulsarResourceOperator implements
QueueResourceOperator {
try (PulsarAdmin pulsarAdmin =
PulsarUtils.getPulsarAdmin(pulsarCluster)) {
// create pulsar tenant and namespace
if (StringUtils.isBlank(tenant)) {
- tenant = pulsarCluster.getTenant();
+ tenant = pulsarCluster.getPulsarTenant();
}
// if the group was not successful, need create tenant and
namespace
@@ -219,13 +219,13 @@ public class PulsarResourceOperator implements
QueueResourceOperator {
private void createTopic(InlongPulsarInfo pulsarInfo, PulsarClusterInfo
pulsarCluster, String topicName)
throws Exception {
try (PulsarAdmin pulsarAdmin =
PulsarUtils.getPulsarAdmin(pulsarCluster)) {
- String tenant = pulsarInfo.getTenant();
+ String tenant = pulsarInfo.getPulsarTenant();
if (StringUtils.isBlank(tenant)) {
- tenant = pulsarCluster.getTenant();
+ tenant = pulsarCluster.getPulsarTenant();
}
String namespace = pulsarInfo.getMqResource();
PulsarTopicInfo topicInfo = PulsarTopicInfo.builder()
- .tenant(tenant)
+ .pulsarTenant(tenant)
.namespace(namespace)
.topicName(topicName)
.queueModule(pulsarInfo.getQueueModule())
@@ -241,9 +241,9 @@ public class PulsarResourceOperator implements
QueueResourceOperator {
private void createSubscription(InlongPulsarInfo pulsarInfo,
PulsarClusterInfo pulsarCluster, String topicName,
String streamId) throws Exception {
try (PulsarAdmin pulsarAdmin =
PulsarUtils.getPulsarAdmin(pulsarCluster)) {
- String tenant = pulsarInfo.getTenant();
+ String tenant = pulsarInfo.getPulsarTenant();
if (StringUtils.isBlank(tenant)) {
- tenant = pulsarCluster.getTenant();
+ tenant = pulsarCluster.getPulsarTenant();
}
String namespace = pulsarInfo.getMqResource();
String fullTopicName = tenant + "/" + namespace + "/" + topicName;
@@ -285,13 +285,13 @@ public class PulsarResourceOperator implements
QueueResourceOperator {
private void deletePulsarTopic(InlongPulsarInfo pulsarInfo,
PulsarClusterInfo pulsarCluster, String topicName)
throws Exception {
try (PulsarAdmin pulsarAdmin =
PulsarUtils.getPulsarAdmin(pulsarCluster)) {
- String tenant = pulsarInfo.getTenant();
+ String tenant = pulsarInfo.getPulsarTenant();
if (StringUtils.isBlank(tenant)) {
- tenant = pulsarCluster.getTenant();
+ tenant = pulsarCluster.getPulsarTenant();
}
String namespace = pulsarInfo.getMqResource();
PulsarTopicInfo topicInfo = PulsarTopicInfo.builder()
- .tenant(tenant)
+ .pulsarTenant(tenant)
.namespace(namespace)
.topicName(topicName)
.build();
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/pulsar/PulsarSourceOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/pulsar/PulsarSourceOperator.java
index 4c3af4955c..da3232a032 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/pulsar/PulsarSourceOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/pulsar/PulsarSourceOperator.java
@@ -122,9 +122,9 @@ public class PulsarSourceOperator extends
AbstractSourceOperator {
String serviceUrl = pulsarCluster.getUrl();
// First get the tenant from the InlongGroup, and then get it from the
PulsarCluster.
- String tenant = ((InlongPulsarInfo) groupInfo).getTenant();
+ String tenant = ((InlongPulsarInfo) groupInfo).getPulsarTenant();
if (StringUtils.isBlank(tenant)) {
- tenant = pulsarCluster.getTenant();
+ tenant = pulsarCluster.getPulsarTenant();
}
Map<String, List<StreamSource>> sourceMap = Maps.newHashMap();
@@ -132,7 +132,7 @@ public class PulsarSourceOperator extends
AbstractSourceOperator {
PulsarSource pulsarSource = new PulsarSource();
String streamId = streamInfo.getInlongStreamId();
pulsarSource.setSourceName(streamId);
- pulsarSource.setTenant(tenant);
+ pulsarSource.setPulsarTenant(tenant);
pulsarSource.setNamespace(groupInfo.getMqResource());
pulsarSource.setTopic(streamInfo.getMqResource());
pulsarSource.setAdminUrl(adminUrl);
diff --git
a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sort/SortServiceImplTest.java
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sort/SortServiceImplTest.java
index 25ba2c6100..7fad5b6677 100644
---
a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sort/SortServiceImplTest.java
+++
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sort/SortServiceImplTest.java
@@ -355,7 +355,7 @@ public class SortServiceImplTest extends ServiceBaseTest {
+ "&producer=true"
+ "&consumer=" + (isConsumable ? "true" : "false");
request.setExtTag(extTag);
- request.setExtParams("{\"tenant\":\"testTenant\","
+ request.setExtParams("{\"pulsarTenant\":\"testTenant\","
+
"\"authentication\":\"testAuth\",\"adminUrl\":\"testAdmin\"}");
clusterService.save(request, "test operator");
}
diff --git a/inlong-manager/manager-web/sql/changes-1.8.0.sql
b/inlong-manager/manager-web/sql/changes-1.8.0.sql
index 08b005b88e..c99a4fb2ad 100644
--- a/inlong-manager/manager-web/sql/changes-1.8.0.sql
+++ b/inlong-manager/manager-web/sql/changes-1.8.0.sql
@@ -75,3 +75,7 @@ ALTER TABLE tenant_user_role
UNIQUE (user_name, tenant, is_deleted);
CREATE INDEX index_tenant
ON tenant_user_role (tenant, is_deleted);
+
+-- To avoid the ambiguity, rename "tenant" in PulsarGroup & PulsarCluster to
"pulsarTenant"
+UPDATE inlong_group SET ext_params = replace(ext_params, '"tenant"',
'"pulsarTenant"');
+UPDATE inlong_cluster SET ext_params = replace(ext_params, '"tenant"',
'"pulsarTenant"');
\ No newline at end of file