Hi Hasitha,

With reference to the discussion in [1], this was done to prevent Tombstone
overwhelming exception when Cassandra was used as the message store.

AFAIR The process of purging was as follows,

1) From MessageMetaData CF, retrieve the corresponding message ids
2) Delete the corresponding messages from MessageMetaData and then delete
them from the MessageContent.

When transacting > 1,000,000 messages and disconnection of the subscription
will go through the above mentioned process.

The issue is,

a) Message content is not partitioned based on the queue name as Message
Meta data. so we cannot delete the content row wise, which restricted us to
retrieve ids from Message Meta data and remove them from content.
b) When retrieving message meta data (let's say we used a range query to
select all the ids) there will be tombstones that will be selected with
that query that will hit the limit, causing the Tombstone overwhelming
exception.

As a solution, when  removeMessagesOfDestinationForNode() is called we get
the current fresh slot, and the idea of getting the start id of the slot is
that it assures that the message ids that should be selected from the
message meta data should be greater than the value defined in the start id
(since messages with ids < start id are the messages that have being
delivered already that has caused tombstones).

Also, I believe we could change the flow now since we discontinue
Cassandra. Since in RDBMS you could select all ids without any issue.

Hope this explains your query.

[1] [Dev][MB] TombstoneOverwhelmingException When Purge Operation is
Triggered When Subscription Disconnection/Deletion

Thanks,
Pamod

On Mon, Jul 6, 2015 at 7:00 AM, Hasitha Hiranya <hasit...@wso2.com> wrote:

> Hi,
>
> Identified an issue regarding message purging when last subscriber for a
> topic is disconnected from broker cluster.
>
> @org.wso2.andes.kernel.OrphanedMessageHandler
>
>
> private void removeMessagesOfDestinationForNode(String destination,
>                                                     String ownerName,
> boolean isTopic) throws AndesException {
>
>         try {
>
>             Long startMessageID = null;
>             Long endMessageID = null;
>
>             //Will first retrieve the last unassigned slot id
>             String nodeID =
> ClusterResourceHolder.getInstance().getClusterManager().getMyNodeID();
>             //Will get the storage queue name
> *            String storageQueue =
> AndesUtils.getStorageQueueForDestination(destination, nodeID, isTopic);*
> *            //We get the relevant slot from the coordinator if running on
> cluster mode*
> *            Slot unassignedSlot =
> MessagingEngine.getInstance().getSlotCoordinator().getSlot(storageQueue);*
> *            //We need to get the starting message ID to inform the DB to
> start slicing the message from there*
> *            //This step would be done in order to ensure that tombstones
> will not be fetched during the querying*
> *            //operation*
> *            startMessageID = unassignedSlot.getStartMessageId();*
>             endMessageID = unassignedSlot.getEndMessageId();
>
>             // This is a class used by AndesSubscriptionManager. Andes
> Subscription Manager is behind Disruptor layer.
>             // Hence the call should be made to MessagingEngine NOT Andes.
>             // Calling Andes methods from here will lead to probable
> deadlocks if Futures are used.
>             // NOTE: purge call should be made to MessagingEngine not Andes
>             if (0 < endMessageID) {
>                 //If the slot id is 0, which means for the given storage
> queue there're no unassigned slots which means
>                 //we don't need to purge messages in this case
>                 //The purpose of purge operation is to make sure that
> unassigned slots will be removed if no subs exists
>                 MessagingEngine.getInstance().purgeMessages(destination,
> ownerName, isTopic, startMessageID);
>             }
>         } catch (ConnectionException e) {
>             String mesage = "Error while establishing a connection with
> the thrift server";
>             log.error(mesage);
>             throw new AndesException(mesage, e);
>         }
>
>     }
>
> Why we need a start message id here ?
> What about purging the whole internal queue (related to topic) ?
>
> *MessagingEngine.getInstance().purgeMessages(destination, ownerName,
> isTopic, 0);*
>
> Thanks
> --
> *Hasitha Abeykoon*
> Senior Software Engineer; WSO2, Inc.; http://wso2.com
> *cell:* *+94 719363063*
> *blog: **abeykoon.blogspot.com* <http://abeykoon.blogspot.com>
>
>


-- 
*Pamod Sylvester *

*WSO2 Inc.; http://wso2.com <http://wso2.com>*
cell: +94 77 7779495
_______________________________________________
Dev mailing list
Dev@wso2.org
http://wso2.org/cgi-bin/mailman/listinfo/dev

Reply via email to