This is an automated email from the ASF dual-hosted git repository.

zhouxj pushed a commit to branch feature/GEM-883
in repository https://gitbox.apache.org/repos/asf/geode.git

commit 30185338b67b3954eac1c2ca46021a86efb71b89
Author: zhouxh <[email protected]>
AuthorDate: Thu Nov 9 23:49:29 2017 -0800

    GEODE-3967: a put that hits a concurrent modification exception should
    still notify the serial gateway sender
---
 .../apache/geode/internal/cache/AbstractRegionMap.java   |  3 +++
 .../apache/geode/internal/cache/DestroyOperation.java    |  3 ---
 .../geode/internal/cache/DistributedCacheOperation.java  | 15 ++++++++++++++-
 .../org/apache/geode/internal/cache/LocalRegion.java     | 16 +++++++++++-----
 .../apache/geode/internal/cache/LocalRegionDataView.java |  9 +++++++++
 .../cache/wan/AbstractGatewaySenderEventProcessor.java   | 15 ++++++++++-----
 .../geode/internal/cache/wan/GatewaySenderEventImpl.java |  3 +++
 .../wan/serial/SerialGatewaySenderEventProcessor.java    |  8 +++++++-
 8 files changed, 57 insertions(+), 15 deletions(-)

diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java
index da0cf59..ee0a4aa 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java
@@ -1188,6 +1188,7 @@ public abstract class AbstractRegionMap implements 
RegionMap {
                               true/* conflict with clear */, duringRI, true);
                           doPart3 = true;
                         } catch (ConcurrentCacheModificationException ccme) {
+                          event.isConcurrencyConflict(true);
                           VersionTag tag = event.getVersionTag();
                           if (tag != null && tag.isTimeStampUpdated()) {
                             // Notify gateways of new time-stamp.
@@ -2097,6 +2098,7 @@ public abstract class AbstractRegionMap implements 
RegionMap {
                   }
                 } // !opCompleted
               } catch (ConcurrentCacheModificationException ccme) {
+                event.isConcurrencyConflict(true);
                 VersionTag tag = event.getVersionTag();
                 if (tag != null && tag.isTimeStampUpdated()) {
                   // Notify gateways of new time-stamp.
@@ -2854,6 +2856,7 @@ public abstract class AbstractRegionMap implements 
RegionMap {
                     clearOccured = true;
                     owner.recordEvent(event);
                   } catch (ConcurrentCacheModificationException ccme) {
+                    event.isConcurrencyConflict(true);
                     VersionTag tag = event.getVersionTag();
                     if (tag != null && tag.isTimeStampUpdated()) {
                       // Notify gateways of new time-stamp.
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/DestroyOperation.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/DestroyOperation.java
index 20cbd28..4d376c6 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/DestroyOperation.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/DestroyOperation.java
@@ -95,9 +95,6 @@ public class DestroyOperation extends 
DistributedCacheOperation {
 
       } catch (EntryNotFoundException e) {
         dispatchElidedEvent(rgn, ev);
-        if (!ev.isConcurrencyConflict()) {
-          rgn.notifyGatewaySender(EnumListenerEvent.AFTER_DESTROY, ev);
-        }
         throw e;
       } catch (CacheWriterException e) {
         throw new Error(
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java
index 505c618..5ab3a4a 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java
@@ -1290,11 +1290,24 @@ public abstract class DistributedCacheOperation {
      */
     protected void dispatchElidedEvent(LocalRegion rgn, EntryEventImpl ev) {
       if (logger.isDebugEnabled()) {
-        logger.debug("dispatching elided event: {}", ev);
+        logger.debug("GGG:dispatching elided event: {}", ev, new Exception());
       }
       ev.isConcurrencyConflict(true);
       rgn.generateLocalFilterRouting(ev);
       rgn.notifyBridgeClients(ev);
+      rgn.notifyGatewaySender(getOperation(ev), ev);
+    }
+
+    private EnumListenerEvent getOperation(EntryEventImpl ev) {
+      if (ev.getOperation().isInvalidate()) {
+        return EnumListenerEvent.AFTER_INVALIDATE;
+      } else if (ev.getOperation().isDestroy()) {
+        return EnumListenerEvent.AFTER_DESTROY;
+      } else if (ev.getOperation().isUpdate()) {
+        return EnumListenerEvent.AFTER_UPDATE;
+      } else {
+        return EnumListenerEvent.AFTER_CREATE;
+      }
     }
 
     protected abstract InternalCacheEvent createEvent(DistributedRegion rgn)
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java 
b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
index bed336a..158ff68 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
@@ -2851,6 +2851,8 @@ public class LocalRegion extends AbstractRegion 
implements InternalRegion, Loade
               logger.debug("caught concurrent modification attempt when 
applying {}", event);
             }
             notifyBridgeClients(event);
+            notifyGatewaySender(event.getOperation().isUpdate() ? 
EnumListenerEvent.AFTER_UPDATE
+                : EnumListenerEvent.AFTER_CREATE, event);
           }
           if (!getDataView().isDeferredStats()) {
             getCachePerfStats().endPut(startPut, event.isOriginRemote());
@@ -5624,6 +5626,9 @@ public class LocalRegion extends AbstractRegion 
implements InternalRegion, Loade
         logger.debug("caught concurrent modification attempt when applying 
{}", event);
       }
       notifyBridgeClients(event);
+      notifyGatewaySender(event.getOperation().isUpdate() ? 
EnumListenerEvent.AFTER_UPDATE
+          : EnumListenerEvent.AFTER_CREATE, event);
+
       return false;
     }
 
@@ -6111,8 +6116,7 @@ public class LocalRegion extends AbstractRegion 
implements InternalRegion, Loade
   }
 
   protected void notifyGatewaySender(EnumListenerEvent operation, 
EntryEventImpl event) {
-    if (isPdxTypesRegion() || event.isConcurrencyConflict()) {
-      // isConcurrencyConflict is usually a concurrent cache modification 
problem
+    if (isPdxTypesRegion()) {
       return;
     }
 
@@ -6136,9 +6140,10 @@ public class LocalRegion extends AbstractRegion 
implements InternalRegion, Loade
     if (allRemoteDSIds != null) {
       for (GatewaySender sender : getCache().getAllGatewaySenders()) {
         if (allGatewaySenderIds.contains(sender.getId())) {
-          // TODO: This is a BUG. Why return and not continue?
-          if (!this.getDataPolicy().withStorage() && sender.isParallel()) {
-            return;
+          // if isConcurrencyConflict is true, only notify serial gateway 
sender
+          if ((!this.getDataPolicy().withStorage() || 
event.isConcurrencyConflict())
+              && sender.isParallel()) {
+            continue;
           }
           if (logger.isDebugEnabled()) {
             logger.debug("Notifying the GatewaySender : {}", sender.getId());
@@ -6497,6 +6502,7 @@ public class LocalRegion extends AbstractRegion 
implements InternalRegion, Loade
       if (event.getVersionTag() != null && 
!event.getVersionTag().isGatewayTag()) {
         notifyBridgeClients(event);
       }
+      notifyGatewaySender(EnumListenerEvent.AFTER_DESTROY, event);
       return true; // event was elided
 
     } catch (DiskAccessException dae) {
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegionDataView.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegionDataView.java
index eed6176..b68859e 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegionDataView.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegionDataView.java
@@ -25,6 +25,7 @@ import 
org.apache.geode.internal.cache.entries.AbstractRegionEntry;
 import org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID;
 import org.apache.geode.internal.cache.tier.sockets.VersionedObjectList;
 import 
org.apache.geode.internal.cache.versions.ConcurrentCacheModificationException;
+import org.apache.geode.internal.logging.LogService;
 
 /**
  *
@@ -71,6 +72,10 @@ public class LocalRegionDataView implements InternalDataView 
{
     } catch (ConcurrentCacheModificationException e) {
       // a newer event has already been applied to the cache. this can happen
       // in a client cache if another thread is operating on the same key
+      event.isConcurrencyConflict(true);
+      LocalRegion lr = event.getLocalRegion();
+      LogService.getLogger().info("GGG:invalidateExistingEntry:" + event, new 
Exception());
+      // lr.notifyGatewaySender(EnumListenerEvent.AFTER_INVALIDATE, event);
     }
   }
 
@@ -81,6 +86,10 @@ public class LocalRegionDataView implements InternalDataView 
{
     } catch (ConcurrentCacheModificationException e) {
       // a later in time event has already been applied to the cache. this can 
happen
       // in a cache if another thread is operating on the same key
+      event.isConcurrencyConflict(true);
+      LocalRegion lr = event.getLocalRegion();
+      LogService.getLogger().info("GGG:updateEntryVersion:" + event, new 
Exception());
+      // lr.notifyGatewaySender(EnumListenerEvent.TIMESTAMP_UPDATE, event);
     }
   }
 
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
index 7a2cee1..a557875 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
@@ -518,16 +518,21 @@ public abstract class AbstractGatewaySenderEventProcessor 
extends Thread {
               // version is < 7.0.1, especially to prevent another loop over 
events.
               if (!sendUpdateVersionEvents
                   && event.getOperation() == Operation.UPDATE_VERSION_STAMP) {
-                if (isTraceEnabled) {
-                  logger.trace(
-                      "Update Event Version event: {} removed from Gateway 
Sender queue: {}", event,
-                      sender);
-                }
+                logger.debug("Update Event Version event: {} removed from 
Gateway Sender queue: {}",
+                    event, sender);
 
                 itr.remove();
                 statistics.incEventsNotQueued();
                 continue;
               }
+              if (((GatewaySenderEventImpl) event).isConcurrencyConflict) {
+                if (isDebugEnabled) {
+                  logger.debug("primary should ignore the concurrency conflict 
event:" + event);
+                }
+                itr.remove();
+                statistics.incEventsNotQueued();
+                continue;
+              }
 
               boolean transmit = filter.beforeTransmit(event);
               if (!transmit) {
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImpl.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImpl.java
index 4d201b2..d28dc5b 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImpl.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImpl.java
@@ -175,6 +175,8 @@ public class GatewaySenderEventImpl
 
   protected boolean isInitialized;
 
+  public boolean isConcurrencyConflict = false;
+
   /**
    * Is this thread in the process of serializing this event?
    */
@@ -316,6 +318,7 @@ public class GatewaySenderEventImpl
     if (initialize) {
       initialize();
     }
+    this.isConcurrencyConflict = event.isConcurrencyConflict();
   }
 
   /**
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessor.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessor.java
index 734b560..7ecb233 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessor.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessor.java
@@ -379,6 +379,7 @@ public class SerialGatewaySenderEventProcessor extends 
AbstractGatewaySenderEven
       if (m != null) {
         for (EventWrapper ew : m.values()) {
           GatewaySenderEventImpl gatewayEvent = ew.event;
+          logger.info("GGG:releaseUnprocessedEvents:" + gatewayEvent);
           gatewayEvent.release();
         }
         this.unprocessedEvents = null;
@@ -711,8 +712,13 @@ public class SerialGatewaySenderEventProcessor extends 
AbstractGatewaySenderEven
       // @todo add an assertion that !getPrimary()
       // now we can safely use the unprocessedEvents field
       Long v = this.unprocessedTokens.remove(gatewayEvent.getEventId());
+      if (v != null && gatewayEvent.isConcurrencyConflict) {
+        logger.info("GGG:secondary after removed:" + v + ":" + gatewayEvent);
+      }
 
-      if (v == null) {
+      if (v == null && !gatewayEvent.isConcurrencyConflict) {
+        // only when isConcurrencyConflict is false, add the event into 
unprocessedEvents
+        logger.info("GGG:secondary before add to:" + gatewayEvent, new 
Exception());
         // first time for the event
         if (logger.isTraceEnabled()) {
           logger.trace("{}: fromSecondary event {}:{}->{} added from 
unprocessed events map",

-- 
To stop receiving notification emails like this one, please contact
"[email protected]" <[email protected]>.

Reply via email to