This is an automated email from the ASF dual-hosted git repository.

zhouxj pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new 5c37893  GEODE-5087: send a BatchDestroyOperation for each dropped 
event at serial primary sender (#1924)
5c37893 is described below

commit 5c378931c672d695f168d2aca0848664cb4c2f2f
Author: Xiaojian Zhou <gesterz...@users.noreply.github.com>
AuthorDate: Fri May 11 18:04:51 2018 -0700

    GEODE-5087: send a BatchDestroyOperation for each dropped event at serial 
primary sender (#1924)
---
 .../internal/ParallelAsyncEventQueueImpl.java      |  2 +-
 .../internal/SerialAsyncEventQueueImpl.java        |  2 +-
 .../internal/cache/wan/AbstractGatewaySender.java  | 68 +++++++++++++---------
 .../wan/AbstractGatewaySenderEventProcessor.java   | 33 +----------
 ...currentParallelGatewaySenderEventProcessor.java | 32 ++++++++++
 .../ParallelGatewaySenderEventProcessor.java       |  6 ++
 .../cache/wan/serial/BatchDestroyOperation.java    | 28 ++++++++-
 ...oncurrentSerialGatewaySenderEventProcessor.java | 64 ++++++++++++--------
 .../serial/SerialGatewaySenderEventProcessor.java  | 47 +++++++++++++--
 .../xmlcache/ParallelAsyncEventQueueCreation.java  |  2 +-
 .../xmlcache/ParallelGatewaySenderCreation.java    |  2 +-
 .../xmlcache/SerialAsyncEventQueueCreation.java    |  2 +-
 .../xmlcache/SerialGatewaySenderCreation.java      |  2 +-
 .../wan/parallel/ParallelGatewaySenderImpl.java    |  2 +-
 .../cache/wan/serial/SerialGatewaySenderImpl.java  |  6 +-
 .../SerialGatewaySenderOperationsDUnitTest.java    |  2 -
 16 files changed, 199 insertions(+), 101 deletions(-)

diff --git 
a/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/ParallelAsyncEventQueueImpl.java
 
b/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/ParallelAsyncEventQueueImpl.java
index 538b65a..8e2e4e4 100644
--- 
a/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/ParallelAsyncEventQueueImpl.java
+++ 
b/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/ParallelAsyncEventQueueImpl.java
@@ -168,7 +168,7 @@ public class ParallelAsyncEventQueueImpl extends 
AbstractGatewaySender {
   }
 
   @Override
-  protected void setModifiedEventId(EntryEventImpl clonedEvent) {
+  public void setModifiedEventId(EntryEventImpl clonedEvent) {
     int bucketId = -1;
     // merged from 42004
     if (clonedEvent.getRegion() instanceof DistributedRegion) {
diff --git 
a/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/SerialAsyncEventQueueImpl.java
 
b/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/SerialAsyncEventQueueImpl.java
index 9e0239d..400126d 100644
--- 
a/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/SerialAsyncEventQueueImpl.java
+++ 
b/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/SerialAsyncEventQueueImpl.java
@@ -225,7 +225,7 @@ public class SerialAsyncEventQueueImpl extends 
AbstractGatewaySender {
    * internal.cache.EntryEventImpl)
    */
   @Override
-  protected void setModifiedEventId(EntryEventImpl clonedEvent) {
+  public void setModifiedEventId(EntryEventImpl clonedEvent) {
     EventID originalEventId = clonedEvent.getEventId();
     long originalThreadId = originalEventId.getThreadID();
     long newThreadId = originalThreadId;
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
index 123534a..149fa48 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
@@ -173,6 +173,9 @@ public abstract class AbstractGatewaySender implements 
GatewaySender, Distributi
 
   protected volatile ConcurrentLinkedQueue<TmpQueueEvent> tmpQueuedEvents =
       new ConcurrentLinkedQueue<>();
+
+  protected volatile ConcurrentLinkedQueue<EntryEventImpl> tmpDroppedEvents =
+      new ConcurrentLinkedQueue<>();
   /**
    * The number of seconds to wait before stopping the GatewaySender. Default 
is 0 seconds.
    */
@@ -836,40 +839,43 @@ public abstract class AbstractGatewaySender implements 
GatewaySender, Distributi
 
     final boolean isDebugEnabled = logger.isDebugEnabled();
 
-    // If this gateway is not running, return
-    if (!isRunning()) {
-      if (isDebugEnabled) {
-        logger.debug("Returning back without putting into the gateway sender 
queue:" + event);
-      }
-      if (this.eventProcessor != null) {
-        this.eventProcessor.registerEventDroppedInPrimaryQueue(event);
-      }
-      return;
-    }
-
-    final GatewaySenderStats stats = getStatistics();
-    stats.incEventsReceived();
-
-    if (!checkForDistribution(event, stats)) {
-      stats.incEventsNotQueued();
-      return;
-    }
-
-    // this filter is defined by Asif which exist in old wan too. new wan has
-    // other GatewaEventFilter. Do we need to get rid of this filter. Cheetah 
is
-    // not considering this filter
-    if (!this.filter.enqueueEvent(event)) {
-      stats.incEventsFiltered();
-      return;
-    }
     // released by this method or transfers ownership to TmpQueueEvent
     @Released
     EntryEventImpl clonedEvent = new EntryEventImpl(event, false);
     boolean freeClonedEvent = true;
     try {
 
-      Region region = event.getRegion();
+      // If this gateway is not running, return
+      if (!isRunning()) {
+        if (this.isPrimary()) {
+          tmpDroppedEvents.add(clonedEvent);
+          if (isDebugEnabled) {
+            logger.debug("add to tmpDroppedEvents for evnet {}", clonedEvent);
+          }
+        }
+        if (isDebugEnabled) {
+          logger.debug("Returning back without putting into the gateway sender 
queue:" + event);
+        }
+        return;
+      }
+
+      final GatewaySenderStats stats = getStatistics();
+      stats.incEventsReceived();
+
+      if (!checkForDistribution(event, stats)) {
+        stats.incEventsNotQueued();
+        return;
+      }
 
+      // this filter is defined by Asif which exist in old wan too. new wan has
+      // other GatewaEventFilter. Do we need to get rid of this filter. 
Cheetah is
+      // not considering this filter
+      if (!this.filter.enqueueEvent(event)) {
+        stats.incEventsFiltered();
+        return;
+      }
+
+      // start to distribute
       setModifiedEventId(clonedEvent);
       Object callbackArg = clonedEvent.getRawCallbackArgument();
 
@@ -1016,6 +1022,12 @@ public abstract class AbstractGatewaySender implements 
GatewaySender, Distributi
    */
   public void enqueueTempEvents() {
     if (this.eventProcessor != null) {// Fix for defect #47308
+      // process tmpDroppedEvents
+      EntryEventImpl droppedEvent = null;
+      while ((droppedEvent = tmpDroppedEvents.poll()) != null) {
+        this.eventProcessor.registerEventDroppedInPrimaryQueue(droppedEvent);
+      }
+
       TmpQueueEvent nextEvent = null;
       final GatewaySenderStats stats = getStatistics();
       try {
@@ -1216,7 +1228,7 @@ public abstract class AbstractGatewaySender implements 
GatewaySender, Distributi
     return region;
   }
 
-  protected abstract void setModifiedEventId(EntryEventImpl clonedEvent);
+  public abstract void setModifiedEventId(EntryEventImpl clonedEvent);
 
   public static class DefaultGatewayEventFilter
       implements org.apache.geode.internal.cache.GatewayEventFilter {
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
index 2ce06c6..89fa586 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
@@ -33,7 +33,6 @@ import org.apache.geode.GemFireException;
 import org.apache.geode.SystemFailure;
 import org.apache.geode.cache.CacheException;
 import org.apache.geode.cache.EntryEvent;
-import org.apache.geode.cache.EntryOperation;
 import org.apache.geode.cache.Operation;
 import org.apache.geode.cache.Region;
 import org.apache.geode.cache.RegionDestroyedException;
@@ -50,7 +49,6 @@ import org.apache.geode.internal.cache.EventID;
 import org.apache.geode.internal.cache.InternalCache;
 import org.apache.geode.internal.cache.LocalRegion;
 import org.apache.geode.internal.cache.PartitionedRegion;
-import org.apache.geode.internal.cache.PartitionedRegionHelper;
 import org.apache.geode.internal.cache.RegionQueue;
 import 
org.apache.geode.internal.cache.wan.parallel.ConcurrentParallelGatewaySenderQueue;
 import org.apache.geode.internal.cache.wan.parallel.ParallelGatewaySenderQueue;
@@ -279,36 +277,7 @@ public abstract class AbstractGatewaySenderEventProcessor 
extends Thread {
     return this.queue.size();
   }
 
-  public void registerEventDroppedInPrimaryQueue(EntryEventImpl event) {
-    if (queue == null) {
-      return;
-    }
-    if (this.queue instanceof ConcurrentParallelGatewaySenderQueue) {
-      ConcurrentParallelGatewaySenderQueue cpgsq = 
(ConcurrentParallelGatewaySenderQueue) queue;
-      PartitionedRegion prQ = cpgsq.getRegion(event.getRegion().getFullPath());
-      if (prQ == null) {
-        if (logger.isDebugEnabled()) {
-          logger.debug("shadow partitioned region " + 
event.getRegion().getFullPath()
-              + " is not created yet.");
-        }
-        return;
-      }
-      int bucketId = PartitionedRegionHelper.getHashKey((EntryOperation) 
event);
-      long shadowKey = event.getTailKey();
-
-      ParallelGatewaySenderQueue pgsq =
-          (ParallelGatewaySenderQueue) cpgsq.getQueueByBucket(bucketId);
-      boolean isPrimary = 
prQ.getRegionAdvisor().getBucketAdvisor(bucketId).isPrimary();
-      if (isPrimary) {
-        pgsq.sendQueueRemovalMesssageForDroppedEvent(prQ, bucketId, shadowKey);
-        
this.sender.getStatistics().incEventsDroppedDueToPrimarySenderNotRunning();
-        if (logger.isDebugEnabled()) {
-          logger.debug("register dropped event for primary queue. BucketId is 
" + bucketId
-              + ", shadowKey is " + shadowKey + ", prQ is " + 
prQ.getFullPath());
-        }
-      }
-    }
-  }
+  protected abstract void registerEventDroppedInPrimaryQueue(EntryEventImpl 
droppedEvent);
 
   /**
    * @return the sender
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderEventProcessor.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderEventProcessor.java
index 54b7034..6b8cce1 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderEventProcessor.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderEventProcessor.java
@@ -32,12 +32,15 @@ import org.apache.geode.GemFireException;
 import org.apache.geode.InternalGemFireException;
 import org.apache.geode.cache.CacheException;
 import org.apache.geode.cache.EntryEvent;
+import org.apache.geode.cache.EntryOperation;
 import org.apache.geode.cache.Region;
 import org.apache.geode.cache.wan.GatewayQueueEvent;
 import org.apache.geode.internal.cache.EntryEventImpl;
 import org.apache.geode.internal.cache.EnumListenerEvent;
 import org.apache.geode.internal.cache.InternalRegion;
 import org.apache.geode.internal.cache.LocalRegion;
+import org.apache.geode.internal.cache.PartitionedRegion;
+import org.apache.geode.internal.cache.PartitionedRegionHelper;
 import org.apache.geode.internal.cache.RegionQueue;
 import org.apache.geode.internal.cache.wan.AbstractGatewaySender;
 import org.apache.geode.internal.cache.wan.AbstractGatewaySenderEventProcessor;
@@ -138,6 +141,35 @@ public class ConcurrentParallelGatewaySenderEventProcessor
   }
 
   @Override
+  protected void registerEventDroppedInPrimaryQueue(EntryEventImpl 
droppedEvent) {
+    if (queue == null) {
+      return;
+    }
+    ConcurrentParallelGatewaySenderQueue cpgsq = 
(ConcurrentParallelGatewaySenderQueue) queue;
+    PartitionedRegion prQ = 
cpgsq.getRegion(droppedEvent.getRegion().getFullPath());
+    if (prQ == null) {
+      if (logger.isDebugEnabled()) {
+        logger.debug("shadow partitioned region " + 
droppedEvent.getRegion().getFullPath()
+            + " is not created yet.");
+      }
+      return;
+    }
+    int bucketId = PartitionedRegionHelper.getHashKey((EntryOperation) 
droppedEvent);
+    long shadowKey = droppedEvent.getTailKey();
+
+    ParallelGatewaySenderQueue pgsq = (ParallelGatewaySenderQueue) 
cpgsq.getQueueByBucket(bucketId);
+    boolean isPrimary = 
prQ.getRegionAdvisor().getBucketAdvisor(bucketId).isPrimary();
+    if (isPrimary) {
+      pgsq.sendQueueRemovalMesssageForDroppedEvent(prQ, bucketId, shadowKey);
+      
this.sender.getStatistics().incEventsDroppedDueToPrimarySenderNotRunning();
+      if (logger.isDebugEnabled()) {
+        logger.debug("register dropped event for primary queue. BucketId is " 
+ bucketId
+            + ", shadowKey is " + shadowKey + ", prQ is " + prQ.getFullPath());
+      }
+    }
+  }
+
+  @Override
   public void run() {
     final boolean isDebugEnabled = logger.isDebugEnabled();
 
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderEventProcessor.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderEventProcessor.java
index 5715a35..77811c8 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderEventProcessor.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderEventProcessor.java
@@ -149,6 +149,12 @@ public class ParallelGatewaySenderEventProcessor extends 
AbstractGatewaySenderEv
     }
   }
 
+  @Override
+  protected void registerEventDroppedInPrimaryQueue(EntryEventImpl 
droppedEvent) {
+    logger.info("ParallelGatewaySenderEventProcessor should not process 
dropped event {}",
+        droppedEvent);
+  }
+
   public void clear(PartitionedRegion pr, int bucketId) {
     ((ParallelGatewaySenderQueue) this.queue).clear(pr, bucketId);
   }
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/BatchDestroyOperation.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/BatchDestroyOperation.java
index debb005..d9dde9d 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/BatchDestroyOperation.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/BatchDestroyOperation.java
@@ -102,7 +102,7 @@ public class BatchDestroyOperation extends 
DistributedCacheOperation {
         }
 
         // Optimized way
-        for (long k = (Long) this.key; k <= this.tailKey; k++) {
+        for (long k = (Long) this.key; k <= this.tailKey && this.tailKey != 
-1; k++) {
           try {
             for (GatewayEventFilter filter : rgn.getSerialGatewaySender()
                 .getGatewayEventFilters()) {
@@ -124,6 +124,32 @@ public class BatchDestroyOperation extends 
DistributedCacheOperation {
             }
           }
         }
+
+        // destroy dropped event from unprocessedKeys
+        if (this.tailKey == -1) {
+          SerialGatewaySenderEventProcessor ep = null;
+          int index = ((Long) this.key).intValue();
+          if (index == -1) {
+            // this is SerialGatewaySenderEventProcessor
+            ep = (SerialGatewaySenderEventProcessor) 
rgn.getSerialGatewaySender()
+                .getEventProcessor();
+          } else {
+            ConcurrentSerialGatewaySenderEventProcessor csgep =
+                (ConcurrentSerialGatewaySenderEventProcessor) 
rgn.getSerialGatewaySender()
+                    .getEventProcessor();
+            ep = csgep.processors.get(index);
+          }
+          if (ep != null) {
+            // if sender is being shutdown, the ep could be null
+            boolean removed = ep.basicHandlePrimaryDestroy(ev.getEventId());
+            if (removed) {
+              if (isDebugEnabled) {
+                logger.debug("Removed a dropped event {} from 
unprocessedEvents.",
+                    (EntryEventImpl) event);
+              }
+            }
+          }
+        }
         this.appliedOperation = true;
       } catch (CacheWriterException e) {
         throw new Error(
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/ConcurrentSerialGatewaySenderEventProcessor.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/ConcurrentSerialGatewaySenderEventProcessor.java
index ec01fd9..8ec6ce1 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/ConcurrentSerialGatewaySenderEventProcessor.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/ConcurrentSerialGatewaySenderEventProcessor.java
@@ -109,6 +109,35 @@ public class ConcurrentSerialGatewaySenderEventProcessor
 
   }
 
+  public void setModifiedEventId(EntryEventImpl clonedEvent, int index) {
+    EventID originalEventId = clonedEvent.getEventId();
+    if (logger.isDebugEnabled()) {
+      logger.debug("The original EventId is {}", originalEventId);
+    }
+    // PARALLEL_THREAD_BUFFER * (index +1) + originalEventId.getThreadID();
+    // generating threadId by the algorithm explained above used to clash with
+    // fakeThreadId generated by putAll
+    // below is new way to generate threadId so that it doesn't clash with
+    // any.
+    long newThreadId =
+        ThreadIdentifier.createFakeThreadIDForParallelGateway(index, 
originalEventId.getThreadID(),
+            0 /*
+               * gateway sender event id index has already been applied in
+               * SerialGatewaySenderImpl.setModifiedEventId
+               */);
+    EventID newEventId = new EventID(originalEventId.getMembershipID(), 
newThreadId,
+        originalEventId.getSequenceID());
+    if (logger.isDebugEnabled()) {
+      logger.debug(
+          "{}: Generated event id for event with key={}, index={}, original 
event id={}, threadId={}, new event id={}, newThreadId={}"
+              + ":index=" + this.sender.getEventIdIndex(),
+          this, clonedEvent.getKey(), index, originalEventId,
+          ThreadIdentifier.toDisplayString(originalEventId.getThreadID()), 
newEventId,
+          ThreadIdentifier.toDisplayString(newThreadId));
+    }
+    clonedEvent.setEventId(newEventId);
+  }
+
   public void enqueueEvent(EnumListenerEvent operation, EntryEvent event, 
Object substituteValue,
       int index) throws CacheException, IOException {
     // Get the appropriate gateway
@@ -121,30 +150,7 @@ public class ConcurrentSerialGatewaySenderEventProcessor
       @Released
       EntryEventImpl clonedEvent = new EntryEventImpl((EntryEventImpl) event);
       try {
-        EventID originalEventId = clonedEvent.getEventId();
-        if (logger.isDebugEnabled()) {
-          logger.debug("The original EventId is {}", originalEventId);
-        }
-        // PARALLEL_THREAD_BUFFER * (index +1) + originalEventId.getThreadID();
-        // generating threadId by the algorithm explained above used to clash 
with
-        // fakeThreadId generated by putAll
-        // below is new way to generate threadId so that it doesn't clash with
-        // any.
-        long newThreadId = 
ThreadIdentifier.createFakeThreadIDForParallelGateway(index,
-            originalEventId.getThreadID(),
-            0 /*
-               * gateway sender event id index has already been applied in
-               * SerialGatewaySenderImpl.setModifiedEventId
-               */);
-        EventID newEventId = new EventID(originalEventId.getMembershipID(), 
newThreadId,
-            originalEventId.getSequenceID());
-        if (logger.isDebugEnabled()) {
-          logger.debug(
-              "{}: Generated event id for event with key={}, index={}, 
original event id={}, threadId={}, new event id={}, newThreadId={}",
-              this, event.getKey(), index, originalEventId, 
originalEventId.getThreadID(),
-              newEventId, newThreadId);
-        }
-        clonedEvent.setEventId(newEventId);
+        setModifiedEventId(clonedEvent, index);
         serialProcessor.enqueueEvent(operation, clonedEvent, substituteValue);
       } finally {
         clonedEvent.release();
@@ -375,6 +381,16 @@ public class ConcurrentSerialGatewaySenderEventProcessor
   }
 
   @Override
+  protected void registerEventDroppedInPrimaryQueue(EntryEventImpl 
droppedEvent) {
+    this.getSender().setModifiedEventId(droppedEvent);
+    // modified event again for concurrent SGSEP
+    int index = Math.abs(getHashCode(((EntryEventImpl) droppedEvent)) % 
this.processors.size());
+    setModifiedEventId(droppedEvent, index);
+
+    
this.processors.get(index).sendBatchDestroyOperationForDroppedEvent(droppedEvent,
 index);
+  }
+
+  @Override
   protected void enqueueEvent(GatewayQueueEvent event) {
     for (SerialGatewaySenderEventProcessor serialProcessor : this.processors) {
       serialProcessor.enqueueEvent(event);
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessor.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessor.java
index 3fa4d6a..39609c7 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessor.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessor.java
@@ -45,6 +45,7 @@ import org.apache.geode.internal.cache.DistributedRegion;
 import org.apache.geode.internal.cache.EntryEventImpl;
 import org.apache.geode.internal.cache.EnumListenerEvent;
 import org.apache.geode.internal.cache.EventID;
+import org.apache.geode.internal.cache.LocalRegion;
 import org.apache.geode.internal.cache.wan.AbstractGatewaySender;
 import org.apache.geode.internal.cache.wan.AbstractGatewaySender.EventWrapper;
 import org.apache.geode.internal.cache.wan.AbstractGatewaySenderEventProcessor;
@@ -610,7 +611,7 @@ public class SerialGatewaySenderEventProcessor extends 
AbstractGatewaySenderEven
       }
       my_executor.execute(new Runnable() {
         public void run() {
-          basicHandlePrimaryDestroy(gatewayEvent);
+          basicHandlePrimaryDestroy(gatewayEvent.getEventId());
         }
       });
     }
@@ -620,23 +621,25 @@ public class SerialGatewaySenderEventProcessor extends 
AbstractGatewaySenderEven
    * Just remove the event from the unprocessed events map if it is present. 
This method added to
    * fix bug 37603
    */
-  protected void basicHandlePrimaryDestroy(final GatewaySenderEventImpl 
gatewayEvent) {
+  protected boolean basicHandlePrimaryDestroy(final EventID eventId) {
     if (this.sender.isPrimary()) {
       // no need to do anything if we have become the primary
-      return;
+      return false;
     }
     GatewaySenderStats statistics = this.sender.getStatistics();
     // Get the event from the map
     synchronized (unprocessedEventsLock) {
       if (this.unprocessedEvents == null)
-        return;
+        return false;
       // now we can safely use the unprocessedEvents field
-      EventWrapper ew = 
this.unprocessedEvents.remove(gatewayEvent.getEventId());
+      EventWrapper ew = this.unprocessedEvents.remove(eventId);
       if (ew != null) {
         ew.event.release();
         statistics.incUnprocessedEventsRemovedByPrimary();
+        return true;
       }
     }
+    return false;
   }
 
   protected void basicHandlePrimaryEvent(final GatewaySenderEventImpl 
gatewayEvent) {
@@ -865,4 +868,38 @@ public class SerialGatewaySenderEventProcessor extends 
AbstractGatewaySenderEven
     // @TODO This API hasn't been implemented yet
     throw new UnsupportedOperationException();
   }
+
+  public void sendBatchDestroyOperationForDroppedEvent(EntryEventImpl 
dropEvent, int index) {
+    EntryEventImpl destroyEvent =
+        EntryEventImpl.create((LocalRegion) this.queue.getRegion(), 
Operation.DESTROY, (long) index,
+            null/* newValue */, null, false, sender.getCache().getMyId());
+    destroyEvent.setEventId(dropEvent.getEventId());
+    destroyEvent.disallowOffHeapValues();
+    destroyEvent.setTailKey(-1L);
+    if (logger.isDebugEnabled()) {
+      logger.debug(
+          "SerialGatewaySenderEventProcessor sends BatchDestroyOperation to 
secondary for event {}",
+          destroyEvent);
+    }
+
+    try {
+      BatchDestroyOperation op = new BatchDestroyOperation(destroyEvent);
+      op.distribute();
+      if (logger.isDebugEnabled()) {
+        logger.debug("BatchRemovalThread completed destroy of dropped event 
{}", dropEvent);
+      }
+    } catch (Exception ignore) {
+      if (logger.isDebugEnabled()) {
+        logger.debug(
+            "Exception in sending dropped event could be ignored in order not 
to interrupt sender starting",
+            ignore);
+      }
+    }
+  }
+
+  @Override
+  protected void registerEventDroppedInPrimaryQueue(EntryEventImpl 
droppedEvent) {
+    this.getSender().setModifiedEventId(droppedEvent);
+    sendBatchDestroyOperationForDroppedEvent(droppedEvent, -1);
+  }
 }
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/ParallelAsyncEventQueueCreation.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/ParallelAsyncEventQueueCreation.java
index 6f8efa8..4686b67 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/ParallelAsyncEventQueueCreation.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/ParallelAsyncEventQueueCreation.java
@@ -87,5 +87,5 @@ public class ParallelAsyncEventQueueCreation extends 
AbstractGatewaySender
   }
 
   @Override
-  protected void setModifiedEventId(EntryEventImpl clonedEvent) {}
+  public void setModifiedEventId(EntryEventImpl clonedEvent) {}
 }
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/ParallelGatewaySenderCreation.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/ParallelGatewaySenderCreation.java
index 5b025b5..257ee75 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/ParallelGatewaySenderCreation.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/ParallelGatewaySenderCreation.java
@@ -90,7 +90,7 @@ public class ParallelGatewaySenderCreation extends 
AbstractGatewaySender impleme
   }
 
   @Override
-  protected void setModifiedEventId(EntryEventImpl clonedEvent) {}
+  public void setModifiedEventId(EntryEventImpl clonedEvent) {}
 
   protected GatewayQueueEvent getSynchronizationEvent(Object key, long 
timestamp) {
     throw new UnsupportedOperationException();
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/SerialAsyncEventQueueCreation.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/SerialAsyncEventQueueCreation.java
index ce71c54..cd06661 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/SerialAsyncEventQueueCreation.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/SerialAsyncEventQueueCreation.java
@@ -86,5 +86,5 @@ public class SerialAsyncEventQueueCreation extends 
AbstractGatewaySender impleme
   }
 
   @Override
-  protected void setModifiedEventId(EntryEventImpl clonedEvent) {}
+  public void setModifiedEventId(EntryEventImpl clonedEvent) {}
 }
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/SerialGatewaySenderCreation.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/SerialGatewaySenderCreation.java
index 80c04de..b0766ff 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/SerialGatewaySenderCreation.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/SerialGatewaySenderCreation.java
@@ -87,7 +87,7 @@ public class SerialGatewaySenderCreation extends 
AbstractGatewaySender implement
   }
 
   @Override
-  protected void setModifiedEventId(EntryEventImpl clonedEvent) {}
+  public void setModifiedEventId(EntryEventImpl clonedEvent) {}
 
   protected GatewayQueueEvent getSynchronizationEvent(Object key, long 
timestamp) {
     throw new UnsupportedOperationException();
diff --git 
a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderImpl.java
 
b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderImpl.java
index d023704..f565426 100644
--- 
a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderImpl.java
+++ 
b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderImpl.java
@@ -167,7 +167,7 @@ public class ParallelGatewaySenderImpl extends 
AbstractRemoteGatewaySender {
   }
 
   @Override
-  protected void setModifiedEventId(EntryEventImpl clonedEvent) {
+  public void setModifiedEventId(EntryEventImpl clonedEvent) {
     int bucketId = -1;
     // merged from 42004
     if (clonedEvent.getRegion() instanceof DistributedRegion) {
diff --git 
a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderImpl.java
 
b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderImpl.java
index d964253..ecca896 100644
--- 
a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderImpl.java
+++ 
b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderImpl.java
@@ -211,7 +211,7 @@ public class SerialGatewaySenderImpl extends 
AbstractRemoteGatewaySender {
   }
 
   @Override
-  protected void setModifiedEventId(EntryEventImpl clonedEvent) {
+  public void setModifiedEventId(EntryEventImpl clonedEvent) {
     EventID originalEventId = clonedEvent.getEventId();
     long originalThreadId = originalEventId.getThreadID();
     long newThreadId = originalThreadId;
@@ -226,7 +226,9 @@ public class SerialGatewaySenderImpl extends 
AbstractRemoteGatewaySender {
     if (logger.isDebugEnabled()) {
       logger.debug(
           "{}: Generated event id for event with key={}, original event id={}, 
originalThreadId={}, new event id={}, newThreadId={}",
-          this, clonedEvent.getKey(), originalEventId, originalThreadId, 
newEventId, newThreadId);
+          this, clonedEvent.getKey(), originalEventId,
+          ThreadIdentifier.toDisplayString(originalThreadId), newEventId,
+          ThreadIdentifier.toDisplayString(newThreadId));
     }
     clonedEvent.setEventId(newEventId);
   }
diff --git 
a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderOperationsDUnitTest.java
 
b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderOperationsDUnitTest.java
index caa357e..4993f24 100644
--- 
a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderOperationsDUnitTest.java
+++ 
b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderOperationsDUnitTest.java
@@ -50,7 +50,6 @@ import org.apache.geode.test.dunit.RMIException;
 import org.apache.geode.test.dunit.SerializableRunnable;
 import org.apache.geode.test.dunit.VM;
 import org.apache.geode.test.junit.categories.DistributedTest;
-import org.apache.geode.test.junit.categories.FlakyTest;
 import org.apache.geode.test.junit.categories.WanTest;
 
 @Category({DistributedTest.class, WanTest.class})
@@ -266,7 +265,6 @@ public class SerialGatewaySenderOperationsDUnitTest extends 
WANTestBase {
     vm5.invoke(() -> WANTestBase.validateQueueSizeStat("ln", 0));
   }
 
-  @Category({FlakyTest.class, WanTest.class}) // GEODE-5056
   @Test
   public void testRestartSerialGatewaySendersWhilePutting() throws Throwable {
     Integer lnPort = (Integer) vm0.invoke(() -> 
WANTestBase.createFirstLocatorWithDSId(1));

-- 
To stop receiving notification emails like this one, please contact
zho...@apache.org.

Reply via email to