This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 5ccec72d1 [INLONG-5491][Manager] Fix the error of configuring the
Pulsar nonpartitioned topic (#5492)
5ccec72d1 is described below
commit 5ccec72d16648e9296fea7ee67321c818f0c62a5
Author: fuweng11 <[email protected]>
AuthorDate: Fri Aug 12 16:47:17 2022 +0800
[INLONG-5491][Manager] Fix the error of configuring the Pulsar
nonpartitioned topic (#5492)
---
.../manager/common/consts/InlongConstants.java | 4 ++
.../resource/queue/pulsar/PulsarOperator.java | 44 +++++++++++++++-------
.../queue/pulsar/PulsarResourceOperator.java | 5 ++-
3 files changed, 38 insertions(+), 15 deletions(-)
diff --git
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
index c2c584d75..28c2435d2 100644
---
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
+++
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
@@ -62,6 +62,10 @@ public class InlongConstants {
public static final String DEFAULT_PULSAR_AUTHENTICATION_TYPE = "token";
+ public static final String PULSAR_QUEUE_TYPE_SERIAL = "SERIAL";
+
+ public static final String PULSAR_QUEUE_TYPE_PARALLEL = "PARALLEL";
+
/**
* Format of the Pulsar topic: "persistent://tenant/namespace/topic
*/
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarOperator.java
index 98fc3f689..9bfe2605b 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarOperator.java
@@ -19,11 +19,12 @@ package
org.apache.inlong.manager.service.resource.queue.pulsar;
import com.google.common.collect.Sets;
import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.conversion.ConversionHandle;
import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.pojo.group.pulsar.InlongPulsarInfo;
import org.apache.inlong.manager.pojo.queue.pulsar.PulsarTopicBean;
-import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.service.cluster.InlongClusterServiceImpl;
import org.apache.pulsar.client.admin.Namespaces;
import org.apache.pulsar.client.admin.PulsarAdmin;
@@ -47,7 +48,6 @@ import java.util.Map;
public class PulsarOperator {
private static final Logger LOGGER =
LoggerFactory.getLogger(InlongClusterServiceImpl.class);
- private static final String PULSAR_QUEUE_TYPE_SERIAL = "SERIAL";
private static final int MAX_PARTITION = 100;
private static final int RETRY_TIMES = 3;
private static final int DELAY_SECONDS = 5;
@@ -146,13 +146,14 @@ public class PulsarOperator {
String topicFullName = tenant + "/" + namespace + "/" + topic;
// Topic will be returned if it exists, and created if it does not
exist
- if (topicIsExists(pulsarAdmin, tenant, namespace, topic)) {
+ if (topicIsExists(pulsarAdmin, tenant, namespace, topic,
+
InlongConstants.PULSAR_QUEUE_TYPE_PARALLEL.equals(topicBean.getQueueModule())))
{
LOGGER.warn("pulsar topic={} already exists in {}", topicFullName,
pulsarAdmin.getServiceUrl());
return;
}
try {
- if (PULSAR_QUEUE_TYPE_SERIAL.equals(topicBean.getQueueModule())) {
+ if
(InlongConstants.PULSAR_QUEUE_TYPE_SERIAL.equals(topicBean.getQueueModule())) {
pulsarAdmin.topics().createNonPartitionedTopic(topicFullName);
String res = pulsarAdmin.lookups().lookupTopic(topicFullName);
LOGGER.info("success to create topic={}, lookup result is {}",
topicFullName, res);
@@ -202,7 +203,8 @@ public class PulsarOperator {
String topicFullName = tenant + "/" + namespace + "/" + topic;
// Topic will be returned if it not exists
- if (topicIsExists(pulsarAdmin, tenant, namespace, topic)) {
+ if (topicIsExists(pulsarAdmin, tenant, namespace, topic,
+
InlongConstants.PULSAR_QUEUE_TYPE_PARALLEL.equals(topicBean.getQueueModule())))
{
LOGGER.warn("pulsar topic={} already delete", topicFullName);
return;
}
@@ -227,7 +229,8 @@ public class PulsarOperator {
String topicName = topicBean.getTenant() + "/" +
topicBean.getNamespace() + "/" + topicBean.getTopicName();
LOGGER.info("begin to create pulsar subscription={} for topic={}",
subscription, topicName);
try {
- boolean isExists = this.subscriptionIsExists(pulsarAdmin,
topicName, subscription);
+ boolean isExists = this.subscriptionIsExists(pulsarAdmin,
topicName, subscription,
+
InlongConstants.PULSAR_QUEUE_TYPE_PARALLEL.equals(topicBean.getQueueModule()));
if (isExists) {
LOGGER.warn("pulsar subscription={} already exists, skip to
create", subscription);
return;
@@ -275,7 +278,8 @@ public class PulsarOperator {
* @apiNote cannot compare whether the string contains, otherwise it may
be misjudged, such as:
* Topic "ab" does not exist, but if "abc" exists, "ab" will be
mistakenly judged to exist
*/
- public boolean topicIsExists(PulsarAdmin pulsarAdmin, String tenant,
String namespace, String topic) {
+ public boolean topicIsExists(PulsarAdmin pulsarAdmin, String tenant,
String namespace, String topic,
+ boolean isPartitioned) {
if (StringUtils.isBlank(topic)) {
return true;
}
@@ -284,7 +288,11 @@ public class PulsarOperator {
List<String> topicList;
boolean topicExists = false;
try {
- topicList = pulsarAdmin.topics().getPartitionedTopicList(tenant +
"/" + namespace);
+ if (isPartitioned) {
+ topicList =
pulsarAdmin.topics().getPartitionedTopicList(tenant + "/" + namespace);
+ } else {
+ topicList = pulsarAdmin.topics().getList(tenant + "/" +
namespace);
+ }
for (String t : topicList) {
t = t.substring(t.lastIndexOf("/") + 1); // not contains /
if (topic.equals(t)) {
@@ -316,7 +324,8 @@ public class PulsarOperator {
return topicExists;
}
- private boolean subscriptionIsExists(PulsarAdmin pulsarAdmin, String
topic, String subscription) {
+ private boolean subscriptionIsExists(PulsarAdmin pulsarAdmin, String
topic, String subscription,
+ boolean isPartitioned) {
int count = 0;
while (++count <= RETRY_TIMES) {
try {
@@ -324,11 +333,20 @@ public class PulsarOperator {
Thread.sleep(DELAY_SECONDS);
// first lookup to load the topic, and then query whether the
subscription exists
- Map<String, String> topicMap =
pulsarAdmin.lookups().lookupPartitionedTopic(topic);
- if (topicMap.isEmpty()) {
- LOGGER.error("result of lookups topic={} is empty,
continue retry", topic);
- continue;
+ if (isPartitioned) {
+ Map<String, String> topicMap =
pulsarAdmin.lookups().lookupPartitionedTopic(topic);
+ if (topicMap.isEmpty()) {
+ LOGGER.error("result of lookups topic={} is empty,
continue retry", topic);
+ continue;
+ }
+ } else {
+ String lookupTopic =
pulsarAdmin.lookups().lookupTopic(topic);
+ if (StringUtils.isBlank(lookupTopic)) {
+ LOGGER.error("result of lookups topic={} is empty,
continue retry", topic);
+ continue;
+ }
}
+
List<String> subscriptionList =
pulsarAdmin.topics().getSubscriptions(topic);
return subscriptionList.contains(subscription);
} catch (PulsarAdminException | InterruptedException e) {
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarResourceOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarResourceOperator.java
index e4f2b46c4..1b179c597 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarResourceOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarResourceOperator.java
@@ -21,9 +21,9 @@ import com.google.common.base.Objects;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.manager.common.consts.InlongConstants;
+import org.apache.inlong.manager.common.consts.MQType;
import org.apache.inlong.manager.common.enums.ClusterType;
import org.apache.inlong.manager.common.enums.GroupStatus;
-import org.apache.inlong.manager.common.consts.MQType;
import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.pojo.cluster.ClusterInfo;
@@ -205,7 +205,8 @@ public class PulsarResourceOperator implements
QueueResourceOperator {
pulsarOperator.createTopic(pulsarAdmin, topicBean);
// 2. create a subscription for the pulsar topic
- boolean exist = pulsarOperator.topicIsExists(pulsarAdmin, tenant,
namespace, topicName);
+ boolean exist = pulsarOperator.topicIsExists(pulsarAdmin, tenant,
namespace, topicName,
+
InlongConstants.PULSAR_QUEUE_TYPE_PARALLEL.equals(topicBean.getQueueModule()));
if (!exist) {
String topicFullName = tenant + "/" + namespace + "/" +
topicName;
String serviceUrl = pulsarCluster.getAdminUrl();