This is an automated email from the ASF dual-hosted git repository. zhouxj pushed a commit to branch feature/GEM-883 in repository https://gitbox.apache.org/repos/asf/geode.git
commit 30185338b67b3954eac1c2ca46021a86efb71b89 Author: zhouxh <[email protected]> AuthorDate: Thu Nov 9 23:49:29 2017 -0800 GEODE-3967: if put hits concurrent modification exception should still notify serial gateway sender --- .../apache/geode/internal/cache/AbstractRegionMap.java | 3 +++ .../apache/geode/internal/cache/DestroyOperation.java | 3 --- .../geode/internal/cache/DistributedCacheOperation.java | 15 ++++++++++++++- .../org/apache/geode/internal/cache/LocalRegion.java | 16 +++++++++++----- .../apache/geode/internal/cache/LocalRegionDataView.java | 9 +++++++++ .../cache/wan/AbstractGatewaySenderEventProcessor.java | 15 ++++++++++----- .../geode/internal/cache/wan/GatewaySenderEventImpl.java | 3 +++ .../wan/serial/SerialGatewaySenderEventProcessor.java | 8 +++++++- 8 files changed, 57 insertions(+), 15 deletions(-) diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java index da0cf59..ee0a4aa 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java @@ -1188,6 +1188,7 @@ public abstract class AbstractRegionMap implements RegionMap { true/* conflict with clear */, duringRI, true); doPart3 = true; } catch (ConcurrentCacheModificationException ccme) { + event.isConcurrencyConflict(true); VersionTag tag = event.getVersionTag(); if (tag != null && tag.isTimeStampUpdated()) { // Notify gateways of new time-stamp. @@ -2097,6 +2098,7 @@ public abstract class AbstractRegionMap implements RegionMap { } } // !opCompleted } catch (ConcurrentCacheModificationException ccme) { + event.isConcurrencyConflict(true); VersionTag tag = event.getVersionTag(); if (tag != null && tag.isTimeStampUpdated()) { // Notify gateways of new time-stamp. @@ -2854,6 +2856,7 @@ public abstract class AbstractRegionMap implements RegionMap { clearOccured = true; owner.recordEvent(event); } catch (ConcurrentCacheModificationException ccme) { + event.isConcurrencyConflict(true); VersionTag tag = event.getVersionTag(); if (tag != null && tag.isTimeStampUpdated()) { // Notify gateways of new time-stamp. diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DestroyOperation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DestroyOperation.java index 20cbd28..4d376c6 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/DestroyOperation.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DestroyOperation.java @@ -95,9 +95,6 @@ public class DestroyOperation extends DistributedCacheOperation { } catch (EntryNotFoundException e) { dispatchElidedEvent(rgn, ev); - if (!ev.isConcurrencyConflict()) { - rgn.notifyGatewaySender(EnumListenerEvent.AFTER_DESTROY, ev); - } throw e; } catch (CacheWriterException e) { throw new Error( diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java index 505c618..5ab3a4a 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java @@ -1290,11 +1290,24 @@ public abstract class DistributedCacheOperation { */ protected void dispatchElidedEvent(LocalRegion rgn, EntryEventImpl ev) { if (logger.isDebugEnabled()) { - logger.debug("dispatching elided event: {}", ev); + logger.debug("GGG:dispatching elided event: {}", ev, new Exception()); } ev.isConcurrencyConflict(true); rgn.generateLocalFilterRouting(ev); rgn.notifyBridgeClients(ev); + rgn.notifyGatewaySender(getOperation(ev), ev); + } + + private EnumListenerEvent getOperation(EntryEventImpl ev) { + if (ev.getOperation().isInvalidate()) { + return EnumListenerEvent.AFTER_INVALIDATE; + } else if (ev.getOperation().isDestroy()) { + return EnumListenerEvent.AFTER_DESTROY; + } else if (ev.getOperation().isUpdate()) { + return EnumListenerEvent.AFTER_UPDATE; + } else { + return EnumListenerEvent.AFTER_CREATE; + } } protected abstract InternalCacheEvent createEvent(DistributedRegion rgn) diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java index bed336a..158ff68 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java @@ -2851,6 +2851,8 @@ public class LocalRegion extends AbstractRegion implements InternalRegion, Loade logger.debug("caught concurrent modification attempt when applying {}", event); } notifyBridgeClients(event); + notifyGatewaySender(event.getOperation().isUpdate() ? EnumListenerEvent.AFTER_UPDATE + : EnumListenerEvent.AFTER_CREATE, event); } if (!getDataView().isDeferredStats()) { getCachePerfStats().endPut(startPut, event.isOriginRemote()); @@ -5624,6 +5626,9 @@ public class LocalRegion extends AbstractRegion implements InternalRegion, Loade logger.debug("caught concurrent modification attempt when applying {}", event); } notifyBridgeClients(event); + notifyGatewaySender(event.getOperation().isUpdate() ? EnumListenerEvent.AFTER_UPDATE + : EnumListenerEvent.AFTER_CREATE, event); + return false; } @@ -6111,8 +6116,7 @@ public class LocalRegion extends AbstractRegion implements InternalRegion, Loade } protected void notifyGatewaySender(EnumListenerEvent operation, EntryEventImpl event) { - if (isPdxTypesRegion() || event.isConcurrencyConflict()) { - // isConcurrencyConflict is usually a concurrent cache modification problem + if (isPdxTypesRegion()) { return; } @@ -6136,9 +6140,10 @@ public class LocalRegion extends AbstractRegion implements InternalRegion, Loade if (allRemoteDSIds != null) { for (GatewaySender sender : getCache().getAllGatewaySenders()) { if (allGatewaySenderIds.contains(sender.getId())) { - // TODO: This is a BUG. Why return and not continue? - if (!this.getDataPolicy().withStorage() && sender.isParallel()) { - return; + // if isConcurrencyConflict is true, only notify serial gateway sender + if ((!this.getDataPolicy().withStorage() || event.isConcurrencyConflict()) + && sender.isParallel()) { + continue; } if (logger.isDebugEnabled()) { logger.debug("Notifying the GatewaySender : {}", sender.getId()); @@ -6497,6 +6502,7 @@ public class LocalRegion extends AbstractRegion implements InternalRegion, Loade if (event.getVersionTag() != null && !event.getVersionTag().isGatewayTag()) { notifyBridgeClients(event); } + notifyGatewaySender(EnumListenerEvent.AFTER_DESTROY, event); return true; // event was elided } catch (DiskAccessException dae) { diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegionDataView.java b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegionDataView.java index eed6176..b68859e 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegionDataView.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegionDataView.java @@ -25,6 +25,7 @@ import org.apache.geode.internal.cache.entries.AbstractRegionEntry; import org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID; import org.apache.geode.internal.cache.tier.sockets.VersionedObjectList; import org.apache.geode.internal.cache.versions.ConcurrentCacheModificationException; +import org.apache.geode.internal.logging.LogService; /** * @@ -71,6 +72,10 @@ public class LocalRegionDataView implements InternalDataView { } catch (ConcurrentCacheModificationException e) { // a newer event has already been applied to the cache. this can happen // in a client cache if another thread is operating on the same key + event.isConcurrencyConflict(true); + LocalRegion lr = event.getLocalRegion(); + LogService.getLogger().info("GGG:invalidateExistingEntry:" + event, new Exception()); + // lr.notifyGatewaySender(EnumListenerEvent.AFTER_INVALIDATE, event); } } @@ -81,6 +86,10 @@ public class LocalRegionDataView implements InternalDataView { } catch (ConcurrentCacheModificationException e) { // a later in time event has already been applied to the cache. this can happen // in a cache if another thread is operating on the same key + event.isConcurrencyConflict(true); + LocalRegion lr = event.getLocalRegion(); + LogService.getLogger().info("GGG:updateEntryVersion:" + event, new Exception()); + // lr.notifyGatewaySender(EnumListenerEvent.TIMESTAMP_UPDATE, event); } } diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java index 7a2cee1..a557875 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java @@ -518,16 +518,21 @@ public abstract class AbstractGatewaySenderEventProcessor extends Thread { // version is < 7.0.1, especially to prevent another loop over events. if (!sendUpdateVersionEvents && event.getOperation() == Operation.UPDATE_VERSION_STAMP) { - if (isTraceEnabled) { - logger.trace( - "Update Event Version event: {} removed from Gateway Sender queue: {}", event, - sender); - } + logger.debug("Update Event Version event: {} removed from Gateway Sender queue: {}", + event, sender); itr.remove(); statistics.incEventsNotQueued(); continue; } + if (((GatewaySenderEventImpl) event).isConcurrencyConflict) { + if (isDebugEnabled) { + logger.debug("primary should ignore the concurrency conflict event:" + event); + } + itr.remove(); + statistics.incEventsNotQueued(); + continue; + } boolean transmit = filter.beforeTransmit(event); if (!transmit) { diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImpl.java index 4d201b2..d28dc5b 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImpl.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImpl.java @@ -175,6 +175,8 @@ public class GatewaySenderEventImpl protected boolean isInitialized; + public boolean isConcurrencyConflict = false; + /** * Is this thread in the process of serializing this event? */ @@ -316,6 +318,7 @@ public class GatewaySenderEventImpl if (initialize) { initialize(); } + this.isConcurrencyConflict = event.isConcurrencyConflict(); } /** diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessor.java index 734b560..7ecb233 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessor.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessor.java @@ -379,6 +379,7 @@ public class SerialGatewaySenderEventProcessor extends AbstractGatewaySenderEven if (m != null) { for (EventWrapper ew : m.values()) { GatewaySenderEventImpl gatewayEvent = ew.event; + logger.info("GGG:releaseUnprocessedEvents:" + gatewayEvent); gatewayEvent.release(); } this.unprocessedEvents = null; @@ -711,8 +712,13 @@ public class SerialGatewaySenderEventProcessor extends AbstractGatewaySenderEven // @todo add an assertion that !getPrimary() // now we can safely use the unprocessedEvents field Long v = this.unprocessedTokens.remove(gatewayEvent.getEventId()); + if (v != null && gatewayEvent.isConcurrencyConflict) { + logger.info("GGG:secondary after removed:" + v + ":" + gatewayEvent); + } - if (v == null) { + if (v == null && !gatewayEvent.isConcurrencyConflict) { + // only when isConcurrencyConflict is false, add the event into unprocessedEvents + logger.info("GGG:secondary before add to:" + gatewayEvent, new Exception()); // first time for the event if (logger.isTraceEnabled()) { logger.trace("{}: fromSecondary event {}:{}->{} added from unprocessed events map", -- To stop receiving notification emails like this one, please contact "[email protected]" <[email protected]>.
