[ 
https://issues.apache.org/jira/browse/GEODE-7643?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17012316#comment-17012316
 ] 

ASF subversion and git services commented on GEODE-7643:
--------------------------------------------------------

Commit da5c205af655b110679fe01534d469994fd414ed in geode's branch 
refs/heads/develop from BenjaminPerryRoss
[ https://gitbox.apache.org/repos/asf?p=geode.git;h=da5c205 ]

GEODE-7643: Gateway unprocessedTokensMap appears to grow without bounds with 
replicated regions and peer accessors (#4447)

* Allow LocalRegion.virtualPut to throw ConcurrentCacheModificationException

Added throwsConcurrentModification boolean argument to LocalRegion.virtualPut() 
to determine if the exception should be thrown
Added invokeCallbacks boolean argument to LocalRegion.virtualPut() to determine 
if bridge clients and gateway senders should be notified of the event
Fixed issue with inheriting new virtualPut implementations
Added DUnit test to confirm fix

Co-authored-by: Benjamin Ross <br...@pivotal.io>
Co-authored-by: Donal Evans <doev...@pivotal.io>


> Gateway unprocessedTokensMap appears to grow without bounds with replicated 
> regions and peer accessors
> ------------------------------------------------------------------------------------------------------
>
>                 Key: GEODE-7643
>                 URL: https://issues.apache.org/jira/browse/GEODE-7643
>             Project: Geode
>          Issue Type: Bug
>          Components: wan
>            Reporter: Donal Evans
>            Assignee: Donal Evans
>            Priority: Major
>          Time Spent: 1h 40m
>  Remaining Estimate: 0h
>
> When peer accessors do puts to a replicated region with a serial gateway 
> sender via multiple threads and on the same key, a 
> {{ConcurrentCacheModificationException}} in {{LocalRegion.virtualPut}} causes 
> {{notifyGatewaySender}} to be called, which puts the event into the queue. 
> Because the {{AbstractUpdateOperation.doPutOrCreate}} method can call 
> {{LocalRegion.virtualPut}} up to three times and encounter a 
> {{ConcurrentCacheModificationException}} each time, the same event can be 
> put in the queue twice but removed only once, which causes the 
> unprocessedTokensMap to accumulate events.
> Here are the two stacks:
> {noformat}
> [warn 2019/12/02 12:47:59.102 PST <P2P message reader for 
> 10.255.202.119(gateway-ln-2:85182)<v97>:41004 unshared ordered uid=11 dom #2 
> port=59034> tid=0x61] XXX LocalRegion.virtualPut caught 
> ConcurrentCacheModificationException about to notifyGatewaySender 
> eventId=EventID[10.255.202.119(accessor-ln-1)<v98>:41005;threadID=5;sequenceID=273];
>  ifNew=false; ifOld=true; overwriteDestroyed=false; eventIdentity=329453507; 
> eventValue=Trade[id=-1501795011; cusip=PVTL; shares=29; price=163; 
> payloadLength=0 bytes]
> java.lang.Exception
>       at 
> org.apache.geode.internal.cache.LocalRegion.virtualPut(LocalRegion.java:5591)
>       at 
> org.apache.geode.internal.cache.DistributedRegion.virtualPut(DistributedRegion.java:385)
>       at 
> org.apache.geode.internal.cache.LocalRegionDataView.putEntry(LocalRegionDataView.java:162)
>       at 
> org.apache.geode.internal.cache.LocalRegion.basicUpdate(LocalRegion.java:5561)
>       at 
> org.apache.geode.internal.cache.AbstractUpdateOperation.doPutOrCreate(AbstractUpdateOperation.java:182)
>       at 
> org.apache.geode.internal.cache.AbstractUpdateOperation$AbstractUpdateMessage.basicOperateOnRegion(AbstractUpdateOperation.java:287)
>       at 
> org.apache.geode.internal.cache.AbstractUpdateOperation$AbstractUpdateMessage.operateOnRegion(AbstractUpdateOperation.java:258)
>       at 
> org.apache.geode.internal.cache.DistributedCacheOperation$CacheOperationMessage.basicProcess(DistributedCacheOperation.java:1206)
>       at 
> org.apache.geode.internal.cache.DistributedCacheOperation$CacheOperationMessage.process(DistributedCacheOperation.java:1108)
>       at 
> org.apache.geode.distributed.internal.DistributionMessage.scheduleAction(DistributionMessage.java:372)
>       at 
> org.apache.geode.distributed.internal.DistributionMessage.schedule(DistributionMessage.java:427)
> {noformat}
> {noformat}
> [warn 2019/12/02 12:47:59.108 PST <P2P message reader for 
> 10.255.202.119(gateway-ln-2:85182)<v97>:41004 unshared ordered uid=11 dom #2 
> port=59034> tid=0x61] XXX LocalRegion.virtualPut caught 
> ConcurrentCacheModificationException about to notifyGatewaySender 
> eventId=EventID[10.255.202.119(accessor-ln-1)<v98>:41005;threadID=5;sequenceID=273];
>  ifNew=false; ifOld=false; overwriteDestroyed=true; eventIdentity=329453507; 
> eventValue=Trade[id=-1501795011; cusip=PVTL; shares=29; price=163; 
> payloadLength=0 bytes]
> java.lang.Exception
>       at 
> org.apache.geode.internal.cache.LocalRegion.virtualPut(LocalRegion.java:5591)
>       at 
> org.apache.geode.internal.cache.DistributedRegion.virtualPut(DistributedRegion.java:385)
>       at 
> org.apache.geode.internal.cache.LocalRegionDataView.putEntry(LocalRegionDataView.java:162)
>       at 
> org.apache.geode.internal.cache.LocalRegion.basicUpdate(LocalRegion.java:5561)
>       at 
> org.apache.geode.internal.cache.AbstractUpdateOperation.doPutOrCreate(AbstractUpdateOperation.java:194)
>       at 
> org.apache.geode.internal.cache.AbstractUpdateOperation$AbstractUpdateMessage.basicOperateOnRegion(AbstractUpdateOperation.java:287)
>       at 
> org.apache.geode.internal.cache.AbstractUpdateOperation$AbstractUpdateMessage.operateOnRegion(AbstractUpdateOperation.java:258)
>       at 
> org.apache.geode.internal.cache.DistributedCacheOperation$CacheOperationMessage.basicProcess(DistributedCacheOperation.java:1206)
>       at 
> org.apache.geode.internal.cache.DistributedCacheOperation$CacheOperationMessage.process(DistributedCacheOperation.java:1108)
> {noformat}
> Here are the corresponding puts into the queue:
> {noformat}
> [warn 2019/12/02 12:47:59.104 PST <P2P message reader for 
> 10.255.202.119(gateway-ln-2:85182)<v97>:41004 unshared ordered uid=11 dom #2 
> port=59034> tid=0x61] XXX SerialGatewaySenderQueue.putAndGetKey key=3625; 
> eventId=EventID[10.255.202.119(accessor-ln-1)<v98>:41005;threadID=0x30002|5;sequenceID=273];
>  eventValue=Trade[id=-1501795011; cusip=PVTL; shares=29; 
> price=163.08897399902344; payloadLength=0 bytes]
> {noformat}
> {noformat}
> [warn 2019/12/02 12:47:59.110 PST <P2P message reader for 
> 10.255.202.119(gateway-ln-2:85182)<v97>:41004 unshared ordered uid=11 dom #2 
> port=59034> tid=0x61] XXX SerialGatewaySenderQueue.putAndGetKey key=3635; 
> eventId=EventID[10.255.202.119(accessor-ln-1)<v98>:41005;threadID=0x30002|5;sequenceID=273];
>  eventValue=Trade[id=-1501795011; cusip=PVTL; shares=29; 
> price=163.08897399902344; payloadLength=0 bytes]
> {noformat}
> On the secondary, when the event is received via normal replication, it is 
> added to the unprocessedEvents map:
> {noformat}
> [warn 2019/12/02 12:47:59.100 PST <P2P message reader for 
> 10.255.202.119(accessor-ln-1:85194)<v98>:41005 unshared ordered uid=13 dom #1 
> port=59022> tid=0x58] 
> SerialGatewaySenderEventProcessor.basicHandleSecondaryEvent put 
> unprocessedEvents 
> eventId=EventID[10.255.202.119(accessor-ln-1)<v98>:41005;threadID=0x30002|5;sequenceID=273]
> {noformat}
> The first replication from the primary queue is then received, which removes 
> the event from the unprocessedEvents map:
> {noformat}
> [warn 2019/12/02 12:47:59.104 PST <P2P message reader for 
> 10.255.202.119(gateway-ln-1:85170)<v96>:41003 unshared ordered uid=18 dom #3 
> port=59052> tid=0x68] XXX SerialSecondaryGatewayListener.afterCreate 
> senderEvent=EventID[10.255.202.119(accessor-ln-1)<v98>:41005;threadID=0x30002|5;sequenceID=273]
> [warn 2019/12/02 12:47:59.104 PST <Queued Gateway Listener Thread1> tid=0x5e] 
> SerialGatewaySenderEventProcessor.basicHandlePrimaryEvent removed 
> unprocessedEvents 
> eventId=EventID[10.255.202.119(accessor-ln-1)<v98>:41005;threadID=0x30002|5;sequenceID=273];
>  
> value=org.apache.geode.internal.cache.wan.AbstractGatewaySender$EventWrapper@3f6df03b
> {noformat}
> Then the second replication from the primary queue is received, which 
> incorrectly adds the event to the unprocessedTokens map, where it stays 
> forever:
> {noformat}
> [warn 2019/12/02 12:47:59.110 PST <P2P message reader for 
> 10.255.202.119(gateway-ln-1:85170)<v96>:41003 unshared ordered uid=18 dom #3 
> port=59052> tid=0x68] XXX SerialSecondaryGatewayListener.afterCreate 
> senderEvent=EventID[10.255.202.119(accessor-ln-1)<v98>:41005;threadID=0x30002|5;sequenceID=273]
> [warn 2019/12/02 12:47:59.110 PST <Queued Gateway Listener Thread1> tid=0x5e] 
> SerialGatewaySenderEventProcessor.basicHandlePrimaryEvent put 
> unprocessedTokens 
> eventId=EventID[10.255.202.119(accessor-ln-1)<v98>:41005;threadID=0x30002|5;sequenceID=273];
>  value=1575319799110; size=914
> {noformat}
>  
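
The secondary-side sequence in the logs above can be reproduced with a small model. This is only a sketch under stated assumptions: the class, the two static maps, and both handler methods are hypothetical stand-ins named after the log messages, not Geode's actual SerialGatewaySenderEventProcessor code, and the only behavior modeled is what the logs show (normal replication adds to unprocessedEvents; a queue replication removes the event or, if it is absent, records a token).

```java
import java.util.HashMap;
import java.util.Map;

final class SecondaryBookkeepingSketch {
    // Hypothetical stand-ins for the secondary's two maps, keyed by event ID.
    static final Map<String, String> unprocessedEvents = new HashMap<>();
    static final Map<String, Long> unprocessedTokens = new HashMap<>();

    // The event arrives via normal region replication.
    static void handleSecondaryEvent(String eventId, String value) {
        unprocessedEvents.put(eventId, value);
    }

    // The event arrives via replication of the primary's queue.
    static void handlePrimaryEvent(String eventId, long timestamp) {
        // If the matching event is not waiting, leave a token behind instead.
        if (unprocessedEvents.remove(eventId) == null) {
            unprocessedTokens.put(eventId, timestamp);
        }
    }

    public static void main(String[] args) {
        String id = "threadID=5;sequenceID=273";
        handleSecondaryEvent(id, "Trade[cusip=PVTL]"); // normal replication
        handlePrimaryEvent(id, 1L); // first queue put: removes the event
        handlePrimaryEvent(id, 2L); // duplicate queue put: leaves a token
        System.out.println("unprocessedEvents=" + unprocessedEvents.size()
                + " unprocessedTokens=" + unprocessedTokens.size());
        // prints "unprocessedEvents=0 unprocessedTokens=1"
    }
}
```

In this model nothing ever arrives to cancel the token left by the duplicate queue put, which matches the growth reported for the unprocessedTokens map.
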
> The proposed solution to this issue is to add two boolean arguments to the 
> {{LocalRegion.virtualPut}} method: one to control whether a 
> {{ConcurrentCacheModificationException}} should result in notifying the 
> bridge clients and gateway senders, and another to control whether any 
> {{ConcurrentCacheModificationException}} encountered should be thrown or 
> suppressed. These arguments allow the 
> {{AbstractUpdateOperation.doPutOrCreate}} method to (1) prevent subsequent 
> calls to {{LocalRegion.virtualPut}} following a 
> {{ConcurrentCacheModificationException}} from notifying the gateway sender, 
> and (2) know whether or not the {{LocalRegion.virtualPut}} method failed 
> specifically due to a {{ConcurrentCacheModificationException}}.
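
A minimal sketch of how the two flags could interact, assuming a heavily simplified model rather than the real Geode classes. Everything here is hypothetical: the exception is a local stand-in, virtualPut is reduced to three parameters and always hits a conflict, the gateway queue is a plain list, and the suppressAfterConflict switch exists only to compare the old and proposed behavior side by side.

```java
import java.util.ArrayList;
import java.util.List;

final class VirtualPutFlagsSketch {
    /** Local stand-in for Geode's exception of the same name. */
    static class ConcurrentCacheModificationException extends RuntimeException {}

    /** Stand-in for the gateway sender queue; records each enqueued event ID. */
    static final List<String> gatewayQueue = new ArrayList<>();

    /** Simplified virtualPut in which every attempt hits a concurrency conflict. */
    static boolean virtualPut(String eventId, boolean invokeCallbacks,
            boolean throwsConcurrentModification) {
        if (invokeCallbacks) {
            gatewayQueue.add(eventId); // models notifyGatewaySender
        }
        if (throwsConcurrentModification) {
            throw new ConcurrentCacheModificationException();
        }
        return false; // conflict suppressed; the put did not apply
    }

    /** Up to three attempts, as described for doPutOrCreate. */
    static boolean doPutOrCreate(String eventId, boolean suppressAfterConflict) {
        boolean invokeCallbacks = true;
        for (int attempt = 0; attempt < 3; attempt++) {
            try {
                if (virtualPut(eventId, invokeCallbacks, true)) {
                    return true;
                }
            } catch (ConcurrentCacheModificationException e) {
                // The caller now knows the failure was a conflict and can
                // stop later attempts from notifying the gateway sender again.
                if (suppressAfterConflict) {
                    invokeCallbacks = false;
                }
            }
        }
        return false;
    }

    public static void main(String[] args) {
        doPutOrCreate("event-273", false);
        System.out.println("without suppression: " + gatewayQueue.size()); // prints 3
        gatewayQueue.clear();
        doPutOrCreate("event-273", true);
        System.out.println("with suppression: " + gatewayQueue.size()); // prints 1
    }
}
```

In this model the first conflict still notifies the gateway sender once; only the retries are silenced, so the event enters the queue a single time.
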



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
