This is an automated email from the ASF dual-hosted git repository.

eshu11 pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new 8a7b1fc  GEODE-8465: secondary HARegionQueue to sync with primary 
queue (#5496)
8a7b1fc is described below

commit 8a7b1fc7bc91d772d51282ea527c7e58711569c1
Author: Eric Shu <[email protected]>
AuthorDate: Wed Sep 9 11:39:44 2020 -0700

    GEODE-8465: secondary HARegionQueue to sync with primary queue (#5496)
    
      A secondary HARegionQueue will try to sync with the primary after GII
      to remove any events that have already been dispatched from the primary.
---
 .../codeAnalysis/sanctionedDataSerializables.txt   |   8 +
 .../org/apache/geode/internal/DSFIDFactory.java    |   5 +
 .../geode/internal/cache/ha/HARegionQueue.java     | 219 +++++++++++-
 .../internal/cache/ha/QueueRemovalMessage.java     |   4 +
 .../cache/ha/QueueSynchronizationProcessor.java    | 252 +++++++++++++
 .../geode/internal/cache/ha/HARegionQueueTest.java | 391 ++++++++++++++++++++-
 .../internal/cache/ha/QueueRemovalMessageTest.java |  21 ++
 .../ha/QueueSynchronizationProcessorTest.java      | 142 ++++++++
 .../serialization/DataSerializableFixedID.java     |   5 +-
 9 files changed, 1029 insertions(+), 18 deletions(-)

diff --git 
a/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt
 
b/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt
index 733737e..cbc995d 100644
--- 
a/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt
+++ 
b/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt
@@ -1308,6 +1308,14 @@ org/apache/geode/internal/cache/ha/QueueRemovalMessage,2
 fromData,119
 toData,125
 
+org/apache/geode/internal/cache/ha/QueueSynchronizationProcessor$QueueSynchronizationMessage,2
+fromData,77
+toData,86
+
+org/apache/geode/internal/cache/ha/QueueSynchronizationProcessor$QueueSynchronizationReplyMessage,2
+fromData,76
+toData,80
+
 org/apache/geode/internal/cache/ha/ThreadIdentifier,2
 fromData,19
 toData,19
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/DSFIDFactory.java 
b/geode-core/src/main/java/org/apache/geode/internal/DSFIDFactory.java
index bd613db..504e7d1 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/DSFIDFactory.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/DSFIDFactory.java
@@ -274,6 +274,7 @@ import 
org.apache.geode.internal.cache.control.SerializableRegionRedundancyStatu
 import 
org.apache.geode.internal.cache.control.SerializableRestoreRedundancyResultsImpl;
 import 
org.apache.geode.internal.cache.ha.HARegionQueue.DispatchedAndCurrentEvents;
 import org.apache.geode.internal.cache.ha.QueueRemovalMessage;
+import org.apache.geode.internal.cache.ha.QueueSynchronizationProcessor;
 import org.apache.geode.internal.cache.locks.TXLockBatch;
 import org.apache.geode.internal.cache.locks.TXLockIdImpl;
 import org.apache.geode.internal.cache.locks.TXLockUpdateParticipantsMessage;
@@ -581,6 +582,10 @@ public class DSFIDFactory implements 
DataSerializableFixedID {
     serializer.registerDSFID(ADD_CACHESERVER_PROFILE_UPDATE, 
AddCacheServerProfileMessage.class);
     serializer.registerDSFID(REMOVE_CACHESERVER_PROFILE_UPDATE,
         RemoveCacheServerProfileMessage.class);
+    serializer.registerDSFID(QUEUE_SYNCHRONIZATION_MESSAGE,
+        QueueSynchronizationProcessor.QueueSynchronizationMessage.class);
+    serializer.registerDSFID(QUEUE_SYNCHRONIZATION_REPLY_MESSAGE,
+        QueueSynchronizationProcessor.QueueSynchronizationReplyMessage.class);
     serializer.registerDSFID(SERVER_INTEREST_REGISTRATION_MESSAGE,
         ServerInterestRegistrationMessage.class);
     serializer.registerDSFID(FILTER_PROFILE_UPDATE, 
FilterProfile.OperationMessage.class);
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/ha/HARegionQueue.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/ha/HARegionQueue.java
index c2f4a42..e2c7dc5 100755
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/ha/HARegionQueue.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/ha/HARegionQueue.java
@@ -24,6 +24,7 @@ import java.io.DataOutput;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.ConcurrentModificationException;
 import java.util.HashMap;
@@ -42,8 +43,11 @@ import java.util.TreeMap;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
 
 import org.apache.logging.log4j.Logger;
 
@@ -80,6 +84,7 @@ import 
org.apache.geode.distributed.internal.DistributionManager;
 import org.apache.geode.distributed.internal.InternalDistributedSystem;
 import 
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
 import org.apache.geode.internal.Assert;
+import org.apache.geode.internal.SystemTimer;
 import org.apache.geode.internal.cache.CacheServerImpl;
 import org.apache.geode.internal.cache.CachedDeserializable;
 import org.apache.geode.internal.cache.Conflatable;
@@ -204,7 +209,7 @@ public class HARegionQueue implements RegionQueue {
    * via GII , then there should not be any violation. If there is data 
arriving thru GII, such
    * violations can be expected , but should be analyzed thoroughly.
    */
-  protected boolean puttingGIIDataInQueue;
+  protected final AtomicBoolean puttingGIIDataInQueue = new AtomicBoolean();
 
   /**
    * flag indicating whether interest has been registered for this queue. This 
is used to prevent
@@ -321,6 +326,13 @@ public class HARegionQueue implements RegionQueue {
    */
   protected long maxQueueSizeHitCount = 0;
 
+  final AtomicBoolean hasSynchronizedWithPrimary = new AtomicBoolean();
+  final AtomicBoolean scheduleSynchronizationWithPrimaryInProgress = new 
AtomicBoolean();
+  final AtomicBoolean doneGIIQueueing = new AtomicBoolean();
+  volatile long doneGIIQueueingTime;
+  volatile long positionBeforeGII = 0;
+  volatile long positionAfterGII = 0;
+
   /**
    * Processes the given string and returns a string which is allowed for 
region names
    *
@@ -488,7 +500,7 @@ public class HARegionQueue implements RegionQueue {
     // data, then the relevant data structures have to
     // be populated
     if (!entrySet.isEmpty()) {
-      this.puttingGIIDataInQueue = true;
+      puttingGIIDataInQueue.set(true);
       final boolean isDebugEnabled = logger.isDebugEnabled();
       try {
         Region.Entry entry = null;
@@ -533,7 +545,13 @@ public class HARegionQueue implements RegionQueue {
         }
         this.tailKey.set(max);
       } finally {
-        this.puttingGIIDataInQueue = false;
+        puttingGIIDataInQueue.set(false);
+        positionAfterGII = tailKey.get();
+        if (positionAfterGII > 0) {
+          positionBeforeGII = 1;
+        }
+        doneGIIQueueing.set(true);
+        doneGIIQueueingTime = System.currentTimeMillis();
         if (isDebugEnabled) {
           logger.debug("{} done putting GII data into queue", this);
         }
@@ -671,14 +689,14 @@ public class HARegionQueue implements RegionQueue {
     long sequenceID = eventId.getSequenceID();
     // Check from Events Map if the put operation should proceed or not
     DispatchedAndCurrentEvents dace = (DispatchedAndCurrentEvents) 
this.eventsMap.get(ti);
-    if (dace != null && dace.isGIIDace && this.puttingGIIDataInQueue) {
+    if (dace != null && dace.isGIIDace && puttingGIIDataInQueue.get()) {
       // we only need to retain DACE for which there are no entries in the 
queue.
       // for other thread identifiers we build up a new DACE
       dace = null;
     }
     if (dace != null) {
       // check the last dispatched sequence Id
-      if (this.puttingGIIDataInQueue || (sequenceID > 
dace.lastDispatchedSequenceId)) {
+      if (puttingGIIDataInQueue.get() || (sequenceID > 
dace.lastDispatchedSequenceId)) {
         // Asif:Insert the Event into the Region with proper locking.
         // It is possible that by the time put operation proceeds , the
         // Last dispatched id has changed so it is possible that the object at
@@ -686,7 +704,7 @@ public class HARegionQueue implements RegionQueue {
         // also does not get added
         if (!dace.putObject(event, sequenceID)) {
           // dace encountered a DESTROYED token - stop adding GII data
-          if (!this.puttingGIIDataInQueue) {
+          if (!puttingGIIDataInQueue.get()) {
             this.put(object);
           }
         } else {
@@ -1689,8 +1707,7 @@ public class HARegionQueue implements RegionQueue {
 
 
   /**
-   * Used for testing purposes only. Returns the set of current counters for 
the given
-   * ThreadIdentifier
+   * Returns the set of current counters for the given ThreadIdentifier
    *
    * @param id - the EventID object
    * @return - the current counters set
@@ -2962,7 +2979,7 @@ public class HARegionQueue implements RegionQueue {
       Long oldPosition = null;
       final boolean isDebugEnabled_BS = 
logger.isTraceEnabled(LogMarker.BRIDGE_SERVER_VERBOSE);
       if (isDebugEnabled_BS && this.lastSequenceIDPut >= sequenceID
-          && !owningQueue.puttingGIIDataInQueue) {
+          && !owningQueue.puttingGIIDataInQueue.get()) {
         logger.trace(LogMarker.BRIDGE_SERVER_VERBOSE,
             "HARegionQueue::DACE:putObject: Given sequence ID is already 
present ({}).\nThis may be a recovered operation via P2P or a 
GetInitialImage.\nlastSequenceIDPut = {} ; event = {};\n",
             sequenceID, lastSequenceIDPut, event);
@@ -2977,7 +2994,7 @@ public class HARegionQueue implements RegionQueue {
             logger.trace("HARegionQueue.putObject: adding {}", event);
           }
           this.lastSequenceIDPut = sequenceID;
-        } else if (!owningQueue.puttingGIIDataInQueue) {
+        } else if (!owningQueue.puttingGIIDataInQueue.get()) {
           if (isDebugEnabled_BS) {
             logger.trace(LogMarker.BRIDGE_SERVER_VERBOSE,
                 "{} eliding event with ID {}, because it is not greater than 
the last sequence ID ({}). The rejected event has key <{}> and value <{}>",
@@ -2999,7 +3016,7 @@ public class HARegionQueue implements RegionQueue {
           return false;
         }
 
-        if (sequenceID > lastDispatchedSequenceId || 
owningQueue.puttingGIIDataInQueue) {
+        if (sequenceID > lastDispatchedSequenceId || 
owningQueue.puttingGIIDataInQueue.get()) {
           // Insert the object into the Region
           Long position = owningQueue.tailKey.incrementAndGet();
 
@@ -3720,7 +3737,7 @@ public class HARegionQueue implements RegionQueue {
           if (logger.isDebugEnabled()) {
             logger.debug(caller + " decremented Event ID hash code: " + 
haContainerKey.hashCode()
                 + "; System ID hash code: " + 
System.identityHashCode(haContainerKey)
-                + "; Wrapper details: " + haContainerKey);
+                + "; Wrapper details: " + haContainerKey + "; for queue: " + 
regionName);
           }
           if (haContainerKey.decAndGetReferenceCount() == 0L) {
             HARegionQueue.this.haContainer.remove(haContainerKey);
@@ -3943,4 +3960,182 @@ public class HARegionQueue implements RegionQueue {
   public Queue getGiiQueue() {
     return this.giiQueue;
   }
+
+  List<EventID> getDispatchedEvents(List<EventID> eventIds) {
+    List<EventID> dispatchedEvents = new LinkedList<>();
+    for (EventID eventId : eventIds) {
+      if (isDispatched(eventId)) {
+        dispatchedEvents.add(eventId);
+      }
+    }
+    return dispatchedEvents;
+  }
+
+  boolean isDispatched(EventID eventId) {
+    DispatchedAndCurrentEvents wrapper = 
getDispatchedAndCurrentEvents(eventId);
+    if (wrapper != null && eventId.getSequenceID() > 
wrapper.lastDispatchedSequenceId) {
+      return false;
+    }
+    return true;
+  }
+
+  DispatchedAndCurrentEvents getDispatchedAndCurrentEvents(EventID eventId) {
+    ThreadIdentifier tid = getThreadIdentifier(eventId);
+    return (DispatchedAndCurrentEvents) eventsMap.get(tid);
+  }
+
+  public synchronized void 
synchronizeQueueWithPrimary(InternalDistributedMember primary,
+      InternalCache cache) {
+    if (scheduleSynchronizationWithPrimaryInProgress.get() || 
hasSynchronizedWithPrimary.get()
+        || !doneGIIQueueing.get()) {
+      // Order of the check is important here as timer scheduled thread
+      // setting hasSynchronizedWithPrimary to true first before setting
+      // scheduleSynchronizationWithPrimaryInProgress to false in 
doSynchronizationWithPrimary.
+      return;
+    }
+    if (primary.getVersionOrdinal() < KnownVersion.GEODE_1_14_0.ordinal()) {
+      if (logger.isDebugEnabled()) {
+        logger.debug("Don't send to primary with version older than 
KnownVersion.GEODE_1_14_0");
+      }
+      return;
+    }
+    if (!hasSynchronizedWithPrimary.get() && 
!scheduleSynchronizationWithPrimaryInProgress.get()) {
+      scheduleSynchronizationWithPrimaryInProgress.set(true);
+    }
+    long delay = getDelay();
+    scheduleSynchronizationWithPrimary(primary, cache, delay);
+  }
+
+  long getDelay() {
+    // Wait for in-flight events during gii to be distributed to the member 
with primary queue.
+    long waitTime = 15 * 1000;
+    long currentTime = getCurrentTime();
+    long elapsed = currentTime - doneGIIQueueingTime;
+    return elapsed < waitTime ? waitTime - elapsed : 0L;
+  }
+
+  long getCurrentTime() {
+    return System.currentTimeMillis();
+  }
+
+  synchronized void 
scheduleSynchronizationWithPrimary(InternalDistributedMember primary,
+      InternalCache cache, long delay) {
+    cache.getCCPTimer().schedule(new SystemTimer.SystemTimerTask() {
+      @Override
+      public void run2() {
+        doSynchronizationWithPrimary(primary, cache);
+      }
+    }, delay);
+  }
+
+  synchronized void doSynchronizationWithPrimary(InternalDistributedMember 
primary,
+      InternalCache cache) {
+    int maxChunkSize = 1000;
+    try {
+      List<EventID> giiEvents = getGIIEvents();
+      if (giiEvents.size() == 0) {
+        hasSynchronizedWithPrimary.set(true);
+        return;
+      }
+      Collection<List<EventID>> chunks = null;
+
+      if (giiEvents.size() > maxChunkSize) {
+        chunks = getChunks(giiEvents, maxChunkSize);
+      }
+
+      if (chunks == null) {
+        if (!removeDispatchedEvents(primary, cache, giiEvents)) {
+          return;
+        }
+      } else {
+        for (List<EventID> chunk : chunks) {
+          if (!removeDispatchedEvents(primary, cache, chunk)) {
+            return;
+          }
+        }
+      }
+      if (logger.isDebugEnabled()) {
+        logger.debug("HARegionQueue {} has synced with primary on {}", 
regionName, primary);
+      }
+      hasSynchronizedWithPrimary.set(true);
+    } finally {
+      scheduleSynchronizationWithPrimaryInProgress.set(false);
+    }
+  }
+
+  Collection<List<EventID>> getChunks(List<EventID> events, int size) {
+    AtomicInteger counter = new AtomicInteger(0);
+    return events.stream().collect(Collectors.groupingBy(event -> 
counter.getAndIncrement() / size))
+        .values();
+  }
+
+  boolean removeDispatchedEvents(InternalDistributedMember primary, 
InternalCache cache,
+      List<EventID> chunkEvents) {
+    List<EventID> dispatchedEvents = getDispatchedEventsFromPrimary(primary, 
cache, chunkEvents);
+
+    if (dispatchedEvents == null) {
+      // failed to get events from current primary, need to retry.
+      return false;
+    }
+
+    for (EventID id : dispatchedEvents) {
+      if (!removeDispatchedEventAfterSyncWithPrimary(id)) {
+        // failed to remove all dispatched events, need to retry
+        return false;
+      }
+    }
+    return true;
+  }
+
+  List<EventID> getDispatchedEventsFromPrimary(InternalDistributedMember 
primary,
+      InternalCache cache, List<EventID> chunkEvents) {
+    return 
QueueSynchronizationProcessor.getDispatchedEvents(cache.getDistributionManager(),
+        primary, regionName, chunkEvents);
+  }
+
+  List<EventID> getGIIEvents() {
+    List<EventID> events = new LinkedList<>();
+    for (long i = positionBeforeGII; i < positionAfterGII + 1; i++) {
+      Map.Entry<?, ?> entry = region.getEntry(i);
+      // could be already removed after processing QueueRemovalMessage
+      if (entry != null && entry.getValue() instanceof HAEventWrapper) {
+        HAEventWrapper wrapper = (HAEventWrapper) entry.getValue();
+        events.add(wrapper.getEventId());
+      }
+    }
+    return events;
+  }
+
+  boolean removeDispatchedEventAfterSyncWithPrimary(EventID id) {
+    boolean interrupted = Thread.interrupted();
+    try {
+      if (logger.isDebugEnabled()) {
+        logger.debug(
+            "After sync with primary for HARegionQueue {}, removing dispatched 
event with ID {}",
+            regionName, id);
+      }
+      removeDispatchedEvents(id);
+    } catch (RegionDestroyedException ignore) {
+      logger.info(
+          "HARegionQueue {} was found to be destroyed when attempting to 
remove dispatched event with ID {} after sync",
+          regionName, id);
+    } catch (CancelException ignore) {
+      return false;
+    } catch (CacheException e) {
+      logger.error(String.format(
+          "HARegionQueue %s encountered an exception when attempting to remove 
event with ID %s from the queue",
+          regionName, id), e);
+    } catch (InterruptedException ignore) {
+      Thread.currentThread().interrupt();
+      return false;
+    } catch (RejectedExecutionException ignore) {
+      interrupted = true;
+    } finally {
+      if (interrupted) {
+        Thread.currentThread().interrupt();
+      }
+    }
+    return true;
+  }
+
 }
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/ha/QueueRemovalMessage.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/ha/QueueRemovalMessage.java
index f789411..4c967be 100755
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/ha/QueueRemovalMessage.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/ha/QueueRemovalMessage.java
@@ -98,6 +98,10 @@ public class QueueRemovalMessage extends 
PooledDistributionMessage {
       boolean succeed = processRegionQueue(iterator, regionName, size, hrq);
       if (!succeed) {
         return;
+      } else {
+        if (hrq != null) {
+          hrq.synchronizeQueueWithPrimary(getSender(), cache);
+        }
       }
     }
   }
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/ha/QueueSynchronizationProcessor.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/ha/QueueSynchronizationProcessor.java
new file mode 100644
index 0000000..52cb741
--- /dev/null
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/ha/QueueSynchronizationProcessor.java
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.ha;
+
+import static org.apache.geode.util.internal.UncheckedUtils.uncheckedCast;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.logging.log4j.Logger;
+
+import org.apache.geode.DataSerializer;
+import org.apache.geode.distributed.internal.ClusterDistributionManager;
+import org.apache.geode.distributed.internal.DistributionManager;
+import org.apache.geode.distributed.internal.DistributionMessage;
+import org.apache.geode.distributed.internal.MessageWithReply;
+import org.apache.geode.distributed.internal.PooledDistributionMessage;
+import org.apache.geode.distributed.internal.ReplyException;
+import org.apache.geode.distributed.internal.ReplyMessage;
+import org.apache.geode.distributed.internal.ReplyProcessor21;
+import 
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.cache.EventID;
+import org.apache.geode.internal.cache.HARegion;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.LocalRegion;
+import org.apache.geode.internal.serialization.DeserializationContext;
+import org.apache.geode.internal.serialization.SerializationContext;
+import org.apache.geode.logging.internal.log4j.api.LogService;
+
+/**
+ * Synchronize queue with primary holder
+ */
+public class QueueSynchronizationProcessor extends ReplyProcessor21 {
+  private static final Logger logger = LogService.getLogger();
+
+  QueueSynchronizationReplyMessage reply;
+
+  QueueSynchronizationProcessor(DistributionManager dm, 
InternalDistributedMember primary) {
+    super(dm, primary);
+  }
+
+  static List<EventID> getDispatchedEvents(final DistributionManager dm,
+      final InternalDistributedMember primary, String regionName, 
List<EventID> events) {
+    QueueSynchronizationProcessor processor = new 
QueueSynchronizationProcessor(dm, primary);
+
+    QueueSynchronizationMessage message = new QueueSynchronizationMessage();
+    message.setEventIdList(events);
+    message.setProcessorId(processor.getProcessorId());
+    message.setRegionName(regionName);
+
+    message.setRecipient(primary);
+    dm.putOutgoing(message);
+
+    try {
+      processor.waitForRepliesUninterruptibly();
+    } catch (ReplyException e) {
+      e.handleCause();
+    }
+
+    if (processor.reply != null) {
+      return processor.reply.getDispatchedEvents();
+    }
+    // Failed to get reply.
+    return null;
+  }
+
+  @Override
+  public void process(DistributionMessage msg) {
+    try {
+      reply = (QueueSynchronizationReplyMessage) msg;
+    } finally {
+      super.process(msg);
+    }
+  }
+
+  // -------------------------------------------------------------------------
+  // QueueSynchronizationMessage
+  // This message is sent to the primary queue holder after receiving 
QueueRemovalMessage.
+  // It contains the list of eventIDs that have been GIIed but not yet removed.
+  // The primary queue holder will check and send back a list of event ids 
that have been dispatched.
+  // -------------------------------------------------------------------------
+  public static class QueueSynchronizationMessage extends 
PooledDistributionMessage implements
+      MessageWithReply {
+    private List<EventID> eventIds;
+
+    private int processorId;
+    private String regionName;
+
+    public QueueSynchronizationMessage() {}
+
+    void setEventIdList(List<EventID> message) {
+      eventIds = message;
+    }
+
+    void setRegionName(String regionName) {
+      this.regionName = regionName;
+    }
+
+    void setProcessorId(int processorId) {
+      this.processorId = processorId;
+    }
+
+    @Override
+    protected void process(ClusterDistributionManager dm) {
+      final QueueSynchronizationReplyMessage replyMessage =
+          createQueueSynchronizationReplyMessage();
+      ReplyException replyException = null;
+
+      final InternalCache cache = dm.getCache();
+      try {
+        if (cache != null) {
+          List<EventID> dispatched = getDispatchedEvents(cache);
+          replyMessage.setEventIds(dispatched);
+          replyMessage.setSuccess();
+        }
+      } catch (RuntimeException | Error e) {
+        replyException = new ReplyException(e);
+        throw e;
+      } finally {
+        replyMessage.setProcessorId(processorId);
+        replyMessage.setRecipient(getSender());
+        if (replyException != null) {
+          replyMessage.setException(replyException);
+        }
+        dm.putOutgoing(replyMessage);
+      }
+    }
+
+    QueueSynchronizationReplyMessage createQueueSynchronizationReplyMessage() {
+      return new QueueSynchronizationReplyMessage();
+    }
+
+    List<EventID> getDispatchedEvents(InternalCache cache) {
+      LocalRegion region = (LocalRegion) cache.getRegion(regionName);
+      if (region == null) {
+        if (logger.isDebugEnabled()) {
+          logger.debug("processing QueueSynchronizationMessage region {} does 
not exist.",
+              regionName);
+        }
+        return null;
+      }
+      HARegionQueue haRegionQueue = ((HARegion) region).getOwner();
+      return haRegionQueue.getDispatchedEvents(eventIds);
+    }
+
+    @Override
+    public int getDSFID() {
+      return QUEUE_SYNCHRONIZATION_MESSAGE;
+    }
+
+    @Override
+    public int getProcessorId() {
+      return processorId;
+    }
+
+    @Override
+    public void toData(DataOutput out,
+        SerializationContext context) throws IOException {
+      super.toData(out, context);
+      DataSerializer.writeString(regionName, out);
+      DataSerializer.writeInteger(processorId, out);
+      int numberOfIds = eventIds.size();
+      DataSerializer.writeInteger(numberOfIds, out);
+      for (EventID eventId : eventIds) {
+        DataSerializer.writeObject(eventId, out);
+      }
+    }
+
+    @Override
+    public void fromData(DataInput in,
+        DeserializationContext context)
+        throws IOException, ClassNotFoundException {
+      super.fromData(in, context);
+      eventIds = new LinkedList<>();
+
+      regionName = DataSerializer.readString(in);
+      processorId = DataSerializer.readInteger(in);
+      int size = DataSerializer.readInteger(in);
+      for (int i = 0; i < size; i++) {
+        eventIds.add(uncheckedCast(DataSerializer.readObject(in)));
+      }
+    }
+  }
+
+  // -------------------------------------------------------------------------
+  // QueueSynchronizationReplyMessage
+  // -------------------------------------------------------------------------
+  public static class QueueSynchronizationReplyMessage extends ReplyMessage {
+    private List<EventID> events;
+    private boolean succeed = false;
+
+    public QueueSynchronizationReplyMessage() {}
+
+    void setEventIds(List<EventID> events) {
+      this.events = events;
+    }
+
+    List<EventID> getDispatchedEvents() {
+      return events;
+    }
+
+    void setSuccess() {
+      succeed = true;
+    }
+
+    @Override
+    public int getDSFID() {
+      return QUEUE_SYNCHRONIZATION_REPLY_MESSAGE;
+    }
+
+    @Override
+    public void toData(DataOutput out, SerializationContext context) throws 
IOException {
+      super.toData(out, context);
+      DataSerializer.writeBoolean(succeed, out);
+      if (succeed) {
+        DataSerializer.writeInteger(events.size(), out);
+        for (EventID eventId : events) {
+          DataSerializer.writeObject(eventId, out);
+        }
+      }
+    }
+
+    @Override
+    public void fromData(DataInput in, DeserializationContext context)
+        throws IOException, ClassNotFoundException {
+      super.fromData(in, context);
+      succeed = DataSerializer.readBoolean(in);
+      if (succeed) {
+        events = new LinkedList<>();
+        int size = DataSerializer.readInteger(in);
+        for (int i = 0; i < size; i++) {
+          events.add(uncheckedCast(DataSerializer.readObject(in)));
+        }
+      }
+    }
+  }
+}
diff --git 
a/geode-core/src/test/java/org/apache/geode/internal/cache/ha/HARegionQueueTest.java
 
b/geode-core/src/test/java/org/apache/geode/internal/cache/ha/HARegionQueueTest.java
index f490675..b671fd7 100644
--- 
a/geode-core/src/test/java/org/apache/geode/internal/cache/ha/HARegionQueueTest.java
+++ 
b/geode-core/src/test/java/org/apache/geode/internal/cache/ha/HARegionQueueTest.java
@@ -15,10 +15,12 @@
 package org.apache.geode.internal.cache.ha;
 
 import static junit.framework.TestCase.assertEquals;
+import static org.apache.geode.util.internal.UncheckedUtils.uncheckedCast;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.spy;
@@ -27,7 +29,10 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import java.io.IOException;
+import java.util.Collection;
 import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.junit.Before;
@@ -35,25 +40,42 @@ import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
 import org.apache.geode.CancelCriterion;
+import org.apache.geode.cache.CacheClosedException;
+import org.apache.geode.cache.EntryNotFoundException;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionDestroyedException;
+import 
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
 import org.apache.geode.internal.cache.EventID;
 import org.apache.geode.internal.cache.HARegion;
 import org.apache.geode.internal.cache.InternalCache;
 import org.apache.geode.internal.cache.tier.sockets.CacheClientNotifier;
 import org.apache.geode.internal.cache.tier.sockets.ClientUpdateMessageImpl;
 import org.apache.geode.internal.cache.tier.sockets.HAEventWrapper;
+import org.apache.geode.internal.serialization.KnownVersion;
 import org.apache.geode.internal.statistics.StatisticsClock;
 import 
org.apache.geode.internal.util.concurrent.StoppableReentrantReadWriteLock;
 import org.apache.geode.test.junit.categories.ClientSubscriptionTest;
 
 @Category({ClientSubscriptionTest.class})
 public class HARegionQueueTest {
-
   private HARegionQueue haRegionQueue;
+  private final HARegion haRegion = mock(HARegion.class);
+  private final InternalCache internalCache = mock(InternalCache.class);
+  private final List<EventID> eventIDs = new LinkedList<>();
+  private final EventID id1 = mock(EventID.class);
+  private final EventID id2 = mock(EventID.class);
+  private final EventID id3 = mock(EventID.class);
+  private final EventID id4 = mock(EventID.class);
+  private final Collection<List<EventID>> chunks = new LinkedList<>();
+  private final List<EventID> chunk1 = new LinkedList<>();
+  private final List<EventID> chunk2 = new LinkedList<>();
+  private final InternalDistributedMember primary = 
mock(InternalDistributedMember.class);
+  private final HARegionQueue.DispatchedAndCurrentEvents wrapper =
+      new HARegionQueue.DispatchedAndCurrentEvents();
+  private final long delay = 1L;
 
   @Before
   public void setup() throws IOException, ClassNotFoundException, 
InterruptedException {
-    InternalCache internalCache = mock(InternalCache.class);
-
     StoppableReentrantReadWriteLock giiLock = 
mock(StoppableReentrantReadWriteLock.class);
     when(giiLock.readLock())
         
.thenReturn(mock(StoppableReentrantReadWriteLock.StoppableReadLock.class));
@@ -64,7 +86,6 @@ public class HARegionQueueTest {
     when(rwLock.readLock())
         
.thenReturn(mock(StoppableReentrantReadWriteLock.StoppableReadLock.class));
 
-    HARegion haRegion = mock(HARegion.class);
     HashMap map = new HashMap();
     when(haRegion.put(any(), any())).then((invocationOnMock) -> {
       return map.put(invocationOnMock.getArgument(0), 
invocationOnMock.getArgument(1));
@@ -183,4 +204,366 @@ public class HARegionQueueTest {
 
     verify(spy).waitForInitialized(time);
   }
+
+  @Test
+  public void getDispatchedEventsReturnsDispatchedEvents() {
+    HARegionQueue spy = spy(haRegionQueue);
+    addEvents();
+    doReturn(false).when(spy).isDispatched(id1);
+    doReturn(true).when(spy).isDispatched(id2);
+    doReturn(true).when(spy).isDispatched(id3);
+    doReturn(false).when(spy).isDispatched(id4);
+
+    List<EventID> dispatchedEvents = spy.getDispatchedEvents(eventIDs);
+
+    assertThat(dispatchedEvents).containsExactlyInAnyOrder(id2, id3);
+  }
+
+  private void addEvents() {
+    eventIDs.add(id1);
+    eventIDs.add(id2);
+    eventIDs.add(id3);
+    eventIDs.add(id4);
+  }
+
+  @Test
+  public void isDispatchedReturnsTrueIfDispatchedAndCurrentEventsAreRemoved() {
+    HARegionQueue spy = spy(haRegionQueue);
+    doReturn(null).when(spy).getDispatchedAndCurrentEvents(id1);
+
+    assertThat(spy.isDispatched(id1)).isTrue();
+  }
+
+  @Test
+  public void isDispatchedReturnsFalseIfSequenceIdGreaterThanLastDispatched() {
+    HARegionQueue spy = spy(haRegionQueue);
+    when(id1.getSequenceID()).thenReturn(100L);
+    wrapper.lastDispatchedSequenceId = 99L;
+    doReturn(wrapper).when(spy).getDispatchedAndCurrentEvents(id1);
+
+    assertThat(spy.isDispatched(id1)).isFalse();
+  }
+
+  @Test
+  public void isDispatchedReturnsTrueIfSequenceIdEqualsLastDispatched() {
+    HARegionQueue spy = spy(haRegionQueue);
+    when(id1.getSequenceID()).thenReturn(100L);
+    wrapper.lastDispatchedSequenceId = 100L;
+    doReturn(wrapper).when(spy).getDispatchedAndCurrentEvents(id1);
+
+    assertThat(spy.isDispatched(id1)).isTrue();
+  }
+
+  @Test
+  public void isDispatchedReturnsTrueIfSequenceIdLessThanLastDispatched() {
+    HARegionQueue spy = spy(haRegionQueue);
+    when(id1.getSequenceID()).thenReturn(90L);
+    wrapper.lastDispatchedSequenceId = 100L;
+    doReturn(wrapper).when(spy).getDispatchedAndCurrentEvents(id1);
+
+    assertThat(spy.isDispatched(id1)).isTrue();
+  }
+
+  @Test
+  public void 
doNotScheduleSynchronizationWithPrimaryIfHasDoneSynchronization() {
+    HARegionQueue spy = spy(haRegionQueue);
+    spy.hasSynchronizedWithPrimary.set(true);
+    doReturn(delay).when(spy).getDelay();
+
+    spy.synchronizeQueueWithPrimary(primary, internalCache);
+    verify(spy, never()).scheduleSynchronizationWithPrimary(primary, 
internalCache, delay);
+  }
+
+  @Test
+  public void 
doNotScheduleSynchronizationWithPrimaryIfSynchronizationIsInProgress() {
+    HARegionQueue spy = spy(haRegionQueue);
+    spy.scheduleSynchronizationWithPrimaryInProgress.set(true);
+    doReturn(delay).when(spy).getDelay();
+
+    spy.synchronizeQueueWithPrimary(primary, internalCache);
+    verify(spy, never()).scheduleSynchronizationWithPrimary(primary, 
internalCache, delay);
+  }
+
+  @Test
+  public void doNotScheduleSynchronizationWithPrimaryIfGIINotFinished() {
+    HARegionQueue spy = spy(haRegionQueue);
+    doReturn(delay).when(spy).getDelay();
+
+    spy.synchronizeQueueWithPrimary(primary, internalCache);
+    verify(spy, never()).scheduleSynchronizationWithPrimary(primary, 
internalCache, delay);
+  }
+
+  @Test
+  public void 
doNotScheduleSynchronizationWithPrimaryIfPrimaryHasOlderThanGEODE_1_14_0Version()
 {
+    HARegionQueue spy = spy(haRegionQueue);
+    spy.doneGIIQueueing.set(true);
+    when(primary.getVersionOrdinal()).thenReturn((short) 
(KnownVersion.GEODE_1_14_0.ordinal() - 1));
+    doReturn(delay).when(spy).getDelay();
+
+    spy.synchronizeQueueWithPrimary(primary, internalCache);
+    verify(spy, never()).scheduleSynchronizationWithPrimary(primary, 
internalCache, delay);
+  }
+
+  @Test
+  public void 
scheduleSynchronizationWithPrimaryIfPrimaryIsGEODE_1_14_0Version() {
+    HARegionQueue spy = spy(haRegionQueue);
+    spy.doneGIIQueueing.set(true);
+    
when(primary.getVersionOrdinal()).thenReturn(KnownVersion.GEODE_1_14_0.ordinal());
+    doReturn(delay).when(spy).getDelay();
+    doNothing().when(spy).scheduleSynchronizationWithPrimary(primary, 
internalCache, delay);
+
+    spy.synchronizeQueueWithPrimary(primary, internalCache);
+    verify(spy).scheduleSynchronizationWithPrimary(primary, internalCache, 
delay);
+  }
+
+  @Test
+  public void 
scheduleSynchronizationWithPrimaryIfPrimaryIsLaterThanGEODE_1_14_0Version() {
+    HARegionQueue spy = spy(haRegionQueue);
+    spy.doneGIIQueueing.set(true);
+    when(primary.getVersionOrdinal()).thenReturn((short) 
(KnownVersion.GEODE_1_14_0.ordinal() + 1));
+    doReturn(delay).when(spy).getDelay();
+    doNothing().when(spy).scheduleSynchronizationWithPrimary(primary, 
internalCache, delay);
+
+    spy.synchronizeQueueWithPrimary(primary, internalCache);
+    verify(spy).scheduleSynchronizationWithPrimary(primary, internalCache, 
delay);
+  }
+
+  @Test
+  public void getGIIEventsReturnsCorrectEvents() {
+    HARegionQueue spy = spy(haRegionQueue);
+    List<EventID> giiEvents;
+    spy.positionBeforeGII = 1;
+    spy.positionAfterGII = 4;
+    HAEventWrapper wrapper1 = mock(HAEventWrapper.class);
+    HAEventWrapper wrapper2 = mock(HAEventWrapper.class);
+    when(wrapper1.getEventId()).thenReturn(id1);
+    when(wrapper2.getEventId()).thenReturn(id2);
+    Region.Entry<Object, Object> entry1 = 
uncheckedCast(mock(Region.Entry.class));
+    Region.Entry<Object, Object> entry2 = 
uncheckedCast(mock(Region.Entry.class));
+    Region.Entry<Object, Object> entry3 = 
uncheckedCast(mock(Region.Entry.class));
+    when(entry1.getValue()).thenReturn(wrapper1);
+    when(entry2.getValue()).thenReturn(new Object());
+    when(entry3.getValue()).thenReturn(wrapper2);
+
+
+    when(haRegion.getEntry(1L)).thenReturn(entry1);
+    when(haRegion.getEntry(2L)).thenReturn(entry2);
+    when(haRegion.getEntry(3L)).thenReturn(null);
+    when(haRegion.getEntry(4L)).thenReturn(entry3);
+
+    giiEvents = spy.getGIIEvents();
+
+    assertThat(giiEvents).containsExactlyInAnyOrder(id1, id2);
+  }
+
+  @Test
+  public void doSynchronizationWithPrimaryReturnsIfNoGIIEvents() {
+    HARegionQueue spy = spy(haRegionQueue);
+    int maxChunkSize = 1000;
+    spy.hasSynchronizedWithPrimary.set(true);
+    doReturn(new LinkedList<EventID>()).when(spy).getGIIEvents();
+
+    spy.doSynchronizationWithPrimary(primary, internalCache);
+
+    verify(spy, never()).getChunks(eventIDs, maxChunkSize);
+    verify(spy, never()).removeDispatchedEvents(primary, internalCache, 
eventIDs);
+    assertThat(spy.hasSynchronizedWithPrimary).isTrue();
+    assertThat(spy.scheduleSynchronizationWithPrimaryInProgress).isFalse();
+  }
+
+  @Test
+  public void doSynchronizationWithPrimaryRemoveDispatchedEvents() {
+    HARegionQueue spy = spy(haRegionQueue);
+    int maxChunkSize = 1000;
+    addEvents();
+    doReturn(eventIDs).when(spy).getGIIEvents();
+    doReturn(true).when(spy).removeDispatchedEvents(primary, internalCache, 
eventIDs);
+
+    spy.doSynchronizationWithPrimary(primary, internalCache);
+
+    verify(spy, never()).getChunks(eventIDs, maxChunkSize);
+    verify(spy).removeDispatchedEvents(primary, internalCache, eventIDs);
+    assertThat(spy.hasSynchronizedWithPrimary).isTrue();
+    assertThat(spy.scheduleSynchronizationWithPrimaryInProgress).isFalse();
+  }
+
+  @Test
+  public void hasSynchronizedWithPrimaryNotSetIfRemoveDispatchedEventsFails() {
+    HARegionQueue spy = spy(haRegionQueue);
+    int maxChunkSize = 1000;
+    addEvents();
+    doReturn(eventIDs).when(spy).getGIIEvents();
+    doReturn(false).when(spy).removeDispatchedEvents(primary, internalCache, 
eventIDs);
+
+    spy.doSynchronizationWithPrimary(primary, internalCache);
+
+    verify(spy, never()).getChunks(eventIDs, maxChunkSize);
+    verify(spy).removeDispatchedEvents(primary, internalCache, eventIDs);
+    assertThat(spy.hasSynchronizedWithPrimary).isFalse();
+    assertThat(spy.scheduleSynchronizationWithPrimaryInProgress).isFalse();
+  }
+
+  @Test
+  public void hasSynchronizedWithPrimaryRemoveChunksIfManyGIIEvents() {
+    HARegionQueue spy = spy(haRegionQueue);
+    int maxChunkSize = 1000;
+    for (int i = 0; i < 1100; i++) {
+      eventIDs.add(mock(EventID.class));
+    }
+    createChunks();
+    doReturn(eventIDs).when(spy).getGIIEvents();
+    doReturn(chunks).when(spy).getChunks(eventIDs, maxChunkSize);
+    doReturn(true).when(spy).removeDispatchedEvents(primary, internalCache, 
chunk1);
+    doReturn(true).when(spy).removeDispatchedEvents(primary, internalCache, 
chunk2);
+
+    spy.doSynchronizationWithPrimary(primary, internalCache);
+
+    verify(spy).getChunks(eventIDs, maxChunkSize);
+    verify(spy).removeDispatchedEvents(primary, internalCache, chunk1);
+    verify(spy).removeDispatchedEvents(primary, internalCache, chunk2);
+    assertThat(spy.hasSynchronizedWithPrimary).isTrue();
+    assertThat(spy.scheduleSynchronizationWithPrimaryInProgress).isFalse();
+  }
+
+  private void createChunks() {
+    chunk1.add(id1);
+    chunk2.add(id2);
+    chunks.add(chunk1);
+    chunks.add(chunk2);
+  }
+
+  @Test
+  public void hasSynchronizedWithPrimaryNotSetIfRemoveChunksFails() {
+    HARegionQueue spy = spy(haRegionQueue);
+    int maxChunkSize = 1000;
+    for (int i = 0; i < 1100; i++) {
+      eventIDs.add(mock(EventID.class));
+    }
+    createChunks();
+    doReturn(eventIDs).when(spy).getGIIEvents();
+    doReturn(chunks).when(spy).getChunks(eventIDs, maxChunkSize);
+    doReturn(true).when(spy).removeDispatchedEvents(primary, internalCache, 
chunk1);
+    doReturn(false).when(spy).removeDispatchedEvents(primary, internalCache, 
chunk2);
+
+    spy.doSynchronizationWithPrimary(primary, internalCache);
+
+    verify(spy).getChunks(eventIDs, maxChunkSize);
+    verify(spy).removeDispatchedEvents(primary, internalCache, chunk1);
+    verify(spy).removeDispatchedEvents(primary, internalCache, chunk2);
+    assertThat(spy.hasSynchronizedWithPrimary).isFalse();
+    assertThat(spy.scheduleSynchronizationWithPrimaryInProgress).isFalse();
+  }
+
+  @Test
+  public void getChunksReturnsEqualSizedChunks() {
+    HARegionQueue spy = spy(haRegionQueue);
+    addEvents();
+    // add more events
+    eventIDs.add(mock(EventID.class));
+    eventIDs.add(mock(EventID.class));
+    int maxChunkSize = 2;
+
+    Collection<List<EventID>> myChunks = spy.getChunks(eventIDs, maxChunkSize);
+
+    assertThat(myChunks.size()).isEqualTo(eventIDs.size() / maxChunkSize);
+    for (List<EventID> chunk : myChunks) {
+      assertThat(chunk.size()).isEqualTo(maxChunkSize);
+    }
+  }
+
+  @Test
+  public void removeDispatchedEventAfterSyncWithPrimaryRemovesEvents() throws 
Exception {
+    HARegionQueue spy = spy(haRegionQueue);
+    doNothing().when(spy).removeDispatchedEvents(id1);
+
+    assertThat(spy.removeDispatchedEventAfterSyncWithPrimary(id1)).isTrue();
+    verify(spy).removeDispatchedEvents(id1);
+  }
+
+  @Test
+  public void removeDispatchedEventReturnsTrueIfRemovalThrowsCacheException() 
throws Exception {
+    HARegionQueue spy = spy(haRegionQueue);
+    doThrow(new 
EntryNotFoundException("")).when(spy).removeDispatchedEvents(id1);
+
+    assertThat(spy.removeDispatchedEventAfterSyncWithPrimary(id1)).isTrue();
+    verify(spy).removeDispatchedEvents(id1);
+  }
+
+  @Test
+  public void 
removeDispatchedEventReturnsTrueIfRemovalThrowsRegionDestroyedException()
+      throws Exception {
+    HARegionQueue spy = spy(haRegionQueue);
+    doThrow(new RegionDestroyedException("", 
"")).when(spy).removeDispatchedEvents(id1);
+
+    assertThat(spy.removeDispatchedEventAfterSyncWithPrimary(id1)).isTrue();
+    verify(spy).removeDispatchedEvents(id1);
+  }
+
+  @Test
+  public void 
removeDispatchedEventReturnsFalseIfRemovalThrowsCancelException() throws 
Exception {
+    HARegionQueue spy = spy(haRegionQueue);
+    doThrow(new CacheClosedException()).when(spy).removeDispatchedEvents(id1);
+
+    assertThat(spy.removeDispatchedEventAfterSyncWithPrimary(id1)).isFalse();
+    verify(spy).removeDispatchedEvents(id1);
+  }
+
+  @Test
+  public void 
removeDispatchedEventsReturnsFalseIfNotGettingEventsFromPrimary() {
+    HARegionQueue spy = spy(haRegionQueue);
+    doReturn(null).when(spy).getDispatchedEventsFromPrimary(primary, 
internalCache, eventIDs);
+
+    assertThat(spy.removeDispatchedEvents(primary, internalCache, 
eventIDs)).isFalse();
+  }
+
+  @Test
+  public void removeDispatchedEventsReturnsTrueIfRemovedDispatchedEvents() {
+    HARegionQueue spy = spy(haRegionQueue);
+    List<EventID> dispatched = new LinkedList<>();
+    dispatched.add(id1);
+    dispatched.add(id3);
+    doReturn(dispatched).when(spy).getDispatchedEventsFromPrimary(primary, 
internalCache, eventIDs);
+    doReturn(true).when(spy).removeDispatchedEventAfterSyncWithPrimary(id1);
+    doReturn(true).when(spy).removeDispatchedEventAfterSyncWithPrimary(id3);
+
+    assertThat(spy.removeDispatchedEvents(primary, internalCache, 
eventIDs)).isTrue();
+  }
+
+  @Test
+  public void 
removeDispatchedEventsReturnsFalseIfFailedToRemoveDispatchedEvents() {
+    HARegionQueue spy = spy(haRegionQueue);
+    List<EventID> dispatched = new LinkedList<>();
+    dispatched.add(id1);
+    dispatched.add(id3);
+    doReturn(dispatched).when(spy).getDispatchedEventsFromPrimary(primary, 
internalCache, eventIDs);
+    doReturn(true).when(spy).removeDispatchedEventAfterSyncWithPrimary(id1);
+    doReturn(false).when(spy).removeDispatchedEventAfterSyncWithPrimary(id3);
+
+    assertThat(spy.removeDispatchedEvents(primary, internalCache, 
eventIDs)).isFalse();
+  }
+
+  @Test
+  public void getDelayReturnsTimeToWait() {
+    HARegionQueue spy = spy(haRegionQueue);
+    long waitTime = 15000L;
+    long currentTime = 5000L;
+    long doneGIIQueueingTime = 0L;
+    spy.doneGIIQueueingTime = doneGIIQueueingTime;
+    long elapsed = currentTime - doneGIIQueueingTime;
+    doReturn(currentTime).when(spy).getCurrentTime();
+
+    assertThat(spy.getDelay()).isEqualTo(waitTime - elapsed);
+  }
+
+  @Test
+  public void getDelayReturnsZeroIfExceedWaitTime() {
+    HARegionQueue spy = spy(haRegionQueue);
+    long waitTime = 15000L;
+    long doneGIIQueueingTime = 0L;
+    long currentTime = waitTime + doneGIIQueueingTime + 1;
+    spy.doneGIIQueueingTime = doneGIIQueueingTime;
+    doReturn(currentTime).when(spy).getCurrentTime();
+
+    assertThat(spy.getDelay()).isEqualTo(0);
+  }
 }
diff --git 
a/geode-core/src/test/java/org/apache/geode/internal/cache/ha/QueueRemovalMessageTest.java
 
b/geode-core/src/test/java/org/apache/geode/internal/cache/ha/QueueRemovalMessageTest.java
index 07f99b7..3febda3 100644
--- 
a/geode-core/src/test/java/org/apache/geode/internal/cache/ha/QueueRemovalMessageTest.java
+++ 
b/geode-core/src/test/java/org/apache/geode/internal/cache/ha/QueueRemovalMessageTest.java
@@ -38,6 +38,7 @@ import org.apache.geode.cache.CacheClosedException;
 import org.apache.geode.cache.EntryNotFoundException;
 import org.apache.geode.cache.RegionDestroyedException;
 import org.apache.geode.distributed.internal.ClusterDistributionManager;
+import 
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
 import org.apache.geode.internal.cache.EventID;
 import org.apache.geode.internal.cache.HARegion;
 import org.apache.geode.internal.cache.InternalCache;
@@ -212,4 +213,24 @@ public class QueueRemovalMessageTest {
 
     assertThat(queueRemovalMessage.removeQueueEvent(regionName2, regionQueue2, 
eventID2)).isTrue();
   }
+
+  @Test
+  public void synchronizeQueueWithPrimaryInvokedAfterProcessEachRegionQueue() {
+    addToMessagesList();
+    Iterator<Object> iterator = messagesList.iterator();
+    InternalDistributedMember sender = mock(InternalDistributedMember.class);
+    doReturn(sender).when(queueRemovalMessage).getSender();
+
+    queueRemovalMessage.processRegionQueues(cache, iterator);
+
+    verify(queueRemovalMessage).processRegionQueue(iterator, regionName1, 
region1EventSize,
+        regionQueue1);
+    verify(regionQueue1).synchronizeQueueWithPrimary(sender, cache);
+    verify(queueRemovalMessage).processRegionQueue(iterator, regionName2, 
region2EventSize,
+        regionQueue2);
+    verify(regionQueue2).synchronizeQueueWithPrimary(sender, cache);
+    verify(queueRemovalMessage).removeQueueEvent(regionName1, regionQueue1, 
eventID1);
+    verify(queueRemovalMessage).removeQueueEvent(regionName2, regionQueue2, 
eventID2);
+    verify(queueRemovalMessage).removeQueueEvent(regionName2, regionQueue2, 
eventID3);
+  }
 }
diff --git 
a/geode-core/src/test/java/org/apache/geode/internal/cache/ha/QueueSynchronizationProcessorTest.java
 
b/geode-core/src/test/java/org/apache/geode/internal/cache/ha/QueueSynchronizationProcessorTest.java
new file mode 100644
index 0000000..e27cb42
--- /dev/null
+++ 
b/geode-core/src/test/java/org/apache/geode/internal/cache/ha/QueueSynchronizationProcessorTest.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.ha;
+
+import static org.apache.geode.util.internal.UncheckedUtils.uncheckedCast;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.LinkedList;
+import java.util.List;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import org.apache.geode.distributed.internal.ClusterDistributionManager;
+import 
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.cache.EventID;
+import org.apache.geode.internal.cache.HARegion;
+import org.apache.geode.internal.cache.InternalCache;
+
+public class QueueSynchronizationProcessorTest {
+  @Rule
+  public ExpectedException expectedException = ExpectedException.none();
+
+  private final ClusterDistributionManager manager =
+      mock(ClusterDistributionManager.class, RETURNS_DEEP_STUBS);
+  private final InternalDistributedMember primary = 
mock(InternalDistributedMember.class);
+  private final InternalCache cache = mock(InternalCache.class);
+  private QueueSynchronizationProcessor.QueueSynchronizationMessage message;
+  private QueueSynchronizationProcessor.QueueSynchronizationReplyMessage 
replyMessage;
+  private final List<EventID> eventIDs = new LinkedList<>();
+  private final EventID id1 = mock(EventID.class);
+  private final EventID id2 = mock(EventID.class);
+  private final int processorId = 11;
+  private final List<EventID> dispatched = new LinkedList<>();
+
+  @Before
+  public void setup() {
+    when((manager.getCache())).thenReturn(cache);
+    eventIDs.add(id1);
+    eventIDs.add(id2);
+  }
+
+  @Test
+  public void processMessageSetsReply() {
+    QueueSynchronizationProcessor processor =
+        spy(new QueueSynchronizationProcessor(manager, primary));
+    replyMessage = 
mock(QueueSynchronizationProcessor.QueueSynchronizationReplyMessage.class);
+
+    processor.process(replyMessage);
+
+    assertThat(processor.reply).isEqualTo(replyMessage);
+  }
+
+  @Test
+  public void processQueueSynchronizationMessageSendsReply() {
+    message = spy(new 
QueueSynchronizationProcessor.QueueSynchronizationMessage());
+    setupMessage();
+
+    message.process(manager);
+
+    verify(replyMessage).setEventIds(dispatched);
+    verify(replyMessage).setSuccess();
+    verifyReplyMessageSent();
+  }
+
+  private void setupMessage() {
+    message.setEventIdList(eventIDs);
+    message.setProcessorId(processorId);
+    replyMessage = 
mock(QueueSynchronizationProcessor.QueueSynchronizationReplyMessage.class);
+    dispatched.add(id2);
+    
doReturn(replyMessage).when(message).createQueueSynchronizationReplyMessage();
+    doReturn(dispatched).when(message).getDispatchedEvents(cache);
+    doReturn(primary).when(message).getSender();
+  }
+
+  private void verifyReplyMessageSent() {
+    verify(replyMessage).setProcessorId(processorId);
+    verify(replyMessage).setRecipient(primary);
+    verify(manager).putOutgoing(replyMessage);
+  }
+
+  @Test
+  public void processQueueSynchronizationMessageCanSendReplyWithException() {
+    expectedException.expect(RuntimeException.class);
+    message = spy(new 
QueueSynchronizationProcessor.QueueSynchronizationMessage());
+    setupMessage();
+    RuntimeException runtimeException = new RuntimeException();
+    doThrow(runtimeException).when(message).getDispatchedEvents(cache);
+
+    message.process(manager);
+
+    verify(replyMessage).setException(any());
+    verifyReplyMessageSent();
+  }
+
+  @Test
+  public void getDispatchedEventsReturnsNullIfQueueIsNull() {
+    message = spy(new 
QueueSynchronizationProcessor.QueueSynchronizationMessage());
+    String regionName = "queueName";
+    message.setRegionName(regionName);
+    when(cache.getRegion(regionName)).thenReturn(null);
+
+    assertThat(message.getDispatchedEvents(cache)).isNull();
+  }
+
+  @Test
+  public void getDispatchedEventsReturns() {
+    message = spy(new 
QueueSynchronizationProcessor.QueueSynchronizationMessage());
+    String regionName = "queueName";
+    message.setRegionName(regionName);
+    message.setEventIdList(eventIDs);
+    HARegion region = mock(HARegion.class);
+    HARegionQueue queue = mock(HARegionQueue.class);
+    when(cache.getRegion(regionName)).thenReturn(uncheckedCast(region));
+    when(region.getOwner()).thenReturn(queue);
+    when(queue.getDispatchedEvents(eventIDs)).thenReturn(dispatched);
+
+    assertThat(message.getDispatchedEvents(cache)).isEqualTo(dispatched);
+  }
+}
diff --git 
a/geode-serialization/src/main/java/org/apache/geode/internal/serialization/DataSerializableFixedID.java
 
b/geode-serialization/src/main/java/org/apache/geode/internal/serialization/DataSerializableFixedID.java
index fe00977..ff1571c 100644
--- 
a/geode-serialization/src/main/java/org/apache/geode/internal/serialization/DataSerializableFixedID.java
+++ 
b/geode-serialization/src/main/java/org/apache/geode/internal/serialization/DataSerializableFixedID.java
@@ -229,8 +229,9 @@ public interface DataSerializableFixedID extends 
SerializationVersions, BasicSer
   byte LATEST_LAST_ACCESS_TIME_MESSAGE = -20;
 
   byte REMOVE_CACHESERVER_PROFILE_UPDATE = -19;
-
-  // IDs -18 through -10 unused
+  byte QUEUE_SYNCHRONIZATION_MESSAGE = -18;
+  byte QUEUE_SYNCHRONIZATION_REPLY_MESSAGE = -17;
+  // IDs -16 through -10 unused
 
   byte PR_REMOVE_ALL_MESSAGE = -9;
   byte REMOVE_ALL_MESSAGE = -8;

Reply via email to