+1. If we can partition the message content by queue name, we could delete it
directly.
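
To make the idea concrete, here is a minimal sketch, assuming content is
keyed by queue name in the same way as metadata. It is a plain in-memory
model: PartitionedPurgeSketch and its methods are hypothetical names for
illustration and are not part of the Andes store API.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical in-memory model; not the Andes message store API. */
class PartitionedPurgeSketch {
    // Assumption: metadata AND content are keyed by queue name first,
    // then by message ID inside each queue partition.
    private final Map<String, TreeMap<Long, String>> metadataByQueue = new HashMap<>();
    private final Map<String, Map<Long, byte[]>> contentByQueue = new HashMap<>();

    void store(String queue, long messageId, String metadata, byte[] content) {
        metadataByQueue.computeIfAbsent(queue, q -> new TreeMap<>()).put(messageId, metadata);
        contentByQueue.computeIfAbsent(queue, q -> new HashMap<>()).put(messageId, content);
    }

    /**
     * Drops the whole partition for a queue from both maps.
     * No message IDs are read beforehand. Returns the number of messages removed.
     */
    int purge(String queue) {
        Map<Long, String> removedMetadata = metadataByQueue.remove(queue);
        contentByQueue.remove(queue);
        return removedMetadata == null ? 0 : removedMetadata.size();
    }

    /** Number of content entries still stored for the queue. */
    int contentCount(String queue) {
        Map<Long, byte[]> content = contentByQueue.get(queue);
        return content == null ? 0 : content.size();
    }

    public static void main(String[] args) {
        PartitionedPurgeSketch store = new PartitionedPurgeSketch();
        store.store("topicA", 1L, "meta-1", new byte[] {1});
        store.store("topicA", 2L, "meta-2", new byte[] {2});
        System.out.println("purged: " + store.purge("topicA"));
    }
}
```

The point is only that the purge becomes a single delete per partition, so
the ID lookup against metadata disappears.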

On Mon, Jul 6, 2015 at 7:36 AM, Ramith Jayasinghe <[email protected]> wrote:

> I think we can simplify this one and do the delete without even fetching the
> message IDs.
>
>
> On Mon, Jul 6, 2015 at 5:05 PM, Pamod Sylvester <[email protected]> wrote:
>
>> Hi Hasitha,
>>
>> With reference to the discussion in [1], this was done to prevent a
>> TombstoneOverwhelmingException when Cassandra was used as the message
>> store.
>>
>> AFAIR, the purge process was as follows:
>>
>> 1) Retrieve the corresponding message IDs from the MessageMetaData CF.
>> 2) Delete the corresponding messages from MessageMetaData and then delete
>> them from MessageContent.
>>
>> When more than 1,000,000 messages have been transacted, disconnecting the
>> subscription goes through the process described above.
>>
>> The issues are:
>>
>> a) Message content is not partitioned by queue name the way message
>> metadata is, so we cannot delete the content row-wise. This forced us to
>> retrieve the IDs from message metadata and remove them from content.
>> b) When retrieving message metadata (let's say with a range query that
>> selects all the IDs), the query also reads tombstones, which can hit the
>> tombstone limit and cause the TombstoneOverwhelmingException.
>>
>> As a solution, when removeMessagesOfDestinationForNode() is called we
>> get the current fresh slot. Taking the start ID of that slot ensures that
>> only message IDs greater than the start ID are selected from the message
>> metadata, since messages with IDs < start ID have already been delivered
>> and are the ones that caused the tombstones.
>>
>> Also, I believe we could change the flow now, since we are discontinuing
>> Cassandra; in an RDBMS you can select all IDs without any issue.
>>
>> Hope this explains your query.
>>
>> [1] [Dev][MB] TombstoneOverwhelmingException When Purge Operation is
>> Triggered When Subscription Disconnection/Deletion
>>
>> Thanks,
>> Pamod
>>
>> On Mon, Jul 6, 2015 at 7:00 AM, Hasitha Hiranya <[email protected]>
>> wrote:
>>
>>> Hi,
>>>
>>> I identified an issue with message purging when the last subscriber for a
>>> topic is disconnected from the broker cluster.
>>>
>>> @org.wso2.andes.kernel.OrphanedMessageHandler
>>>
>>>
>>> private void removeMessagesOfDestinationForNode(String destination, String ownerName,
>>>                                                 boolean isTopic) throws AndesException {
>>>
>>>         try {
>>>
>>>             Long startMessageID = null;
>>>             Long endMessageID = null;
>>>
>>>             //Will first retrieve the last unassigned slot id
>>>             String nodeID = ClusterResourceHolder.getInstance().getClusterManager().getMyNodeID();
>>>             //Will get the storage queue name
>>>             String storageQueue = AndesUtils.getStorageQueueForDestination(destination, nodeID, isTopic);
>>>             //We get the relevant slot from the coordinator if running on cluster mode
>>>             Slot unassignedSlot = MessagingEngine.getInstance().getSlotCoordinator().getSlot(storageQueue);
>>>             //We need to get the starting message ID to inform the DB to start slicing the message from there
>>>             //This step would be done in order to ensure that tombstones will not be fetched during the querying
>>>             //operation
>>>             startMessageID = unassignedSlot.getStartMessageId();
>>>             endMessageID = unassignedSlot.getEndMessageId();
>>>
>>>             // This is a class used by AndesSubscriptionManager. Andes Subscription Manager is behind Disruptor layer.
>>>             // Hence the call should be made to MessagingEngine NOT Andes.
>>>             // Calling Andes methods from here will lead to probable deadlocks if Futures are used.
>>>             // NOTE: purge call should be made to MessagingEngine not Andes
>>>             if (0 < endMessageID) {
>>>                 //If the slot id is 0, it means that for the given storage queue there are no unassigned slots,
>>>                 //so we don't need to purge messages in this case
>>>                 //The purpose of purge operation is to make sure that unassigned slots will be removed if no subs exists
>>>                 MessagingEngine.getInstance().purgeMessages(destination, ownerName, isTopic, startMessageID);
>>>             }
>>>         } catch (ConnectionException e) {
>>>             String message = "Error while establishing a connection with the thrift server";
>>>             log.error(message);
>>>             throw new AndesException(message, e);
>>>         }
>>>
>>>     }
>>>
>>> Why do we need a start message ID here?
>>> What about purging the whole internal queue (related to the topic)?
>>>
>>> MessagingEngine.getInstance().purgeMessages(destination, ownerName, isTopic, 0);
>>>
>>> Thanks
>>> --
>>> *Hasitha Abeykoon*
>>> Senior Software Engineer; WSO2, Inc.; http://wso2.com
>>> *cell:* *+94 719363063*
>>> *blog: **abeykoon.blogspot.com* <http://abeykoon.blogspot.com>
>>>
>>>
>>
>>
>> --
>> *Pamod Sylvester *
>>
>> *WSO2 Inc.; http://wso2.com <http://wso2.com>*
>> cell: +94 77 7779495
>>
>
>
>
> --
> Ramith Jayasinghe
> Technical Lead
> WSO2 Inc., http://wso2.com
> lean.enterprise.middleware
>
> E: [email protected]
> P: +94 777542851
>
>
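
For comparison, the start-ID workaround described in the quoted thread could
be sketched roughly as below. This is an in-memory illustration only:
StartIdPurgeSketch and purgeFrom are hypothetical names, a sorted map stands
in for the MessageMetaData CF, and treating the start ID as inclusive is my
assumption.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Hypothetical illustration; a sorted map stands in for the metadata store. */
class StartIdPurgeSketch {

    /**
     * Removes every entry whose message ID is at or after startMessageId and
     * returns the removed IDs in ascending order. Entries below the start ID
     * (the already delivered range) are never visited.
     */
    static List<Long> purgeFrom(NavigableMap<Long, String> metadata, long startMessageId) {
        // tailMap returns a live view of the keys >= startMessageId.
        NavigableMap<Long, String> tail = metadata.tailMap(startMessageId, true);
        List<Long> purged = new ArrayList<>(tail.keySet());
        // Clearing the view removes those entries from the backing map.
        tail.clear();
        return purged;
    }

    public static void main(String[] args) {
        NavigableMap<Long, String> metadata = new TreeMap<>();
        metadata.put(10L, "delivered");
        metadata.put(20L, "unassigned");
        metadata.put(30L, "unassigned");
        System.out.println("purged IDs: " + purgeFrom(metadata, 20L));
    }
}
```

With a partitioned delete this range selection is not needed at all, which is
why the simpler flow looks preferable once Cassandra is out of the picture.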


-- 
*Pamod Sylvester *

*WSO2 Inc.; http://wso2.com <http://wso2.com>*
cell: +94 77 7779495
_______________________________________________
Dev mailing list
[email protected]
http://wso2.org/cgi-bin/mailman/listinfo/dev
