Hi,

Identified an issue regarding message purging when last subscriber for a
topic is disconnected from broker cluster.

@org.wso2.andes.kernel.OrphanedMessageHandler


private void removeMessagesOfDestinationForNode(String destination,
                                                    String ownerName,
boolean isTopic) throws AndesException {

        try {

            Long startMessageID = null;
            Long endMessageID = null;

            //Will first retrieve the last unassigned slot id
            String nodeID =
ClusterResourceHolder.getInstance().getClusterManager().getMyNodeID();
            //Will get the storage queue name
*            String storageQueue =
AndesUtils.getStorageQueueForDestination(destination, nodeID, isTopic);*
*            //We get the relevant slot from the coordinator if running on
cluster mode*
*            Slot unassignedSlot =
MessagingEngine.getInstance().getSlotCoordinator().getSlot(storageQueue);*
*            //We need to get the starting message ID to inform the DB to
start slicing the message from there*
*            //This step would be done in order to ensure that tombstones
will not be fetched during the querying*
*            //operation*
*            startMessageID = unassignedSlot.getStartMessageId();*
            endMessageID = unassignedSlot.getEndMessageId();

            // This is a class used by AndesSubscriptionManager. Andes
Subscription Manager is behind Disruptor layer.
            // Hence the call should be made to MessagingEngine NOT Andes.
            // Calling Andes methods from here will lead to probable
deadlocks if Futures are used.
            // NOTE: purge call should be made to MessagingEngine not Andes
            if (0 < endMessageID) {
                //If the slot id is 0, which means for the given storage
queue there're no unassigned slots which means
                //we don't need to purge messages in this case
                //The purpose of purge operation is to make sure that
unassigned slots will be removed if no subs exists
                MessagingEngine.getInstance().purgeMessages(destination,
ownerName, isTopic, startMessageID);
            }
        } catch (ConnectionException e) {
            String mesage = "Error while establishing a connection with the
thrift server";
            log.error(mesage);
            throw new AndesException(mesage, e);
        }

    }

Why we need a start message id here ?
What about purging the whole internal queue (related to topic) ?

*MessagingEngine.getInstance().purgeMessages(destination, ownerName,
isTopic, 0);*

Thanks
-- 
*Hasitha Abeykoon*
Senior Software Engineer; WSO2, Inc.; http://wso2.com
*cell:* *+94 719363063*
*blog: **abeykoon.blogspot.com* <http://abeykoon.blogspot.com>
_______________________________________________
Dev mailing list
Dev@wso2.org
http://wso2.org/cgi-bin/mailman/listinfo/dev

Reply via email to