Hi,

I have identified an issue with message purging when the last subscriber for a topic disconnects from the broker cluster.
@org.wso2.andes.kernel.OrphanedMessageHandler

    private void removeMessagesOfDestinationForNode(String destination, String ownerName, boolean isTopic)
            throws AndesException {
        try {
            Long startMessageID = null;
            Long endMessageID = null;

            //Will first retrieve the last unassigned slot id
            String nodeID = ClusterResourceHolder.getInstance().getClusterManager().getMyNodeID();
            //Will get the storage queue name
            String storageQueue = AndesUtils.getStorageQueueForDestination(destination, nodeID, isTopic);
            //We get the relevant slot from the coordinator if running on cluster mode
            Slot unassignedSlot = MessagingEngine.getInstance().getSlotCoordinator().getSlot(storageQueue);
            //We need to get the starting message ID to inform the DB to start slicing the message from there
            //This step would be done in order to ensure that tombstones will not be fetched during the querying
            //operation
            startMessageID = unassignedSlot.getStartMessageId();
            endMessageID = unassignedSlot.getEndMessageId();

            // This is a class used by AndesSubscriptionManager. Andes Subscription Manager is behind Disruptor layer.
            // Hence the call should be made to MessagingEngine NOT Andes.
            // Calling Andes methods from here will lead to probable deadlocks if Futures are used.
            // NOTE: purge call should be made to MessagingEngine not Andes
            if (0 < endMessageID) {
                //If the slot id is 0, which means for the given storage queue there're no unassigned slots which means
                //we don't need to purge messages in this case
                //The purpose of purge operation is to make sure that unassigned slots will be removed if no subs exists
                MessagingEngine.getInstance().purgeMessages(destination, ownerName, isTopic, startMessageID);
            }
        } catch (ConnectionException e) {
            String mesage = "Error while establishing a connection with the thrift server";
            log.error(mesage);
            throw new AndesException(mesage, e);
        }
    }

Why do we need a start message ID here? What about purging the whole internal queue (related to the topic)?

    MessagingEngine.getInstance().purgeMessages(destination, ownerName, isTopic, 0);

Thanks

-- 
Hasitha Abeykoon
Senior Software Engineer; WSO2, Inc.; http://wso2.com
cell: +94 719363063
blog: abeykoon.blogspot.com <http://abeykoon.blogspot.com>
_______________________________________________
Dev mailing list
Dev@wso2.org
http://wso2.org/cgi-bin/mailman/listinfo/dev