This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch release-1.3.0
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/release-1.3.0 by this push:
new 9e7943edb [INLONG-6065][DataProxy] Delete the Pulsar client synchronously when deleting a topic (#6066)
9e7943edb is described below
commit 9e7943edb35eda6d05471c054c3ac888cc56a95d
Author: Goson Zhang <[email protected]>
AuthorDate: Fri Sep 30 10:27:54 2022 +0800
[INLONG-6065][DataProxy] Delete the Pulsar client synchronously when deleting a topic (#6066)
Co-authored-by: healchow <[email protected]>
---
.../apache/inlong/dataproxy/sink/PulsarSink.java | 25 +++++++++++++++++-----
.../dataproxy/sink/pulsar/PulsarClientService.java | 14 ++++++++++++
2 files changed, 34 insertions(+), 5 deletions(-)
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java
index cac638ce5..c2014965d 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java
@@ -246,21 +246,36 @@ public class PulsarSink extends AbstractSink implements Configurable, SendMessag
/**
* When topic.properties is re-enabled, the producer update is triggered
*/
- public void diffSetPublish(PulsarClientService pulsarClientService,
Set<String> originalSet, Set<String> endSet) {
+ public void diffSetPublish(PulsarClientService pulsarClientService,
+ Set<String> curTopicSet, Set<String>
newTopicSet) {
boolean changed = false;
- for (String s : endSet) {
- if (!originalSet.contains(s)) {
+ // create producers for new topics
+ for (String newTopic : newTopicSet) {
+ if (!curTopicSet.contains(newTopic)) {
changed = true;
try {
- pulsarClientService.initTopicProducer(s);
+ pulsarClientService.initTopicProducer(newTopic);
} catch (Exception e) {
logger.error("get producer failed: ", e);
}
}
}
+ // remove producers for deleted topics
+ for (String oldTopic : curTopicSet) {
+ if (!newTopicSet.contains(oldTopic)) {
+ changed = true;
+ try {
+ pulsarClientService.destroyProducerByTopic(oldTopic);
+ } catch (Exception e) {
+ logger.error("remove producer failed: ", e);
+ }
+ }
+ }
if (changed) {
- logger.info("topics.properties has changed, trigger diff publish
for {}", getName());
topicProperties = configManager.getTopicProperties();
+ logger.info("topics.properties has changed, trigger diff publish
for {},"
+ + " old topic set = {}, new topic set = {}, current
topicProperties = {}",
+ getName(), curTopicSet, newTopicSet, topicProperties);
}
}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java
index 66b06e93d..dc0a87b07 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java
@@ -323,6 +323,20 @@ public class PulsarClientService {
return initTopicProducer(topic, null, null);
}
+ public boolean destroyProducerByTopic(String topic) {
+ List<TopicProducerInfo> producerInfoList =
producerInfoMap.remove(topic);
+ if (producerInfoList == null || producerInfoList.isEmpty()) {
+ return true;
+ }
+ for (TopicProducerInfo producerInfo : producerInfoList) {
+ if (producerInfo != null) {
+ producerInfo.close();
+ logger.info("destroy producer for topic={}", topic);
+ }
+ }
+ return true;
+ }
+
private TopicProducerInfo getProducerInfo(int poolIndex, String topic,
String inlongGroupId,
String inlongStreamId) {
List<TopicProducerInfo> producerList = initTopicProducer(topic,
inlongGroupId, inlongStreamId);