lizhimins commented on code in PR #9256:
URL: https://github.com/apache/rocketmq/pull/9256#discussion_r2113361236
##########
store/src/main/java/org/apache/rocketmq/store/queue/AbstractConsumeQueueStore.java:
##########
@@ -117,4 +103,43 @@ public long getStoreTime(CqUnit cqUnit) {
}
return -1;
}
+
+ /**
+ * get max physic offset in consumeQueue
+ *
+ * @return the max physic offset in consumeQueue
+ * @throws RocksDBException only in rocksdb mode
+ */
+ public abstract long getMaxPhyOffsetInConsumeQueue() throws
RocksDBException;
+
+ /**
+ * destroy the specific consumeQueue
+ *
+ * @param consumeQueue consumeQueue to be destroyed
+ * @throws RocksDBException only in rocksdb mode
+ */
+ protected abstract void destroy(ConsumeQueueInterface consumeQueue) throws
RocksDBException;
+
+ @Override
+ public boolean deleteTopic(String topic) {
+ ConcurrentMap<Integer, ConsumeQueueInterface> queueTable =
this.consumeQueueTable.get(topic);
+
+ if (queueTable == null || queueTable.isEmpty()) {
+ return false;
+ }
+
+ for (ConsumeQueueInterface cq : queueTable.values()) {
+ try {
+ destroy(cq);
+ } catch (RocksDBException e) {
+ log.error("DeleteTopic: ConsumeQueue cleans error!, topic={},
queueId={}", cq.getTopic(), cq.getQueueId(), e);
Review Comment:
这里失败了可能有脏数据
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]