This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 87e647fef [INLONG-6422][TubeMQ] Deleting the topic was unsuccessful 
(#6445)
87e647fef is described below

commit 87e647fef6f35e54a33fe0ff7a422e76d34052eb
Author: Goson Zhang <[email protected]>
AuthorDate: Mon Nov 7 19:48:46 2022 +0800

    [INLONG-6422][TubeMQ] Deleting the topic was unsuccessful (#6445)
---
 .../inlong/tubemq/server/broker/msgstore/MessageStoreManager.java  | 7 +++++--
 .../master/metamanage/metastore/dao/entity/TopicDeployEntity.java  | 2 +-
 .../master/metamanage/metastore/impl/AbsConsumeCtrlMapperImpl.java | 4 ++--
 .../master/metamanage/metastore/impl/AbsTopicDeployMapperImpl.java | 6 +++---
 4 files changed, 11 insertions(+), 8 deletions(-)

diff --git 
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/MessageStoreManager.java
 
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/MessageStoreManager.java
index 1856b8252..14ff6c863 100644
--- 
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/MessageStoreManager.java
+++ 
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/MessageStoreManager.java
@@ -223,6 +223,7 @@ public class MessageStoreManager implements StoreService {
             if (targetTopics.isEmpty()) {
                 return removedTopics;
             }
+            logger.info("[Remove Topic] start remove topics : {}", 
targetTopics);
             for (String tmpTopic : targetTopics) {
                 ConcurrentHashMap<Integer, MessageStore> topicStores =
                         dataStores.get(tmpTopic);
@@ -246,14 +247,15 @@ public class MessageStoreManager implements StoreService {
                 if (tmpTopicConf != null) {
                     StringBuilder sBuilder = new StringBuilder(512);
                     for (int storeId = 0; storeId < 
tmpTopicConf.getNumTopicStores(); storeId++) {
-                        String storeDir = 
sBuilder.append(tmpTopicConf.getDataPath())
+                        String storeDir = 
sBuilder.append(tubeConfig.getPrimaryPath())
                                 
.append(File.separator).append(tmpTopic).append("-")
                                 .append(storeId).toString();
                         sBuilder.delete(0, sBuilder.length());
+                        logger.info("[Remove Topic] remove topic files : {}", 
storeDir);
                         try {
                             delTopicFiles(storeDir);
                         } catch (Throwable e) {
-                            logger.error("[Remove Topic] Remove topic data 
error : ", e);
+                            logger.error("[Remove Topic] remove topic files 
error : ", e);
                         }
                         ThreadUtils.sleep(50);
                     }
@@ -262,6 +264,7 @@ public class MessageStoreManager implements StoreService {
                 }
                 ThreadUtils.sleep(100);
             }
+            logger.info("[Remove Topic] finished remove topics : {}", 
removedTopics);
             return removedTopics;
         } finally {
             this.isRemovingTopic.set(false);
diff --git 
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/entity/TopicDeployEntity.java
 
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/entity/TopicDeployEntity.java
index fffb57c28..32fc6d303 100644
--- 
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/entity/TopicDeployEntity.java
+++ 
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/dao/entity/TopicDeployEntity.java
@@ -254,9 +254,9 @@ public class TopicDeployEntity extends BaseEntity 
implements Cloneable {
             }
         }
         if (changed) {
-            updSerialId();
             this.brokerAddress =
                     KeyBuilderUtils.buildAddressInfo(this.brokerIp, 
this.brokerPort);
+            updSerialId();
         }
         return changed;
     }
diff --git 
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/AbsConsumeCtrlMapperImpl.java
 
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/AbsConsumeCtrlMapperImpl.java
index 1dff57566..bcdd5c1bb 100644
--- 
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/AbsConsumeCtrlMapperImpl.java
+++ 
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/AbsConsumeCtrlMapperImpl.java
@@ -450,7 +450,7 @@ public abstract class AbsConsumeCtrlMapperImpl implements 
ConsumeCtrlMapper {
         if (keySet != null) {
             keySet.remove(recordKey);
             if (keySet.isEmpty()) {
-                topic2RecordCache.remove(curEntity.getTopicName());
+                topic2RecordCache.remove(curEntity.getTopicName(), new 
ConcurrentHashSet<>());
             }
         }
         // delete group index
@@ -458,7 +458,7 @@ public abstract class AbsConsumeCtrlMapperImpl implements 
ConsumeCtrlMapper {
         if (keySet != null) {
             keySet.remove(recordKey);
             if (keySet.isEmpty()) {
-                group2RecordCache.remove(curEntity.getGroupName());
+                group2RecordCache.remove(curEntity.getGroupName(), new 
ConcurrentHashSet<>());
             }
         }
     }
diff --git 
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/AbsTopicDeployMapperImpl.java
 
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/AbsTopicDeployMapperImpl.java
index f9c627cb9..e414c6d61 100644
--- 
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/AbsTopicDeployMapperImpl.java
+++ 
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/AbsTopicDeployMapperImpl.java
@@ -557,7 +557,7 @@ public abstract class AbsTopicDeployMapperImpl implements 
TopicDeployMapper {
         if (keySet != null) {
             keySet.remove(recordKey);
             if (keySet.isEmpty()) {
-                topicName2RecordCache.remove(curEntity.getTopicName());
+                topicName2RecordCache.remove(curEntity.getTopicName(), new 
ConcurrentHashSet<>());
             }
         }
         // delete brokerId index
@@ -565,7 +565,7 @@ public abstract class AbsTopicDeployMapperImpl implements 
TopicDeployMapper {
         if (keySet != null) {
             keySet.remove(recordKey);
             if (keySet.isEmpty()) {
-                brokerId2RecordCache.remove(curEntity.getBrokerId());
+                brokerId2RecordCache.remove(curEntity.getBrokerId(), new 
ConcurrentHashSet<>());
             }
         }
         // delete broker topic map
@@ -573,7 +573,7 @@ public abstract class AbsTopicDeployMapperImpl implements 
TopicDeployMapper {
         if (keySet != null) {
             keySet.remove(curEntity.getTopicName());
             if (keySet.isEmpty()) {
-                brokerId2TopicNameCache.remove(curEntity.getBrokerId());
+                brokerId2TopicNameCache.remove(curEntity.getBrokerId(), new 
ConcurrentHashSet<>());
             }
         }
     }

Reply via email to