+1. If we can partition the message content by queue name, we could delete it directly.
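
A rough sketch of what I mean, assuming the content were keyed by storage queue in the same way as the metadata. Everything here is hypothetical and not Andes code: PartitionedContentStore, put(), purgeQueue() and size() are illustrative names, and an in-memory map stands in for the real message store. The point is only that the purge drops the whole partition without reading any message ids first.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical in-memory stand-in for a content store partitioned by queue name. */
final class PartitionedContentStore {

    // One partition per storage queue; message ids stay sorted inside a partition.
    private final Map<String, TreeMap<Long, String>> partitions = new HashMap<>();

    /** Stores content under the partition of the given storage queue. */
    void put(String storageQueue, long messageId, String content) {
        partitions.computeIfAbsent(storageQueue, q -> new TreeMap<>())
                  .put(messageId, content);
    }

    /**
     * Drops the whole partition in one step. No message ids are looked up
     * beforehand. Returns the number of messages that were removed.
     */
    int purgeQueue(String storageQueue) {
        TreeMap<Long, String> removed = partitions.remove(storageQueue);
        return removed == null ? 0 : removed.size();
    }

    /** Number of messages currently held for the given storage queue. */
    int size(String storageQueue) {
        TreeMap<Long, String> partition = partitions.get(storageQueue);
        return partition == null ? 0 : partition.size();
    }

    public static void main(String[] args) {
        PartitionedContentStore store = new PartitionedContentStore();
        store.put("queueA", 1L, "first");
        store.put("queueA", 2L, "second");
        store.put("queueB", 3L, "third");

        int removed = store.purgeQueue("queueA");
        System.out.println("removed=" + removed
                + " remainingInQueueB=" + store.size("queueB"));
    }
}
```

With an RDBMS the equivalent would presumably be a single delete filtered on the queue column, but that depends on the schema, so I have left it out of the sketch.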

On Mon, Jul 6, 2015 at 7:36 AM, Ramith Jayasinghe <[email protected]> wrote:

> I think we can simplify this one. and do a delete without even getting
> message ids.
>
>
> On Mon, Jul 6, 2015 at 5:05 PM, Pamod Sylvester <[email protected]> wrote:
>
>> Hi Hasitha,
>>
>> With reference to the discussion in [1], this was done to prevent
>> Tombstone overwhelming exception when Cassandra was used as the message
>> store.
>>
>> AFAIR The process of purging was as follows,
>>
>> 1) From MessageMetaData CF, retrieve the corresponding message ids
>> 2) Delete the corresponding messages from MessageMetaData and then delete
>> them from the MessageContent.
>>
>> When transacting > 1,000,000 messages and disconnection of the
>> subscription will go through the above mentioned process.
>>
>> The issue is,
>>
>> a) Message content is not partitioned based on the queue name as Message
>> Meta data. so we cannot delete the content row wise, which restricted us to
>> retrieve ids from Message Meta data and remove them from content.
>> b) When retrieving message meta data (let's say we used a range query to
>> select all the ids) there will be tombstones that will be selected with
>> that query that will hit the limit, causing the Tombstone overwhelming
>> exception.
>>
>> As a solution, when removeMessagesOfDestinationForNode() is called we
>> get the current fresh slot, and the idea of getting the start id of the
>> slot is that it assures that the message ids that should be selected from
>> the message meta data should be greater than the value defined in the start
>> id (since messages with ids < start id are the messages that have being
>> delivered already that has caused tombstones).
>>
>> Also, I believe we could change the flow now since we discontinue
>> Cassandra. Since in RDBMS you could select all ids without any issue.
>>
>> Hope this explains your query.
>>
>> [1] [Dev][MB] TombstoneOverwhelmingException When Purge Operation is
>> Triggered When Subscription Disconnection/Deletion
>>
>> Thanks,
>> Pamod
>>
>> On Mon, Jul 6, 2015 at 7:00 AM, Hasitha Hiranya <[email protected]>
>> wrote:
>>
>>> Hi,
>>>
>>> Identified an issue regarding message purging when last subscriber for a
>>> topic is disconnected from broker cluster.
>>>
>>> @org.wso2.andes.kernel.OrphanedMessageHandler
>>>
>>>
>>> private void removeMessagesOfDestinationForNode(String destination,
>>>                                                 String ownerName,
>>>                                                 boolean isTopic) throws AndesException {
>>>
>>>     try {
>>>
>>>         Long startMessageID = null;
>>>         Long endMessageID = null;
>>>
>>>         //Will first retrieve the last unassigned slot id
>>>         String nodeID =
>>> ClusterResourceHolder.getInstance().getClusterManager().getMyNodeID();
>>>         //Will get the storage queue name
>>> *        String storageQueue =
>>> AndesUtils.getStorageQueueForDestination(destination, nodeID, isTopic);*
>>> *        //We get the relevant slot from the coordinator if running
>>> on cluster mode*
>>> *        Slot unassignedSlot =
>>> MessagingEngine.getInstance().getSlotCoordinator().getSlot(storageQueue);*
>>> *        //We need to get the starting message ID to inform the DB
>>> to start slicing the message from there*
>>> *        //This step would be done in order to ensure that
>>> tombstones will not be fetched during the querying*
>>> *        //operation*
>>> *        startMessageID = unassignedSlot.getStartMessageId();*
>>>         endMessageID = unassignedSlot.getEndMessageId();
>>>
>>>         // This is a class used by AndesSubscriptionManager. Andes
>>> Subscription Manager is behind Disruptor layer.
>>>         // Hence the call should be made to MessagingEngine NOT
>>> Andes.
>>>         // Calling Andes methods from here will lead to probable
>>> deadlocks if Futures are used.
>>>         // NOTE: purge call should be made to MessagingEngine not
>>> Andes
>>>         if (0 < endMessageID) {
>>>             //If the slot id is 0, which means for the given storage
>>> queue there're no unassigned slots which means
>>>             //we don't need to purge messages in this case
>>>             //The purpose of purge operation is to make sure that
>>> unassigned slots will be removed if no subs exists
>>>             MessagingEngine.getInstance().purgeMessages(destination,
>>> ownerName, isTopic, startMessageID);
>>>         }
>>>     } catch (ConnectionException e) {
>>>         String mesage = "Error while establishing a connection with
>>> the thrift server";
>>>         log.error(mesage);
>>>         throw new AndesException(mesage, e);
>>>     }
>>>
>>> }
>>>
>>> Why we need a start message id here ?
>>> What about purging the whole internal queue (related to topic) ?
>>>
>>> *MessagingEngine.getInstance().purgeMessages(destination, ownerName,
>>> isTopic, 0);*
>>>
>>> Thanks
>>> --
>>> *Hasitha Abeykoon*
>>> Senior Software Engineer; WSO2, Inc.; http://wso2.com
>>> *cell:* *+94 719363063*
>>> *blog: **abeykoon.blogspot.com* <http://abeykoon.blogspot.com>
>>>
>>>
>>
>>
>> --
>> *Pamod Sylvester *
>>
>> *WSO2 Inc.; http://wso2.com <http://wso2.com>*
>> cell: +94 77 7779495
>>
>
>
>
> --
> Ramith Jayasinghe
> Technical Lead
> WSO2 Inc., http://wso2.com
> lean.enterprise.middleware
>
> E: [email protected]
> P: +94 777542851
>
>

--
*Pamod Sylvester *

*WSO2 Inc.; http://wso2.com <http://wso2.com>*
cell: +94 77 7779495
_______________________________________________
Dev mailing list
[email protected]
http://wso2.org/cgi-bin/mailman/listinfo/dev
