Hi
     
     I have a question about the classes TopicStorePrefetch and
KahaDBStore.KahaDBTopicMessageStore. Details follow:
     
     When persistent messages are added to the cursor and there is no memory
space left, the cursor cache is disabled.
     To stay in sync with the store, QueueStorePrefetch handles this by calling
the method [KahaDBMessageStore.setBatch], but TopicStorePrefetch does nothing.
     This may lead to duplicate messages being recovered from the store.
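
     To illustrate the scenario, here is a toy model. It is not ActiveMQ code:
the class BatchSyncSketch, its fields, and the syncOnDisable flag are all
made up for this sketch. It only assumes that the cursor serves cached
messages first and then recovers the rest from the store, starting at a batch
position.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model (hypothetical, not ActiveMQ code) of a store-backed cursor with a cache. */
class BatchSyncSketch {
    private final List<String> store = new ArrayList<>(); // persisted messages, in order
    private final List<String> cache = new ArrayList<>(); // messages held in memory
    private final int cacheLimit;        // stands in for "no memory space"
    private final boolean syncOnDisable; // true = move the batch position when the cache is disabled
    private int batchPosition = 0;       // next store index to recover from
    private boolean cacheEnabled = true;

    BatchSyncSketch(int cacheLimit, boolean syncOnDisable) {
        this.cacheLimit = cacheLimit;
        this.syncOnDisable = syncOnDisable;
    }

    void add(String msg) {
        store.add(msg); // persistent messages always reach the store
        if (!cacheEnabled) {
            return;
        }
        if (cache.size() < cacheLimit) {
            cache.add(msg);
        } else {
            cacheEnabled = false;
            if (syncOnDisable) {
                // Skip everything already cached: recovery starts at the message just stored.
                batchPosition = store.size() - 1;
            }
        }
    }

    List<String> dispatchAll() {
        List<String> out = new ArrayList<>(cache); // cached messages go first
        cache.clear();
        while (batchPosition < store.size()) {     // then recover the rest from the store
            out.add(store.get(batchPosition++));
        }
        return out;
    }

    public static List<String> run(boolean syncOnDisable) {
        BatchSyncSketch cursor = new BatchSyncSketch(2, syncOnDisable);
        for (String m : new String[] {"m1", "m2", "m3", "m4"}) {
            cursor.add(m);
        }
        return cursor.dispatchAll();
    }

    public static void main(String[] args) {
        System.out.println("without sync: " + run(false));
        System.out.println("with sync:    " + run(true));
    }
}
```

     Without the sync step, m1 and m2 are delivered once from the cache and
once more from the store. With it, each message is delivered once.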
     
     I think TopicStorePrefetch should apply logic similar to
QueueStorePrefetch, like this:
     
     KahaDBStore.KahaDBTopicMessageStore.setBatch:
     ===============================================
     public void setBatch(final String clientId, final String subscriptionName,
             final MessageId identity, final int priority) throws IOException {
         try {
            
             lockAsyncJobQueue();
     
             indexLock.writeLock().lock();
             try {
                 pageFile.tx().execute(new Transaction.Closure<IOException>() {
                     public void execute(Transaction tx) throws IOException {
                         StoredDestination sd = getStoredDestination(dest, tx);
                         String key = identity.toString();
                         String subscriptionKey = subscriptionKey(clientId, subscriptionName);
                         Long location = sd.messageIdIndex.get(tx, key);
                         if (location == null) {
                             return;
                         }
                         
                         MessageOrderCursor moc = sd.subscriptionCursors.get(subscriptionKey);
                         if (moc == null) {
                             moc = new MessageOrderCursor();
                             sd.subscriptionCursors.put(subscriptionKey, moc);
                         }
                         
                         if (priority == MessageOrderIndex.DEF) {
                             moc.defaultCursorPosition = location.longValue();
                         } else if (priority > MessageOrderIndex.DEF) {
                             // priorities above the default advance the high-priority cursor
                             moc.highPriorityCursorPosition = location.longValue();
                         } else {
                             moc.lowPriorityCursorPosition = location.longValue();
                         }
                     }
                 });
             } finally {
                 indexLock.writeLock().unlock();
             }
         } finally {
             unlockAsyncJobQueue();
         }
     }
     ===============================================
     
     But I'm not sure this is correct. Could someone please confirm?
     
     Thanks a lot.
