qianye1001 commented on code in PR #9457:
URL: https://github.com/apache/rocketmq/pull/9457#discussion_r2134950997
##########
store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueStore.java:
##########
@@ -403,6 +403,23 @@ public void destroy(ConsumeQueueInterface consumeQueue)
throws RocksDBException
}
}
+ /**
+ * ConsumerQueueTable, as an in-memory data structure, uses lazy loading
mechanism in RocksDBConsumeQueueStore.
+ * This means that when the broker restarts, it may not be able to
retrieve all ConsumerQueues from the table.
+ * Therefore, before deleting a topic, we need to attempt to build all
ConsumerQueues under that topic to ensure
+ * the completeness of the deletion operation.
+ */
+ @Override
+ public boolean deleteTopic(String topic) {
+ try {
+ Set<Integer> queueIds =
rocksDBConsumeQueueOffsetTable.scanAllQueueIdInTopic(topic);
+ queueIds.forEach(queueId -> findOrCreateConsumeQueue(topic,
queueId));
+ } catch (RocksDBException e) {
+ ERROR_LOG.error("Failed to scan queueIds for topic. topic={}",
topic, e);
Review Comment:
加载异常不影响 删除逻辑的正常进行,脏数据也可以在 commitlog 被清理后正常的被清理,只是在创建同名topic的时候会有影响
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]