lizhimins commented on code in PR #9457:
URL: https://github.com/apache/rocketmq/pull/9457#discussion_r2134946678
##########
store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueStore.java:
##########
@@ -403,6 +403,23 @@ public void destroy(ConsumeQueueInterface consumeQueue)
throws RocksDBException
}
}
+ /**
+ * ConsumerQueueTable, as an in-memory data structure, uses lazy loading
mechanism in RocksDBConsumeQueueStore.
+ * This means that when the broker restarts, it may not be able to
retrieve all ConsumerQueues from the table.
+ * Therefore, before deleting a topic, we need to attempt to build all
ConsumerQueues under that topic to ensure
+ * the completeness of the deletion operation.
+ */
+ @Override
+ public boolean deleteTopic(String topic) {
+ try {
+ Set<Integer> queueIds =
rocksDBConsumeQueueOffsetTable.scanAllQueueIdInTopic(topic);
+ queueIds.forEach(queueId -> findOrCreateConsumeQueue(topic,
queueId));
+ } catch (RocksDBException e) {
+ ERROR_LOG.error("Failed to scan queueIds for topic. topic={}",
topic, e);
Review Comment:
这里是不抛出异常吗
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]