This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 87e647fef [INLONG-6422][TubeMQ] Deleting the topic was unsuccessful (#6445)
87e647fef is described below
commit 87e647fef6f35e54a33fe0ff7a422e76d34052eb
Author: Goson Zhang <[email protected]>
AuthorDate: Mon Nov 7 19:48:46 2022 +0800
[INLONG-6422][TubeMQ] Deleting the topic was unsuccessful (#6445)
---
.../inlong/tubemq/server/broker/msgstore/MessageStoreManager.java | 7 +++++--
.../master/metamanage/metastore/dao/entity/TopicDeployEntity.java | 2 +-
.../master/metamanage/metastore/impl/AbsConsumeCtrlMapperImpl.java | 4 ++--
.../master/metamanage/metastore/impl/AbsTopicDeployMapperImpl.java | 6 +++---
4 files changed, 11 insertions(+), 8 deletions(-)
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/MessageStoreManager.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/MessageStoreManager.java
index 1856b8252..14ff6c863 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/MessageStoreManager.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/MessageStoreManager.java
@@ -223,6 +223,7 @@ public class MessageStoreManager implements StoreService {
if (targetTopics.isEmpty()) {
return removedTopics;
}
+ logger.info("[Remove Topic] start remove topics : {}",
targetTopics);
for (String tmpTopic : targetTopics) {
ConcurrentHashMap<Integer, MessageStore> topicStores =
dataStores.get(tmpTopic);
@@ -246,14 +247,15 @@ public class MessageStoreManager implements StoreService {
if (tmpTopicConf != null) {
StringBuilder sBuilder = new StringBuilder(512);
for (int storeId = 0; storeId <
tmpTopicConf.getNumTopicStores(); storeId++) {
- String storeDir =
sBuilder.append(tmpTopicConf.getDataPath())
+ String storeDir =
sBuilder.append(tubeConfig.getPrimaryPath())
.append(File.separator).append(tmpTopic).append("-")
.append(storeId).toString();
sBuilder.delete(0, sBuilder.length());
+ logger.info("[Remove Topic] remove topic files : {}",
storeDir);
try {
delTopicFiles(storeDir);
} catch (Throwable e) {
- logger.error("[Remove Topic] Remove topic data
error : ", e);
+ logger.error("[Remove Topic] remove topic files
error : ", e);
}
ThreadUtils.sleep(50);
}
@@ -262,6 +264,7 @@ public class MessageStoreManager implements StoreService {
}
ThreadUtils.sleep(100);
}
+ logger.info("[Remove Topic] finished remove topics : {}",
removedTopics);
return removedTopics;
} finally {
this.isRemovingTopic.set(false);
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/entity/TopicDeployEntity.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/entity/TopicDeployEntity.java
index fffb57c28..32fc6d303 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/entity/TopicDeployEntity.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/entity/TopicDeployEntity.java
@@ -254,9 +254,9 @@ public class TopicDeployEntity extends BaseEntity
implements Cloneable {
}
}
if (changed) {
- updSerialId();
this.brokerAddress =
KeyBuilderUtils.buildAddressInfo(this.brokerIp,
this.brokerPort);
+ updSerialId();
}
return changed;
}
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/AbsConsumeCtrlMapperImpl.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/AbsConsumeCtrlMapperImpl.java
index 1dff57566..bcdd5c1bb 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/AbsConsumeCtrlMapperImpl.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/AbsConsumeCtrlMapperImpl.java
@@ -450,7 +450,7 @@ public abstract class AbsConsumeCtrlMapperImpl implements
ConsumeCtrlMapper {
if (keySet != null) {
keySet.remove(recordKey);
if (keySet.isEmpty()) {
- topic2RecordCache.remove(curEntity.getTopicName());
+ topic2RecordCache.remove(curEntity.getTopicName(), new
ConcurrentHashSet<>());
}
}
// delete group index
@@ -458,7 +458,7 @@ public abstract class AbsConsumeCtrlMapperImpl implements
ConsumeCtrlMapper {
if (keySet != null) {
keySet.remove(recordKey);
if (keySet.isEmpty()) {
- group2RecordCache.remove(curEntity.getGroupName());
+ group2RecordCache.remove(curEntity.getGroupName(), new
ConcurrentHashSet<>());
}
}
}
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/AbsTopicDeployMapperImpl.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/AbsTopicDeployMapperImpl.java
index f9c627cb9..e414c6d61 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/AbsTopicDeployMapperImpl.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/AbsTopicDeployMapperImpl.java
@@ -557,7 +557,7 @@ public abstract class AbsTopicDeployMapperImpl implements
TopicDeployMapper {
if (keySet != null) {
keySet.remove(recordKey);
if (keySet.isEmpty()) {
- topicName2RecordCache.remove(curEntity.getTopicName());
+ topicName2RecordCache.remove(curEntity.getTopicName(), new
ConcurrentHashSet<>());
}
}
// delete brokerId index
@@ -565,7 +565,7 @@ public abstract class AbsTopicDeployMapperImpl implements
TopicDeployMapper {
if (keySet != null) {
keySet.remove(recordKey);
if (keySet.isEmpty()) {
- brokerId2RecordCache.remove(curEntity.getBrokerId());
+ brokerId2RecordCache.remove(curEntity.getBrokerId(), new
ConcurrentHashSet<>());
}
}
// delete broker topic map
@@ -573,7 +573,7 @@ public abstract class AbsTopicDeployMapperImpl implements
TopicDeployMapper {
if (keySet != null) {
keySet.remove(curEntity.getTopicName());
if (keySet.isEmpty()) {
- brokerId2TopicNameCache.remove(curEntity.getBrokerId());
+ brokerId2TopicNameCache.remove(curEntity.getBrokerId(), new
ConcurrentHashSet<>());
}
}
}