@Pamod,

Any idea on this?

Thanks

On Mon, Jul 6, 2015 at 4:30 PM, Hasitha Hiranya <[email protected]> wrote:

> Hi,
>
> I have identified an issue with message purging when the last subscriber
> for a topic disconnects from the broker cluster.
>
> In org.wso2.andes.kernel.OrphanedMessageHandler:
>
>
> private void removeMessagesOfDestinationForNode(String destination,
>                                                 String ownerName,
>                                                 boolean isTopic) throws AndesException {
>
>         try {
>
>             Long startMessageID = null;
>             Long endMessageID = null;
>
>             //Will first retrieve the last unassigned slot id
>             String nodeID = ClusterResourceHolder.getInstance().getClusterManager().getMyNodeID();
>             //Will get the storage queue name
>             String storageQueue = AndesUtils.getStorageQueueForDestination(destination, nodeID, isTopic);
>             //We get the relevant slot from the coordinator if running on cluster mode
>             Slot unassignedSlot = MessagingEngine.getInstance().getSlotCoordinator().getSlot(storageQueue);
>             //We need to get the starting message ID to inform the DB to start slicing the message from there
>             //This step would be done in order to ensure that tombstones will not be fetched during the querying
>             //operation
>             startMessageID = unassignedSlot.getStartMessageId();
>             endMessageID = unassignedSlot.getEndMessageId();
>
>             // This is a class used by AndesSubscriptionManager. Andes Subscription Manager is behind Disruptor layer.
>             // Hence the call should be made to MessagingEngine NOT Andes.
>             // Calling Andes methods from here will lead to probable deadlocks if Futures are used.
>             // NOTE: purge call should be made to MessagingEngine not Andes
>             if (0 < endMessageID) {
>                 //If the slot id is 0, which means for the given storage queue there're no unassigned slots which means
>                 //we don't need to purge messages in this case
>                 //The purpose of purge operation is to make sure that unassigned slots will be removed if no subs exists
>                 MessagingEngine.getInstance().purgeMessages(destination, ownerName, isTopic, startMessageID);
>             }
>         } catch (ConnectionException e) {
>             String message = "Error while establishing a connection with the thrift server";
>             log.error(message);
>             throw new AndesException(message, e);
>         }
>
>     }
>
> Why do we need a start message ID here?
> Why not purge the whole internal queue for the topic instead?
>
> MessagingEngine.getInstance().purgeMessages(destination, ownerName, isTopic, 0);
>
> Thanks
> --
> *Hasitha Abeykoon*
> Senior Software Engineer; WSO2, Inc.; http://wso2.com
> *cell:* *+94 719363063*
> *blog: **abeykoon.blogspot.com* <http://abeykoon.blogspot.com>
>
>
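
To make the question concrete, here is a minimal Java sketch of the two
options. It is not Andes code: PurgeSketch, newQueue and purgeFrom are
hypothetical names, the storage queue is modelled as an in-memory sorted
map, and it assumes the start message ID acts as an inclusive lower bound
for the purge, which the quoted snippet does not confirm.

```java
import java.util.NavigableMap;
import java.util.TreeMap;

public class PurgeSketch {

    /** Hypothetical stand-in for one storage queue: message ID -> payload. */
    static NavigableMap<Long, String> newQueue(long... messageIds) {
        NavigableMap<Long, String> queue = new TreeMap<>();
        for (long id : messageIds) {
            queue.put(id, "payload-" + id);
        }
        return queue;
    }

    /**
     * Removes every message whose ID is >= startMessageId (assumed semantics)
     * and returns the number of messages removed.
     */
    static int purgeFrom(NavigableMap<Long, String> queue, long startMessageId) {
        // tailMap returns a live view, so clearing it removes the entries
        // from the backing map as well.
        NavigableMap<Long, String> tail = queue.tailMap(startMessageId, true);
        int removed = tail.size();
        tail.clear();
        return removed;
    }

    public static void main(String[] args) {
        // Option 1: purge from the start ID of the unassigned slot (here 30).
        NavigableMap<Long, String> fromSlotStart = newQueue(10, 20, 30, 40);
        int removedFromSlot = purgeFrom(fromSlotStart, 30);
        System.out.println("from slot start: removed " + removedFromSlot
                + ", remaining " + fromSlotStart.keySet());

        // Option 2: purge the whole queue by passing 0 as the start ID.
        NavigableMap<Long, String> wholeQueue = newQueue(10, 20, 30, 40);
        int removedWhole = purgeFrom(wholeQueue, 0);
        System.out.println("from 0: removed " + removedWhole
                + ", remaining " + wholeQueue.keySet());
    }
}
```

Under that assumption, purging from the slot's start ID leaves any
messages with lower IDs in the store, while passing 0 clears the queue
completely.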

