This is an automated email from the ASF dual-hosted git repository. zhouxj pushed a commit to branch feature/GEM-883 in repository https://gitbox.apache.org/repos/asf/geode.git
commit 33b01b76ba952324a3974deac762eeb4d0d721cf Author: zhouxh <[email protected]> AuthorDate: Thu Nov 9 23:49:29 2017 -0800 GEODE-3967: if put hits concurrent modification exception should still notify serial gateway sender GEODE-3967: notifyTimestampsToGateways should inherit isConcurrencyConflict GEODE-3967: add to secondary event isConcurrencyConflict --- .../geode/internal/cache/AbstractRegionMap.java | 3 +++ .../geode/internal/cache/DestroyOperation.java | 3 --- .../internal/cache/DistributedCacheOperation.java | 15 +++++++++++++- .../apache/geode/internal/cache/LocalRegion.java | 19 +++++++++++++----- .../geode/internal/cache/LocalRegionDataView.java | 9 +++++++++ .../wan/AbstractGatewaySenderEventProcessor.java | 15 +++++++++----- .../internal/cache/wan/GatewaySenderEventImpl.java | 3 +++ .../serial/SerialGatewaySenderEventProcessor.java | 23 ++++++++++++++++++++++ 8 files changed, 76 insertions(+), 14 deletions(-) diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java index 404488b..75d8484 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java @@ -1183,6 +1183,7 @@ public abstract class AbstractRegionMap implements RegionMap { true/* conflict with clear */, duringRI, true); doPart3 = true; } catch (ConcurrentCacheModificationException ccme) { + event.isConcurrencyConflict(true); VersionTag tag = event.getVersionTag(); if (tag != null && tag.isTimeStampUpdated()) { // Notify gateways of new time-stamp. @@ -2092,6 +2093,7 @@ public abstract class AbstractRegionMap implements RegionMap { } } // !opCompleted } catch (ConcurrentCacheModificationException ccme) { + event.isConcurrencyConflict(true); VersionTag tag = event.getVersionTag(); if (tag != null && tag.isTimeStampUpdated()) { // Notify gateways of new time-stamp. @@ -2849,6 +2851,7 @@ public abstract class AbstractRegionMap implements RegionMap { clearOccured = true; owner.recordEvent(event); } catch (ConcurrentCacheModificationException ccme) { + event.isConcurrencyConflict(true); VersionTag tag = event.getVersionTag(); if (tag != null && tag.isTimeStampUpdated()) { // Notify gateways of new time-stamp. diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DestroyOperation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DestroyOperation.java index 20cbd28..4d376c6 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/DestroyOperation.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DestroyOperation.java @@ -95,9 +95,6 @@ public class DestroyOperation extends DistributedCacheOperation { } catch (EntryNotFoundException e) { dispatchElidedEvent(rgn, ev); - if (!ev.isConcurrencyConflict()) { - rgn.notifyGatewaySender(EnumListenerEvent.AFTER_DESTROY, ev); - } throw e; } catch (CacheWriterException e) { throw new Error( diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java index 505c618..5ab3a4a 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java @@ -1290,11 +1290,24 @@ public abstract class DistributedCacheOperation { */ protected void dispatchElidedEvent(LocalRegion rgn, EntryEventImpl ev) { if (logger.isDebugEnabled()) { - logger.debug("dispatching elided event: {}", ev); + logger.debug("GGG:dispatching elided event: {}", ev, new Exception()); } ev.isConcurrencyConflict(true); rgn.generateLocalFilterRouting(ev); rgn.notifyBridgeClients(ev); + rgn.notifyGatewaySender(getOperation(ev), ev); + } + + private EnumListenerEvent getOperation(EntryEventImpl ev) { + if (ev.getOperation().isInvalidate()) { + return EnumListenerEvent.AFTER_INVALIDATE; + } else if (ev.getOperation().isDestroy()) { + return EnumListenerEvent.AFTER_DESTROY; + } else if (ev.getOperation().isUpdate()) { + return EnumListenerEvent.AFTER_UPDATE; + } else { + return EnumListenerEvent.AFTER_CREATE; + } } protected abstract InternalCacheEvent createEvent(DistributedRegion rgn) diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java index 0d87b1d..d454bb7 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java @@ -2853,6 +2853,8 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory, logger.debug("caught concurrent modification attempt when applying {}", event); } notifyBridgeClients(event); + notifyGatewaySender(event.getOperation().isUpdate() ? EnumListenerEvent.AFTER_UPDATE + : EnumListenerEvent.AFTER_CREATE, event); } if (!getDataView().isDeferredStats()) { getCachePerfStats().endPut(startPut, event.isOriginRemote()); @@ -5630,6 +5632,9 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory, logger.debug("caught concurrent modification attempt when applying {}", event); } notifyBridgeClients(event); + notifyGatewaySender(event.getOperation().isUpdate() ? EnumListenerEvent.AFTER_UPDATE + : EnumListenerEvent.AFTER_CREATE, event); + return false; } @@ -5856,6 +5861,9 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory, updateTimeStampEvent.setGenerateCallbacks(false); updateTimeStampEvent.distributedMember = event.getDistributedMember(); updateTimeStampEvent.setNewEventId(getSystem()); + if (event.isConcurrencyConflict()) { + updateTimeStampEvent.isConcurrencyConflict(true); + } if (event.getRegion() instanceof BucketRegion) { BucketRegion bucketRegion = (BucketRegion) event.getRegion(); @@ -6117,8 +6125,7 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory, } protected void notifyGatewaySender(EnumListenerEvent operation, EntryEventImpl event) { - if (isPdxTypesRegion() || event.isConcurrencyConflict()) { - // isConcurrencyConflict is usually a concurrent cache modification problem + if (isPdxTypesRegion()) { return; } @@ -6142,9 +6149,10 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory, if (allRemoteDSIds != null) { for (GatewaySender sender : getCache().getAllGatewaySenders()) { if (allGatewaySenderIds.contains(sender.getId())) { - // TODO: This is a BUG. Why return and not continue? - if (!this.getDataPolicy().withStorage() && sender.isParallel()) { - return; + // if isConcurrencyConflict is true, only notify serial gateway sender + if ((!this.getDataPolicy().withStorage() || event.isConcurrencyConflict()) + && sender.isParallel()) { + continue; } if (logger.isDebugEnabled()) { logger.debug("Notifying the GatewaySender : {}", sender.getId()); @@ -6503,6 +6511,7 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory, if (event.getVersionTag() != null && !event.getVersionTag().isGatewayTag()) { notifyBridgeClients(event); } + notifyGatewaySender(EnumListenerEvent.AFTER_DESTROY, event); return true; // event was elided } catch (DiskAccessException dae) { diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegionDataView.java b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegionDataView.java index eed6176..b68859e 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegionDataView.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegionDataView.java @@ -25,6 +25,7 @@ import org.apache.geode.internal.cache.entries.AbstractRegionEntry; import org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID; import org.apache.geode.internal.cache.tier.sockets.VersionedObjectList; import org.apache.geode.internal.cache.versions.ConcurrentCacheModificationException; +import org.apache.geode.internal.logging.LogService; /** * @@ -71,6 +72,10 @@ public class LocalRegionDataView implements InternalDataView { } catch (ConcurrentCacheModificationException e) { // a newer event has already been applied to the cache. this can happen // in a client cache if another thread is operating on the same key + event.isConcurrencyConflict(true); + LocalRegion lr = event.getLocalRegion(); + LogService.getLogger().info("GGG:invalidateExistingEntry:" + event, new Exception()); + // lr.notifyGatewaySender(EnumListenerEvent.AFTER_INVALIDATE, event); } } @@ -81,6 +86,10 @@ public class LocalRegionDataView implements InternalDataView { } catch (ConcurrentCacheModificationException e) { // a later in time event has already been applied to the cache. this can happen // in a cache if another thread is operating on the same key + event.isConcurrencyConflict(true); + LocalRegion lr = event.getLocalRegion(); + LogService.getLogger().info("GGG:updateEntryVersion:" + event, new Exception()); + // lr.notifyGatewaySender(EnumListenerEvent.TIMESTAMP_UPDATE, event); } } diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java index 7a2cee1..a557875 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java @@ -518,16 +518,21 @@ public abstract class AbstractGatewaySenderEventProcessor extends Thread { // version is < 7.0.1, especially to prevent another loop over events. if (!sendUpdateVersionEvents && event.getOperation() == Operation.UPDATE_VERSION_STAMP) { - if (isTraceEnabled) { - logger.trace( - "Update Event Version event: {} removed from Gateway Sender queue: {}", event, - sender); - } + logger.debug("Update Event Version event: {} removed from Gateway Sender queue: {}", + event, sender); itr.remove(); statistics.incEventsNotQueued(); continue; } + if (((GatewaySenderEventImpl) event).isConcurrencyConflict) { + if (isDebugEnabled) { + logger.debug("primary should ignore the concurrency conflict event:" + event); + } + itr.remove(); + statistics.incEventsNotQueued(); + continue; + } boolean transmit = filter.beforeTransmit(event); if (!transmit) { diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImpl.java index 5b1ba54..468dca2 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImpl.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImpl.java @@ -171,6 +171,8 @@ public class GatewaySenderEventImpl protected boolean isInitialized; + public boolean isConcurrencyConflict = false; + /** * Is this thread in the process of serializing this event? */ @@ -312,6 +314,7 @@ public class GatewaySenderEventImpl if (initialize) { initialize(); } + this.isConcurrencyConflict = event.isConcurrencyConflict(); } /** diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessor.java index 734b560..995007d 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessor.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessor.java @@ -291,6 +291,10 @@ public class SerialGatewaySenderEventProcessor extends AbstractGatewaySenderEven if (o != null && o instanceof GatewaySenderEventImpl) { GatewaySenderEventImpl ge = (GatewaySenderEventImpl) o; EventWrapper unprocessedEvent = this.unprocessedEvents.remove(ge.getEventId()); + if (unprocessedEvent != null && ge.isConcurrencyConflict) { + logger.info( + "GGG:secondary after removed by handleFailover:" + unprocessedEvent + ":" + ge); + } if (unprocessedEvent != null) { unprocessedEvent.event.release(); if (this.unprocessedEvents.isEmpty()) { @@ -379,6 +383,7 @@ public class SerialGatewaySenderEventProcessor extends AbstractGatewaySenderEven if (m != null) { for (EventWrapper ew : m.values()) { GatewaySenderEventImpl gatewayEvent = ew.event; + logger.info("GGG:releaseUnprocessedEvents:" + gatewayEvent); gatewayEvent.release(); } this.unprocessedEvents = null; @@ -632,6 +637,10 @@ public class SerialGatewaySenderEventProcessor extends AbstractGatewaySenderEven return; // now we can safely use the unprocessedEvents field EventWrapper ew = this.unprocessedEvents.remove(gatewayEvent.getEventId()); + if (ew != null && gatewayEvent.isConcurrencyConflict) { + logger.info("GGG:secondary after removed by destroy listener:" + ew + ":" + gatewayEvent); + } + if (ew != null) { ew.event.release(); statistics.incUnprocessedEventsRemovedByPrimary(); @@ -651,8 +660,16 @@ public class SerialGatewaySenderEventProcessor extends AbstractGatewaySenderEven return; // now we can safely use the unprocessedEvents field EventWrapper ew = this.unprocessedEvents.remove(gatewayEvent.getEventId()); + if (ew != null && gatewayEvent.isConcurrencyConflict) { + logger.info("GGG:secondary after removed by create listener:" + ew + ":" + gatewayEvent, + new Exception()); + } if (ew == null) { + if (gatewayEvent.isConcurrencyConflict) { + logger.info("GGG:secondary before add to by create listener:" + gatewayEvent, + new Exception()); + } // first time for the event if (logger.isTraceEnabled()) { logger.trace("{}: fromPrimary event {} : {}->{} added to unprocessed token map", @@ -711,8 +728,14 @@ public class SerialGatewaySenderEventProcessor extends AbstractGatewaySenderEven // @todo add an assertion that !getPrimary() // now we can safely use the unprocessedEvents field Long v = this.unprocessedTokens.remove(gatewayEvent.getEventId()); + if (v != null && gatewayEvent.isConcurrencyConflict) { + logger.info("GGG:secondary after removed token:" + v + ":" + gatewayEvent); + } if (v == null) { + if (gatewayEvent.isConcurrencyConflict) { + logger.info("GGG:secondary before add to:" + gatewayEvent, new Exception()); + } // first time for the event if (logger.isTraceEnabled()) { logger.trace("{}: fromSecondary event {}:{}->{} added from unprocessed events map", -- To stop receiving notification emails like this one, please contact "[email protected]" <[email protected]>.
