[ 
https://issues.apache.org/jira/browse/GEODE-8465?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17190400#comment-17190400
 ] 

ASF GitHub Bot commented on GEODE-8465:
---------------------------------------

DonalEvans commented on a change in pull request #5496:
URL: https://github.com/apache/geode/pull/5496#discussion_r483156232



##########
File path: 
geode-core/src/main/java/org/apache/geode/internal/cache/ha/HARegionQueue.java
##########
@@ -3943,4 +3958,165 @@ private static int calculateThreadIdExpiryTime() {
   public Queue getGiiQueue() {
     return this.giiQueue;
   }
+
+  List<EventID> getDispatchedOrRemovedEvents(List<EventID> eventIds) {
+    List<EventID> removedEvents = new LinkedList<>();
+    for (EventID eventId : eventIds) {
+      if (isRemoved(eventId)) {
+        removedEvents.add(eventId);
+      }
+    }
+    return removedEvents;
+  }
+
+  boolean isRemoved(EventID eventId) {
+    DispatchedAndCurrentEvents wrapper = getDispatchedAndCurrentEvents(eventId);
+    if (wrapper != null && eventId.getSequenceID() > wrapper.lastDispatchedSequenceId) {
+      return false;
+    }
+    return true;
+  }
+
+  DispatchedAndCurrentEvents getDispatchedAndCurrentEvents(EventID eventId) {
+    ThreadIdentifier tid = getThreadIdentifier(eventId);
+    return (DispatchedAndCurrentEvents) eventsMap.get(tid);
+  }
+
+  public void synchronizeQueueWithPrimary(InternalDistributedMember primary, InternalCache cache) {
+    if (hasSynchronizedWithPrimary.get() || synchronizeWithPrimaryInProgress.get()
+        || !doneGIIQueueing.get()) {
+      return;
+    }
+
+    if (primary.getVersionOrdinal() < KnownVersion.GEODE_1_14_0.ordinal()) {
+      if (logger.isDebugEnabled()) {
+        logger.debug("Don't send to primary with version older than KnownVersion.GEODE_1_14_0");
+      }
+      return;
+    }
+    runSynchronizationWithPrimary(primary, cache);
+  }
+
+  void runSynchronizationWithPrimary(InternalDistributedMember primary, InternalCache cache) {
+    cache.getDistributionManager().getExecutors().getWaitingThreadPool()
+        .execute(() -> doSynchronizationWithPrimary(primary, cache));
+  }
+
+  synchronized void doSynchronizationWithPrimary(InternalDistributedMember primary,
+      InternalCache cache) {
+    if (hasSynchronizedWithPrimary.get()) {

Review comment:
       Is it possible that when we get to this point, 
`synchronizeWithPrimaryInProgress` is true? Do we want to check its value 
before continuing, in case another thread has initiated the synchronization 
already?
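
       As a rough sketch of the kind of guard this suggestion would add (illustrative only, not the PR's code; the rest of the method body is assumed unchanged):

```java
synchronized void doSynchronizationWithPrimary(InternalDistributedMember primary,
    InternalCache cache) {
  // Bail out if another thread has already synchronized, or is synchronizing right now.
  if (hasSynchronizedWithPrimary.get() || synchronizeWithPrimaryInProgress.get()) {
    return;
  }
  synchronizeWithPrimaryInProgress.set(true);
  // ... rest of the method unchanged ...
}
```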

##########
File path: 
geode-core/src/main/java/org/apache/geode/internal/cache/ha/HARegionQueue.java
##########
@@ -3943,4 +3958,165 @@ private static int calculateThreadIdExpiryTime() {
   public Queue getGiiQueue() {
     return this.giiQueue;
   }
+
+  List<EventID> getDispatchedOrRemovedEvents(List<EventID> eventIds) {
+    List<EventID> removedEvents = new LinkedList<>();
+    for (EventID eventId : eventIds) {
+      if (isRemoved(eventId)) {
+        removedEvents.add(eventId);
+      }
+    }
+    return removedEvents;
+  }
+
+  boolean isRemoved(EventID eventId) {
+    DispatchedAndCurrentEvents wrapper = getDispatchedAndCurrentEvents(eventId);
+    if (wrapper != null && eventId.getSequenceID() > wrapper.lastDispatchedSequenceId) {
+      return false;
+    }
+    return true;
+  }
+
+  DispatchedAndCurrentEvents getDispatchedAndCurrentEvents(EventID eventId) {
+    ThreadIdentifier tid = getThreadIdentifier(eventId);
+    return (DispatchedAndCurrentEvents) eventsMap.get(tid);
+  }
+
+  public void synchronizeQueueWithPrimary(InternalDistributedMember primary, InternalCache cache) {
+    if (hasSynchronizedWithPrimary.get() || synchronizeWithPrimaryInProgress.get()
+        || !doneGIIQueueing.get()) {
+      return;
+    }
+
+    if (primary.getVersionOrdinal() < KnownVersion.GEODE_1_14_0.ordinal()) {
+      if (logger.isDebugEnabled()) {
+        logger.debug("Don't send to primary with version older than KnownVersion.GEODE_1_14_0");
+      }
+      return;
+    }
+    runSynchronizationWithPrimary(primary, cache);
+  }
+
+  void runSynchronizationWithPrimary(InternalDistributedMember primary, InternalCache cache) {
+    cache.getDistributionManager().getExecutors().getWaitingThreadPool()
+        .execute(() -> doSynchronizationWithPrimary(primary, cache));
+  }
+
+  synchronized void doSynchronizationWithPrimary(InternalDistributedMember primary,
+      InternalCache cache) {
+    if (hasSynchronizedWithPrimary.get()) {
+      return;
+    }
+    synchronizeWithPrimaryInProgress.set(true);
+    int maxChunkSize = 1000;
+
+    try {
+      List<EventID> giiEvents = getGIIEvents();
+      if (giiEvents.size() == 0) {
+        hasSynchronizedWithPrimary.set(true);
+        return;
+      }
+      Collection<List<EventID>> chunks = null;
+
+      if (giiEvents.size() > maxChunkSize) {
+        chunks = getChunks(giiEvents, maxChunkSize);
+      }
+
+      if (chunks == null) {
+        if (!removeDispatchedEvents(primary, cache, giiEvents)) {
+          return;
+        }
+      } else {
+        for (List<EventID> chunk : chunks) {
+          if (!removeDispatchedEvents(primary, cache, chunk)) {
+            return;
+          }
+        }
+      }
+      if (logger.isDebugEnabled()) {
+        logger.debug("hasSynchronizedWithPrimary has synced with primary {} on queue: {} ", primary,
+            regionName);
+      }
+      hasSynchronizedWithPrimary.set(true);
+    } finally {
+      synchronizeWithPrimaryInProgress.set(false);
+    }
+  }
+
+  Collection<List<EventID>> getChunks(List<EventID> events, int size) {
+    AtomicInteger counter = new AtomicInteger(0);
+    return events.stream().collect(Collectors.groupingBy(event -> counter.getAndIncrement() / size))
+        .values();
+  }
+
+  boolean removeDispatchedEvents(InternalDistributedMember primary, InternalCache cache,
+      List<EventID> chunkEvents) {
+    List<EventID> dispatchedEvents = getDispatchedEventsFromPrimary(primary, cache, chunkEvents);
+
+    if (dispatchedEvents == null) {
+      // failed to get events from current primary, need to retry.
+      return false;
+    }
+
+    for (EventID id : dispatchedEvents) {
+      if (!removeDispatchedEventAfterSyncWithPrimary(id)) {
+        // failed to remove all dispatched events, need to retry
+        return false;
+      }
+    }
+    return true;
+  }
+
+  List<EventID> getDispatchedEventsFromPrimary(InternalDistributedMember primary,
+      InternalCache cache, List<EventID> chunkEvents) {
+    return QueueSynchronizationProcessor.getDispatchedEvents(cache.getDistributionManager(),
+        primary, regionName, chunkEvents);
+  }
+
+  List<EventID> getGIIEvents() {
+    List<EventID> events = new LinkedList<>();
+    for (long i = positionBeforeGII; i < positionAfterGII + 1; i++) {
+      Map.Entry entry = region.getEntry(i);

Review comment:
       The IDE warning on this line can be removed by making this `Map.Entry<?, ?> entry`

##########
File path: 
geode-core/src/main/java/org/apache/geode/internal/cache/ha/HARegionQueue.java
##########
@@ -3943,4 +3958,165 @@ private static int calculateThreadIdExpiryTime() {
   public Queue getGiiQueue() {
     return this.giiQueue;
   }
+
+  List<EventID> getDispatchedOrRemovedEvents(List<EventID> eventIds) {
+    List<EventID> removedEvents = new LinkedList<>();
+    for (EventID eventId : eventIds) {
+      if (isRemoved(eventId)) {
+        removedEvents.add(eventId);
+      }
+    }
+    return removedEvents;
+  }
+
+  boolean isRemoved(EventID eventId) {
+    DispatchedAndCurrentEvents wrapper = getDispatchedAndCurrentEvents(eventId);
+    if (wrapper != null && eventId.getSequenceID() > wrapper.lastDispatchedSequenceId) {
+      return false;
+    }
+    return true;
+  }
+
+  DispatchedAndCurrentEvents getDispatchedAndCurrentEvents(EventID eventId) {
+    ThreadIdentifier tid = getThreadIdentifier(eventId);
+    return (DispatchedAndCurrentEvents) eventsMap.get(tid);
+  }
+
+  public void synchronizeQueueWithPrimary(InternalDistributedMember primary, InternalCache cache) {
+    if (hasSynchronizedWithPrimary.get() || synchronizeWithPrimaryInProgress.get()
+        || !doneGIIQueueing.get()) {
+      return;
+    }
+
+    if (primary.getVersionOrdinal() < KnownVersion.GEODE_1_14_0.ordinal()) {
+      if (logger.isDebugEnabled()) {
+        logger.debug("Don't send to primary with version older than KnownVersion.GEODE_1_14_0");
+      }
+      return;
+    }
+    runSynchronizationWithPrimary(primary, cache);
+  }
+
+  void runSynchronizationWithPrimary(InternalDistributedMember primary, InternalCache cache) {
+    cache.getDistributionManager().getExecutors().getWaitingThreadPool()
+        .execute(() -> doSynchronizationWithPrimary(primary, cache));
+  }
+
+  synchronized void doSynchronizationWithPrimary(InternalDistributedMember primary,
+      InternalCache cache) {
+    if (hasSynchronizedWithPrimary.get()) {
+      return;
+    }
+    synchronizeWithPrimaryInProgress.set(true);
+    int maxChunkSize = 1000;
+
+    try {
+      List<EventID> giiEvents = getGIIEvents();
+      if (giiEvents.size() == 0) {
+        hasSynchronizedWithPrimary.set(true);
+        return;
+      }
+      Collection<List<EventID>> chunks = null;
+
+      if (giiEvents.size() > maxChunkSize) {
+        chunks = getChunks(giiEvents, maxChunkSize);
+      }
+
+      if (chunks == null) {
+        if (!removeDispatchedEvents(primary, cache, giiEvents)) {
+          return;
+        }
+      } else {
+        for (List<EventID> chunk : chunks) {
+          if (!removeDispatchedEvents(primary, cache, chunk)) {
+            return;
+          }
+        }
+      }
+      if (logger.isDebugEnabled()) {
+        logger.debug("hasSynchronizedWithPrimary has synced with primary {} on queue: {} ", primary,

Review comment:
       For consistency with other suggested log output changes, this might be better as
   `logger.debug("HARegionQueue {} has synced with primary on {}", regionName, primary);`

##########
File path: 
geode-core/src/main/java/org/apache/geode/internal/cache/ha/HARegionQueue.java
##########
@@ -3943,4 +3958,165 @@ private static int calculateThreadIdExpiryTime() {
   public Queue getGiiQueue() {
     return this.giiQueue;
   }
+
+  List<EventID> getDispatchedOrRemovedEvents(List<EventID> eventIds) {
+    List<EventID> removedEvents = new LinkedList<>();
+    for (EventID eventId : eventIds) {
+      if (isRemoved(eventId)) {
+        removedEvents.add(eventId);
+      }
+    }
+    return removedEvents;
+  }
+
+  boolean isRemoved(EventID eventId) {
+    DispatchedAndCurrentEvents wrapper = getDispatchedAndCurrentEvents(eventId);
+    if (wrapper != null && eventId.getSequenceID() > wrapper.lastDispatchedSequenceId) {
+      return false;
+    }
+    return true;
+  }
+
+  DispatchedAndCurrentEvents getDispatchedAndCurrentEvents(EventID eventId) {
+    ThreadIdentifier tid = getThreadIdentifier(eventId);
+    return (DispatchedAndCurrentEvents) eventsMap.get(tid);
+  }
+
+  public void synchronizeQueueWithPrimary(InternalDistributedMember primary, InternalCache cache) {
+    if (hasSynchronizedWithPrimary.get() || synchronizeWithPrimaryInProgress.get()
+        || !doneGIIQueueing.get()) {
+      return;
+    }
+
+    if (primary.getVersionOrdinal() < KnownVersion.GEODE_1_14_0.ordinal()) {
+      if (logger.isDebugEnabled()) {
+        logger.debug("Don't send to primary with version older than KnownVersion.GEODE_1_14_0");
+      }
+      return;
+    }
+    runSynchronizationWithPrimary(primary, cache);
+  }
+
+  void runSynchronizationWithPrimary(InternalDistributedMember primary, InternalCache cache) {
+    cache.getDistributionManager().getExecutors().getWaitingThreadPool()
+        .execute(() -> doSynchronizationWithPrimary(primary, cache));
+  }
+
+  synchronized void doSynchronizationWithPrimary(InternalDistributedMember primary,
+      InternalCache cache) {
+    if (hasSynchronizedWithPrimary.get()) {
+      return;
+    }
+    synchronizeWithPrimaryInProgress.set(true);
+    int maxChunkSize = 1000;
+
+    try {
+      List<EventID> giiEvents = getGIIEvents();
+      if (giiEvents.size() == 0) {
+        hasSynchronizedWithPrimary.set(true);
+        return;
+      }
+      Collection<List<EventID>> chunks = null;
+
+      if (giiEvents.size() > maxChunkSize) {
+        chunks = getChunks(giiEvents, maxChunkSize);
+      }
+
+      if (chunks == null) {
+        if (!removeDispatchedEvents(primary, cache, giiEvents)) {
+          return;
+        }
+      } else {
+        for (List<EventID> chunk : chunks) {
+          if (!removeDispatchedEvents(primary, cache, chunk)) {
+            return;
+          }
+        }
+      }
+      if (logger.isDebugEnabled()) {
+        logger.debug("hasSynchronizedWithPrimary has synced with primary {} on queue: {} ", primary,
+            regionName);
+      }
+      hasSynchronizedWithPrimary.set(true);
+    } finally {
+      synchronizeWithPrimaryInProgress.set(false);
+    }
+  }
+
+  Collection<List<EventID>> getChunks(List<EventID> events, int size) {
+    AtomicInteger counter = new AtomicInteger(0);
+    return events.stream().collect(Collectors.groupingBy(event -> counter.getAndIncrement() / size))
+        .values();
+  }
+
+  boolean removeDispatchedEvents(InternalDistributedMember primary, InternalCache cache,
+      List<EventID> chunkEvents) {
+    List<EventID> dispatchedEvents = getDispatchedEventsFromPrimary(primary, cache, chunkEvents);
+
+    if (dispatchedEvents == null) {
+      // failed to get events from current primary, need to retry.
+      return false;
+    }
+
+    for (EventID id : dispatchedEvents) {
+      if (!removeDispatchedEventAfterSyncWithPrimary(id)) {
+        // failed to remove all dispatched events, need to retry
+        return false;
+      }
+    }
+    return true;
+  }
+
+  List<EventID> getDispatchedEventsFromPrimary(InternalDistributedMember primary,
+      InternalCache cache, List<EventID> chunkEvents) {
+    return QueueSynchronizationProcessor.getDispatchedEvents(cache.getDistributionManager(),
+        primary, regionName, chunkEvents);
+  }
+
+  List<EventID> getGIIEvents() {
+    List<EventID> events = new LinkedList<>();
+    for (long i = positionBeforeGII; i < positionAfterGII + 1; i++) {
+      Map.Entry entry = region.getEntry(i);
+      // could be already removed after processing QueueRemovalMessage
+      if (entry != null && entry.getValue() instanceof HAEventWrapper) {
+        HAEventWrapper wrapper = uncheckedCast(entry.getValue());
+        events.add(wrapper.getEventId());
+      }
+    }
+    return events;
+  }
+
+  boolean removeDispatchedEventAfterSyncWithPrimary(EventID id) {
+    boolean interrupted = Thread.interrupted();
+    try {
+      if (logger.isDebugEnabled()) {
+        logger.debug("removing dispatched event after sync with primary on queue {} for {}",
+            regionName, id);
+      }
+      removeDispatchedEvents(id);
+    } catch (RegionDestroyedException ignore) {
+      logger.info(
+          "Queue found destroyed while processing dispatched sequence ID after syn."
+              + " The event ID is {} for HARegion with name={}",
+          id, regionName);

Review comment:
       This might be better as
   `logger.info("HARegionQueue {} was found to be destroyed when attempting to remove dispatched event with ID {} after sync", regionName, id);`

##########
File path: 
geode-core/src/test/java/org/apache/geode/internal/cache/ha/HARegionQueueTest.java
##########
@@ -183,4 +203,347 @@ public void isQueueInitializedWithWaitReturnsFalseIfNotInitializedAfterWait() th
 
     verify(spy).waitForInitialized(time);
   }
+
+  @Test
+  public void getDispatchedOrRemovedEventsReturnsRemovedEvents() {
+    HARegionQueue spy = spy(haRegionQueue);
+    List<EventID> removedEvents;
+    addEvents();
+    doReturn(false).when(spy).isRemoved(id1);
+    doReturn(true).when(spy).isRemoved(id2);
+    doReturn(true).when(spy).isRemoved(id3);
+    doReturn(false).when(spy).isRemoved(id4);
+
+    removedEvents = spy.getDispatchedOrRemovedEvents(eventIDS);
+
+    assertThat(removedEvents.size()).isEqualTo(2);
+    assertThat(removedEvents.contains(id2));
+    assertThat(removedEvents.contains(id3));
+  }
+
+  private void addEvents() {
+    eventIDS.add(id1);
+    eventIDS.add(id2);
+    eventIDS.add(id3);
+    eventIDS.add(id4);
+  }
+
+  @Test
+  public void isRemovedReturnsTrueIfDispatchedAndCurrentEventsAreRemoved() {
+    HARegionQueue spy = spy(haRegionQueue);
+    doReturn(null).when(spy).getDispatchedAndCurrentEvents(id1);
+
+    assertThat(spy.isRemoved(id1)).isTrue();
+  }
+
+  @Test
+  public void isRemovedReturnsFalseIfSequenceIdGreaterThanLastDispatched() {
+    HARegionQueue spy = spy(haRegionQueue);
+    when(id1.getSequenceID()).thenReturn(100L);
+    wrapper.lastDispatchedSequenceId = 99L;
+    doReturn(wrapper).when(spy).getDispatchedAndCurrentEvents(id1);
+
+    assertThat(spy.isRemoved(id1)).isFalse();
+  }
+
+  @Test
+  public void isRemovedReturnsTrueIfSequenceIdEqualsLastDispatched() {
+    HARegionQueue spy = spy(haRegionQueue);
+    when(id1.getSequenceID()).thenReturn(100L);
+    wrapper.lastDispatchedSequenceId = 100L;
+    doReturn(wrapper).when(spy).getDispatchedAndCurrentEvents(id1);
+
+    assertThat(spy.isRemoved(id1)).isTrue();
+  }
+
+  @Test
+  public void isRemovedReturnsTrueIfSequenceIdLessThanLastDispatched() {
+    HARegionQueue spy = spy(haRegionQueue);
+    when(id1.getSequenceID()).thenReturn(90L);
+    wrapper.lastDispatchedSequenceId = 100L;
+    doReturn(wrapper).when(spy).getDispatchedAndCurrentEvents(id1);
+
+    assertThat(spy.isRemoved(id1)).isTrue();
+  }
+
+  @Test
+  public void doNotRunSynchronizationWithPrimaryIfHasDoneSynchronization() {
+    HARegionQueue spy = spy(haRegionQueue);
+    spy.hasSynchronizedWithPrimary.set(true);
+
+    spy.synchronizeQueueWithPrimary(primary, internalCache);
+    verify(spy, never()).runSynchronizationWithPrimary(primary, internalCache);
+  }
+
+  @Test
+  public void doNotRunSynchronizationWithPrimaryIfSynchronizationIsInProgress() {
+    HARegionQueue spy = spy(haRegionQueue);
+    spy.synchronizeWithPrimaryInProgress.set(true);
+
+    spy.synchronizeQueueWithPrimary(primary, internalCache);
+    verify(spy, never()).runSynchronizationWithPrimary(primary, internalCache);
+  }
+
+  @Test
+  public void doNotRunSynchronizationWithPrimaryIfGIINotFinished() {
+    HARegionQueue spy = spy(haRegionQueue);
+
+    spy.synchronizeQueueWithPrimary(primary, internalCache);
+    verify(spy, never()).runSynchronizationWithPrimary(primary, internalCache);
+  }
+
+  @Test
+  public void doNotRunSynchronizationWithPrimaryIfPrimaryHasOlderThanGEODE_1_14_0Version() {
+    HARegionQueue spy = spy(haRegionQueue);
+    spy.doneGIIQueueing.set(true);
+    when(primary.getVersionOrdinal()).thenReturn((short) (KnownVersion.GEODE_1_14_0.ordinal() - 1));
+
+    spy.synchronizeQueueWithPrimary(primary, internalCache);
+    verify(spy, never()).runSynchronizationWithPrimary(primary, internalCache);
+  }
+
+  @Test
+  public void runSynchronizationWithPrimaryIfPrimaryIsGEODE_1_14_0Version() {
+    HARegionQueue spy = spy(haRegionQueue);
+    spy.doneGIIQueueing.set(true);
+    when(primary.getVersionOrdinal()).thenReturn(KnownVersion.GEODE_1_14_0.ordinal());
+    doNothing().when(spy).runSynchronizationWithPrimary(primary, internalCache);
+
+    spy.synchronizeQueueWithPrimary(primary, internalCache);
+    verify(spy).runSynchronizationWithPrimary(primary, internalCache);
+  }
+
+  @Test
+  public void runSynchronizationWithPrimaryIfPrimaryIsLaterThanGEODE_1_14_0Version() {
+    HARegionQueue spy = spy(haRegionQueue);
+    spy.doneGIIQueueing.set(true);
+    when(primary.getVersionOrdinal()).thenReturn((short) (KnownVersion.GEODE_1_14_0.ordinal() + 1));
+    doNothing().when(spy).runSynchronizationWithPrimary(primary, internalCache);
+
+    spy.synchronizeQueueWithPrimary(primary, internalCache);
+    verify(spy).runSynchronizationWithPrimary(primary, internalCache);
+  }
+
+  @Test
+  public void getGIIEventsReturnsCorrectEvents() {
+    HARegionQueue spy = spy(haRegionQueue);
+    List<EventID> giiEvents;
+    spy.positionBeforeGII = 1;
+    spy.positionAfterGII = 4;
+    HAEventWrapper wrapper1 = mock(HAEventWrapper.class);
+    HAEventWrapper wrapper2 = mock(HAEventWrapper.class);
+    when(wrapper1.getEventId()).thenReturn(id1);
+    when(wrapper2.getEventId()).thenReturn(id2);
+    Region.Entry entry1 = mock(Region.Entry.class);
+    Region.Entry entry2 = mock(Region.Entry.class);
+    Region.Entry entry3 = mock(Region.Entry.class);

Review comment:
       The IDE warnings for these lines can be removed by using
   `Region.Entry<Object,Object> entry1 = uncheckedCast(mock(Region.Entry.class));`
   `Region.Entry<Object,Object> entry2 = uncheckedCast(mock(Region.Entry.class));`
   `Region.Entry<Object,Object> entry3 = uncheckedCast(mock(Region.Entry.class));`

##########
File path: 
geode-core/src/test/java/org/apache/geode/internal/cache/ha/HARegionQueueTest.java
##########
@@ -183,4 +203,347 @@ public void isQueueInitializedWithWaitReturnsFalseIfNotInitializedAfterWait() th
 
     verify(spy).waitForInitialized(time);
   }
+
+  @Test
+  public void getDispatchedOrRemovedEventsReturnsRemovedEvents() {
+    HARegionQueue spy = spy(haRegionQueue);
+    List<EventID> removedEvents;
+    addEvents();
+    doReturn(false).when(spy).isRemoved(id1);
+    doReturn(true).when(spy).isRemoved(id2);
+    doReturn(true).when(spy).isRemoved(id3);
+    doReturn(false).when(spy).isRemoved(id4);
+
+    removedEvents = spy.getDispatchedOrRemovedEvents(eventIDS);
+
+    assertThat(removedEvents.size()).isEqualTo(2);
+    assertThat(removedEvents.contains(id2));
+    assertThat(removedEvents.contains(id3));
+  }
+
+  private void addEvents() {
+    eventIDS.add(id1);
+    eventIDS.add(id2);
+    eventIDS.add(id3);
+    eventIDS.add(id4);
+  }
+
+  @Test
+  public void isRemovedReturnsTrueIfDispatchedAndCurrentEventsAreRemoved() {
+    HARegionQueue spy = spy(haRegionQueue);
+    doReturn(null).when(spy).getDispatchedAndCurrentEvents(id1);
+
+    assertThat(spy.isRemoved(id1)).isTrue();
+  }
+
+  @Test
+  public void isRemovedReturnsFalseIfSequenceIdGreaterThanLastDispatched() {
+    HARegionQueue spy = spy(haRegionQueue);
+    when(id1.getSequenceID()).thenReturn(100L);
+    wrapper.lastDispatchedSequenceId = 99L;
+    doReturn(wrapper).when(spy).getDispatchedAndCurrentEvents(id1);
+
+    assertThat(spy.isRemoved(id1)).isFalse();
+  }
+
+  @Test
+  public void isRemovedReturnsTrueIfSequenceIdEqualsLastDispatched() {
+    HARegionQueue spy = spy(haRegionQueue);
+    when(id1.getSequenceID()).thenReturn(100L);
+    wrapper.lastDispatchedSequenceId = 100L;
+    doReturn(wrapper).when(spy).getDispatchedAndCurrentEvents(id1);
+
+    assertThat(spy.isRemoved(id1)).isTrue();
+  }
+
+  @Test
+  public void isRemovedReturnsTrueIfSequenceIdLessThanLastDispatched() {
+    HARegionQueue spy = spy(haRegionQueue);
+    when(id1.getSequenceID()).thenReturn(90L);
+    wrapper.lastDispatchedSequenceId = 100L;
+    doReturn(wrapper).when(spy).getDispatchedAndCurrentEvents(id1);
+
+    assertThat(spy.isRemoved(id1)).isTrue();
+  }
+
+  @Test
+  public void doNotRunSynchronizationWithPrimaryIfHasDoneSynchronization() {
+    HARegionQueue spy = spy(haRegionQueue);
+    spy.hasSynchronizedWithPrimary.set(true);
+
+    spy.synchronizeQueueWithPrimary(primary, internalCache);
+    verify(spy, never()).runSynchronizationWithPrimary(primary, internalCache);
+  }
+
+  @Test
+  public void doNotRunSynchronizationWithPrimaryIfSynchronizationIsInProgress() {
+    HARegionQueue spy = spy(haRegionQueue);
+    spy.synchronizeWithPrimaryInProgress.set(true);
+
+    spy.synchronizeQueueWithPrimary(primary, internalCache);
+    verify(spy, never()).runSynchronizationWithPrimary(primary, internalCache);
+  }
+
+  @Test
+  public void doNotRunSynchronizationWithPrimaryIfGIINotFinished() {
+    HARegionQueue spy = spy(haRegionQueue);
+
+    spy.synchronizeQueueWithPrimary(primary, internalCache);
+    verify(spy, never()).runSynchronizationWithPrimary(primary, internalCache);
+  }
+
+  @Test
+  public void doNotRunSynchronizationWithPrimaryIfPrimaryHasOlderThanGEODE_1_14_0Version() {
+    HARegionQueue spy = spy(haRegionQueue);
+    spy.doneGIIQueueing.set(true);
+    when(primary.getVersionOrdinal()).thenReturn((short) (KnownVersion.GEODE_1_14_0.ordinal() - 1));
+
+    spy.synchronizeQueueWithPrimary(primary, internalCache);
+    verify(spy, never()).runSynchronizationWithPrimary(primary, internalCache);
+  }
+
+  @Test
+  public void runSynchronizationWithPrimaryIfPrimaryIsGEODE_1_14_0Version() {
+    HARegionQueue spy = spy(haRegionQueue);
+    spy.doneGIIQueueing.set(true);
+    when(primary.getVersionOrdinal()).thenReturn(KnownVersion.GEODE_1_14_0.ordinal());
+    doNothing().when(spy).runSynchronizationWithPrimary(primary, internalCache);
+
+    spy.synchronizeQueueWithPrimary(primary, internalCache);
+    verify(spy).runSynchronizationWithPrimary(primary, internalCache);
+  }
+
+  @Test
+  public void runSynchronizationWithPrimaryIfPrimaryIsLaterThanGEODE_1_14_0Version() {
+    HARegionQueue spy = spy(haRegionQueue);
+    spy.doneGIIQueueing.set(true);
+    when(primary.getVersionOrdinal()).thenReturn((short) (KnownVersion.GEODE_1_14_0.ordinal() + 1));
+    doNothing().when(spy).runSynchronizationWithPrimary(primary, internalCache);
+
+    spy.synchronizeQueueWithPrimary(primary, internalCache);
+    verify(spy).runSynchronizationWithPrimary(primary, internalCache);
+  }
+
+  @Test
+  public void getGIIEventsReturnsCorrectEvents() {
+    HARegionQueue spy = spy(haRegionQueue);
+    List<EventID> giiEvents;
+    spy.positionBeforeGII = 1;
+    spy.positionAfterGII = 4;
+    HAEventWrapper wrapper1 = mock(HAEventWrapper.class);
+    HAEventWrapper wrapper2 = mock(HAEventWrapper.class);
+    when(wrapper1.getEventId()).thenReturn(id1);
+    when(wrapper2.getEventId()).thenReturn(id2);
+    Region.Entry entry1 = mock(Region.Entry.class);
+    Region.Entry entry2 = mock(Region.Entry.class);
+    Region.Entry entry3 = mock(Region.Entry.class);
+    when(entry1.getValue()).thenReturn(wrapper1);
+    when(entry3.getValue()).thenReturn(wrapper2);
+    when(entry2.getValue()).thenReturn(id3);
+
+    when(haRegion.getEntry(1L)).thenReturn(entry1);
+    when(haRegion.getEntry(2L)).thenReturn(entry2);
+    when(haRegion.getEntry(3L)).thenReturn(null);
+    when(haRegion.getEntry(4L)).thenReturn(entry3);
+
+    giiEvents = spy.getGIIEvents();
+
+    assertThat(giiEvents.size()).isEqualTo(2);
+    assertThat(giiEvents.contains(id1));
+    assertThat(giiEvents.contains(id2));
+  }
+
+  @Test
+  public void doSynchronizationWithPrimaryReturnsIfHasDoneSynchronization() {
+    HARegionQueue spy = spy(haRegionQueue);
+    spy.hasSynchronizedWithPrimary.set(true);
+
+    spy.doSynchronizationWithPrimary(primary, internalCache);
+    verify(spy, never()).getGIIEvents();
+  }
+
+  @Test
+  public void doSynchronizationWithPrimaryReturnsIfNoGIIEvents() {
+    HARegionQueue spy = spy(haRegionQueue);
+    int maxChunkSize = 1000;
+    spy.hasSynchronizedWithPrimary.set(true);
+    doReturn(new LinkedList<EventID>()).when(spy).getGIIEvents();
+
+    spy.doSynchronizationWithPrimary(primary, internalCache);
+
+    verify(spy, never()).getChunks(eventIDS, maxChunkSize);
+    verify(spy, never()).removeDispatchedEvents(primary, internalCache, eventIDS);
+    assertThat(spy.hasSynchronizedWithPrimary).isTrue();
+    assertThat(spy.synchronizeWithPrimaryInProgress).isFalse();
+  }
+
+  @Test
+  public void doSynchronizationWithPrimaryRemoveDispatchedEvents() {
+    HARegionQueue spy = spy(haRegionQueue);
+    int maxChunkSize = 1000;
+    addEvents();
+    doReturn(eventIDS).when(spy).getGIIEvents();
+    doReturn(true).when(spy).removeDispatchedEvents(primary, internalCache, eventIDS);
+
+    spy.doSynchronizationWithPrimary(primary, internalCache);
+
+    verify(spy, never()).getChunks(eventIDS, maxChunkSize);
+    verify(spy).removeDispatchedEvents(primary, internalCache, eventIDS);
+    assertThat(spy.hasSynchronizedWithPrimary).isTrue();
+    assertThat(spy.synchronizeWithPrimaryInProgress).isFalse();
+  }
+
+  @Test
+  public void hasSynchronizedWithPrimaryNotSetIfRemoveDispatchedEventsFails() {
+    HARegionQueue spy = spy(haRegionQueue);
+    int maxChunkSize = 1000;
+    addEvents();
+    doReturn(eventIDS).when(spy).getGIIEvents();
+    doReturn(false).when(spy).removeDispatchedEvents(primary, internalCache, eventIDS);
+
+    spy.doSynchronizationWithPrimary(primary, internalCache);
+
+    verify(spy, never()).getChunks(eventIDS, maxChunkSize);
+    verify(spy).removeDispatchedEvents(primary, internalCache, eventIDS);
+    assertThat(spy.hasSynchronizedWithPrimary).isFalse();
+    assertThat(spy.synchronizeWithPrimaryInProgress).isFalse();
+  }
+
+  @Test
+  public void hasSynchronizedWithPrimaryRemoveChunksIfManyGIIEvents() {
+    HARegionQueue spy = spy(haRegionQueue);
+    int maxChunkSize = 1000;
+    for (int i = 0; i < 1100; i++) {
+      eventIDS.add(mock(EventID.class));
+    }
+    createChunks();
+    doReturn(eventIDS).when(spy).getGIIEvents();
+    doReturn(chunks).when(spy).getChunks(eventIDS, maxChunkSize);
+    doReturn(true).when(spy).removeDispatchedEvents(primary, internalCache, chunk1);
+    doReturn(true).when(spy).removeDispatchedEvents(primary, internalCache, chunk2);
+
+    spy.doSynchronizationWithPrimary(primary, internalCache);
+
+    verify(spy).getChunks(eventIDS, maxChunkSize);
+    verify(spy).removeDispatchedEvents(primary, internalCache, chunk1);
+    verify(spy).removeDispatchedEvents(primary, internalCache, chunk2);
+    assertThat(spy.hasSynchronizedWithPrimary).isTrue();
+    assertThat(spy.synchronizeWithPrimaryInProgress).isFalse();
+  }
+
+  private void createChunks() {
+    chunk1.add(id1);
+    chunk2.add(id2);
+    chunks.add(chunk1);
+    chunks.add(chunk2);
+  }
+
+  @Test
+  public void hasSynchronizedWithPrimaryNotSetIfRemoveChunksFails() {
+    HARegionQueue spy = spy(haRegionQueue);
+    int maxChunkSize = 1000;
+    for (int i = 0; i < 1100; i++) {
+      eventIDS.add(mock(EventID.class));
+    }
+    createChunks();
+    doReturn(eventIDS).when(spy).getGIIEvents();
+    doReturn(chunks).when(spy).getChunks(eventIDS, maxChunkSize);
+    doReturn(true).when(spy).removeDispatchedEvents(primary, internalCache, chunk1);
+    doReturn(false).when(spy).removeDispatchedEvents(primary, internalCache, chunk2);
+
+    spy.doSynchronizationWithPrimary(primary, internalCache);
+
+    verify(spy).getChunks(eventIDS, maxChunkSize);
+    verify(spy).removeDispatchedEvents(primary, internalCache, chunk1);
+    verify(spy).removeDispatchedEvents(primary, internalCache, chunk2);
+    assertThat(spy.hasSynchronizedWithPrimary).isFalse();
+    assertThat(spy.synchronizeWithPrimaryInProgress).isFalse();
+  }
+
+  @Test
+  public void getChunksReturnsEqualSizedChunks() {
+    HARegionQueue spy = spy(haRegionQueue);
+    addEvents();
+    // add more events
+    eventIDS.add(mock(EventID.class));
+    eventIDS.add(mock(EventID.class));
+
+    Collection<List<EventID>> myChunks = spy.getChunks(eventIDS, 2);
+
+    assertThat(myChunks.size()).isEqualTo(3);

Review comment:
       Could this 3 be replaced with a calculated value, and the 2 for max 
chunk size extracted to a variable? I think that `eventIDS.size()/maxChunkSize` 
should be the correct value.
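
       As a sketch, the test could derive the expected chunk count instead of hard-coding it (the `maxChunkSize` local is just illustrative):

```java
int maxChunkSize = 2;
addEvents();
// add more events
eventIDS.add(mock(EventID.class));
eventIDS.add(mock(EventID.class));

Collection<List<EventID>> myChunks = spy.getChunks(eventIDS, maxChunkSize);

// 6 events split into chunks of 2, so the count is derived rather than hard-coded as 3.
assertThat(myChunks.size()).isEqualTo(eventIDS.size() / maxChunkSize);
```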

##########
File path: 
geode-core/src/main/java/org/apache/geode/internal/cache/ha/HARegionQueue.java
##########
@@ -3943,4 +3958,165 @@ private static int calculateThreadIdExpiryTime() {
   public Queue getGiiQueue() {
     return this.giiQueue;
   }
+
+  List<EventID> getDispatchedOrRemovedEvents(List<EventID> eventIds) {
+    List<EventID> removedEvents = new LinkedList<>();
+    for (EventID eventId : eventIds) {
+      if (isRemoved(eventId)) {
+        removedEvents.add(eventId);
+      }
+    }
+    return removedEvents;
+  }
+
+  boolean isRemoved(EventID eventId) {
+    DispatchedAndCurrentEvents wrapper = getDispatchedAndCurrentEvents(eventId);
+    if (wrapper != null && eventId.getSequenceID() > wrapper.lastDispatchedSequenceId) {
+      return false;
+    }
+    return true;
+  }
+
+  DispatchedAndCurrentEvents getDispatchedAndCurrentEvents(EventID eventId) {
+    ThreadIdentifier tid = getThreadIdentifier(eventId);
+    return (DispatchedAndCurrentEvents) eventsMap.get(tid);
+  }
+
+  public void synchronizeQueueWithPrimary(InternalDistributedMember primary, InternalCache cache) {
+    if (hasSynchronizedWithPrimary.get() || synchronizeWithPrimaryInProgress.get()
+        || !doneGIIQueueing.get()) {
+      return;
+    }
+
+    if (primary.getVersionOrdinal() < KnownVersion.GEODE_1_14_0.ordinal()) {
+      if (logger.isDebugEnabled()) {
+        logger.debug("Don't send to primary with version older than KnownVersion.GEODE_1_14_0");
+      }
+      return;
+    }
+    runSynchronizationWithPrimary(primary, cache);
+  }
+
+  void runSynchronizationWithPrimary(InternalDistributedMember primary, InternalCache cache) {
+    cache.getDistributionManager().getExecutors().getWaitingThreadPool()
+        .execute(() -> doSynchronizationWithPrimary(primary, cache));
+  }
+
+  synchronized void doSynchronizationWithPrimary(InternalDistributedMember primary,
+      InternalCache cache) {
+    if (hasSynchronizedWithPrimary.get()) {
+      return;
+    }
+    synchronizeWithPrimaryInProgress.set(true);
+    int maxChunkSize = 1000;
+
+    try {
+      List<EventID> giiEvents = getGIIEvents();
+      if (giiEvents.size() == 0) {
+        hasSynchronizedWithPrimary.set(true);
+        return;
+      }
+      Collection<List<EventID>> chunks = null;
+
+      if (giiEvents.size() > maxChunkSize) {
+        chunks = getChunks(giiEvents, maxChunkSize);
+      }
+
+      if (chunks == null) {
+        if (!removeDispatchedEvents(primary, cache, giiEvents)) {
+          return;
+        }
+      } else {
+        for (List<EventID> chunk : chunks) {
+          if (!removeDispatchedEvents(primary, cache, chunk)) {
+            return;
+          }
+        }
+      }
+      if (logger.isDebugEnabled()) {
+        logger.debug("hasSynchronizedWithPrimary has synced with primary {} on queue: {} ", primary,
+            regionName);
+      }
+      hasSynchronizedWithPrimary.set(true);
+    } finally {
+      synchronizeWithPrimaryInProgress.set(false);
+    }
+  }
+
+  Collection<List<EventID>> getChunks(List<EventID> events, int size) {
+    AtomicInteger counter = new AtomicInteger(0);
+    return events.stream().collect(Collectors.groupingBy(event -> counter.getAndIncrement() / size))
+        .values();
+  }
+
+  boolean removeDispatchedEvents(InternalDistributedMember primary, InternalCache cache,
+      List<EventID> chunkEvents) {
+    List<EventID> dispatchedEvents = getDispatchedEventsFromPrimary(primary, cache, chunkEvents);
+
+    if (dispatchedEvents == null) {
+      // failed to get events from current primary, need to retry.
+      return false;
+    }
+
+    for (EventID id : dispatchedEvents) {
+      if (!removeDispatchedEventAfterSyncWithPrimary(id)) {
+        // failed to remove all dispatched events, need to retry
+        return false;
+      }
+    }
+    return true;
+  }
+
+  List<EventID> getDispatchedEventsFromPrimary(InternalDistributedMember primary,
+      InternalCache cache, List<EventID> chunkEvents) {
+    return QueueSynchronizationProcessor.getDispatchedEvents(cache.getDistributionManager(),
+        primary, regionName, chunkEvents);
+  }
+
+  List<EventID> getGIIEvents() {
+    List<EventID> events = new LinkedList<>();
+    for (long i = positionBeforeGII; i < positionAfterGII + 1; i++) {
+      Map.Entry entry = region.getEntry(i);
+      // could be already removed after processing QueueRemovalMessage
+      if (entry != null && entry.getValue() instanceof HAEventWrapper) {
+        HAEventWrapper wrapper = uncheckedCast(entry.getValue());

Review comment:
       The use of the `uncheckedCast()` method here is not necessary, since the 
`instanceof` check ensures that the result of `entry.getValue()` can be safely 
cast to `HAEventWrapper`
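
       A sketch of the loop body with a plain cast instead (behaviour unchanged):

```java
// The instanceof check guarantees the cast below cannot fail.
if (entry != null && entry.getValue() instanceof HAEventWrapper) {
  HAEventWrapper wrapper = (HAEventWrapper) entry.getValue();
  events.add(wrapper.getEventId());
}
```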

##########
File path: 
geode-core/src/main/java/org/apache/geode/internal/cache/ha/HARegionQueue.java
##########
@@ -3943,4 +3958,165 @@ private static int calculateThreadIdExpiryTime() {
   public Queue getGiiQueue() {
     return this.giiQueue;
   }
+
+  List<EventID> getDispatchedOrRemovedEvents(List<EventID> eventIds) {
+    List<EventID> removedEvents = new LinkedList<>();
+    for (EventID eventId : eventIds) {
+      if (isRemoved(eventId)) {
+        removedEvents.add(eventId);
+      }
+    }
+    return removedEvents;
+  }
+
+  boolean isRemoved(EventID eventId) {
+    DispatchedAndCurrentEvents wrapper = getDispatchedAndCurrentEvents(eventId);
+    if (wrapper != null && eventId.getSequenceID() > wrapper.lastDispatchedSequenceId) {
+      return false;
+    }
+    return true;
+  }
+
+  DispatchedAndCurrentEvents getDispatchedAndCurrentEvents(EventID eventId) {
+    ThreadIdentifier tid = getThreadIdentifier(eventId);
+    return (DispatchedAndCurrentEvents) eventsMap.get(tid);
+  }
+
+  public void synchronizeQueueWithPrimary(InternalDistributedMember primary, InternalCache cache) {
+    if (hasSynchronizedWithPrimary.get() || synchronizeWithPrimaryInProgress.get()
+        || !doneGIIQueueing.get()) {
+      return;
+    }
+
+    if (primary.getVersionOrdinal() < KnownVersion.GEODE_1_14_0.ordinal()) {
+      if (logger.isDebugEnabled()) {
+        logger.debug("Don't send to primary with version older than KnownVersion.GEODE_1_14_0");
+      }
+      return;
+    }
+    runSynchronizationWithPrimary(primary, cache);
+  }
+
+  void runSynchronizationWithPrimary(InternalDistributedMember primary, InternalCache cache) {
+    cache.getDistributionManager().getExecutors().getWaitingThreadPool()
+        .execute(() -> doSynchronizationWithPrimary(primary, cache));
+  }
+
+  synchronized void doSynchronizationWithPrimary(InternalDistributedMember primary,
+      InternalCache cache) {
+    if (hasSynchronizedWithPrimary.get()) {
+      return;
+    }
+    synchronizeWithPrimaryInProgress.set(true);
+    int maxChunkSize = 1000;
+
+    try {
+      List<EventID> giiEvents = getGIIEvents();
+      if (giiEvents.size() == 0) {
+        hasSynchronizedWithPrimary.set(true);
+        return;
+      }
+      Collection<List<EventID>> chunks = null;
+
+      if (giiEvents.size() > maxChunkSize) {
+        chunks = getChunks(giiEvents, maxChunkSize);
+      }
+
+      if (chunks == null) {
+        if (!removeDispatchedEvents(primary, cache, giiEvents)) {
+          return;
+        }
+      } else {
+        for (List<EventID> chunk : chunks) {
+          if (!removeDispatchedEvents(primary, cache, chunk)) {
+            return;
+          }
+        }
+      }
+      if (logger.isDebugEnabled()) {
+        logger.debug("hasSynchronizedWithPrimary has synced with primary {} on queue: {} ", primary,
+            regionName);
+      }
+      hasSynchronizedWithPrimary.set(true);
+    } finally {
+      synchronizeWithPrimaryInProgress.set(false);
+    }
+  }
+
+  Collection<List<EventID>> getChunks(List<EventID> events, int size) {
+    AtomicInteger counter = new AtomicInteger(0);
+    return events.stream().collect(Collectors.groupingBy(event -> counter.getAndIncrement() / size))
+        .values();
+  }
+
+  boolean removeDispatchedEvents(InternalDistributedMember primary, InternalCache cache,
+      List<EventID> chunkEvents) {
+    List<EventID> dispatchedEvents = getDispatchedEventsFromPrimary(primary, cache, chunkEvents);
+
+    if (dispatchedEvents == null) {
+      // failed to get events from current primary, need to retry.
+      return false;
+    }
+
+    for (EventID id : dispatchedEvents) {
+      if (!removeDispatchedEventAfterSyncWithPrimary(id)) {
+        // failed to remove all dispatched events, need to retry
+        return false;
+      }
+    }
+    return true;
+  }
+
+  List<EventID> getDispatchedEventsFromPrimary(InternalDistributedMember primary,
+      InternalCache cache, List<EventID> chunkEvents) {
+    return QueueSynchronizationProcessor.getDispatchedEvents(cache.getDistributionManager(),
+        primary, regionName, chunkEvents);
+  }
+
+  List<EventID> getGIIEvents() {
+    List<EventID> events = new LinkedList<>();
+    for (long i = positionBeforeGII; i < positionAfterGII + 1; i++) {
+      Map.Entry entry = region.getEntry(i);
+      // could be already removed after processing QueueRemovalMessage
+      if (entry != null && entry.getValue() instanceof HAEventWrapper) {
+        HAEventWrapper wrapper = uncheckedCast(entry.getValue());
+        events.add(wrapper.getEventId());
+      }
+    }
+    return events;
+  }
+
+  boolean removeDispatchedEventAfterSyncWithPrimary(EventID id) {
+    boolean interrupted = Thread.interrupted();
+    try {
+      if (logger.isDebugEnabled()) {
+        logger.debug("removing dispatched event after sync with primary on queue {} for {}",

Review comment:
       This might be better worded as "After sync with primary for 
HARegionQueue {}, removing dispatched event with ID {}"
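
       For example (keeping the same arguments as the existing call):

```java
logger.debug("After sync with primary for HARegionQueue {}, removing dispatched event with ID {}",
    regionName, id);
```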

##########
File path: 
geode-core/src/main/java/org/apache/geode/internal/cache/ha/HARegionQueue.java
##########
@@ -3943,4 +3958,165 @@ private static int calculateThreadIdExpiryTime() {
   public Queue getGiiQueue() {
     return this.giiQueue;
   }
+
+  List<EventID> getDispatchedOrRemovedEvents(List<EventID> eventIds) {

Review comment:
       The name of this method (and the `isRemoved()` method below it) are a 
little confusing to me. It appears that we only check if events have been 
dispatched by the primary, and the naming up to this point has been 
consistently only talking about dispatched events. Would it make sense for 
these methods to be renamed to `getDispatchedEvents()` and `isDispatched()`, for 
clarity?
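
       As a sketch, the renamed pair could read as follows, keeping the bodies from the PR (only the names are illustrative):

```java
List<EventID> getDispatchedEvents(List<EventID> eventIds) {
  List<EventID> dispatchedEvents = new LinkedList<>();
  for (EventID eventId : eventIds) {
    if (isDispatched(eventId)) {
      dispatchedEvents.add(eventId);
    }
  }
  return dispatchedEvents;
}

boolean isDispatched(EventID eventId) {
  DispatchedAndCurrentEvents wrapper = getDispatchedAndCurrentEvents(eventId);
  // Dispatched (or already removed) unless the sequence ID is newer than the last dispatched one.
  return wrapper == null || eventId.getSequenceID() <= wrapper.lastDispatchedSequenceId;
}
```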

##########
File path: 
geode-core/src/main/java/org/apache/geode/internal/cache/ha/HARegionQueue.java
##########
@@ -3943,4 +3958,165 @@ private static int calculateThreadIdExpiryTime() {
   public Queue getGiiQueue() {
     return this.giiQueue;
   }
+
+  List<EventID> getDispatchedOrRemovedEvents(List<EventID> eventIds) {
+    List<EventID> removedEvents = new LinkedList<>();
+    for (EventID eventId : eventIds) {
+      if (isRemoved(eventId)) {
+        removedEvents.add(eventId);
+      }
+    }
+    return removedEvents;
+  }
+
+  boolean isRemoved(EventID eventId) {
+    DispatchedAndCurrentEvents wrapper = getDispatchedAndCurrentEvents(eventId);
+    if (wrapper != null && eventId.getSequenceID() > wrapper.lastDispatchedSequenceId) {
+      return false;
+    }
+    return true;
+  }
+
+  DispatchedAndCurrentEvents getDispatchedAndCurrentEvents(EventID eventId) {
+    ThreadIdentifier tid = getThreadIdentifier(eventId);
+    return (DispatchedAndCurrentEvents) eventsMap.get(tid);
+  }
+
+  public void synchronizeQueueWithPrimary(InternalDistributedMember primary, InternalCache cache) {
+    if (hasSynchronizedWithPrimary.get() || synchronizeWithPrimaryInProgress.get()
+        || !doneGIIQueueing.get()) {
+      return;
+    }
+
+    if (primary.getVersionOrdinal() < KnownVersion.GEODE_1_14_0.ordinal()) {
+      if (logger.isDebugEnabled()) {
+        logger.debug("Don't send to primary with version older than KnownVersion.GEODE_1_14_0");
+      }
+      return;
+    }
+    runSynchronizationWithPrimary(primary, cache);
+  }
+
+  void runSynchronizationWithPrimary(InternalDistributedMember primary, InternalCache cache) {
+    cache.getDistributionManager().getExecutors().getWaitingThreadPool()
+        .execute(() -> doSynchronizationWithPrimary(primary, cache));
+  }
+
+  synchronized void doSynchronizationWithPrimary(InternalDistributedMember primary,
+      InternalCache cache) {
+    if (hasSynchronizedWithPrimary.get()) {
+      return;
+    }
+    synchronizeWithPrimaryInProgress.set(true);
+    int maxChunkSize = 1000;
+
+    try {
+      List<EventID> giiEvents = getGIIEvents();
+      if (giiEvents.size() == 0) {
+        hasSynchronizedWithPrimary.set(true);
+        return;
+      }
+      Collection<List<EventID>> chunks = null;
+
+      if (giiEvents.size() > maxChunkSize) {
+        chunks = getChunks(giiEvents, maxChunkSize);
+      }
+
+      if (chunks == null) {
+        if (!removeDispatchedEvents(primary, cache, giiEvents)) {
+          return;
+        }
+      } else {
+        for (List<EventID> chunk : chunks) {
+          if (!removeDispatchedEvents(primary, cache, chunk)) {
+            return;
+          }
+        }
+      }
+      if (logger.isDebugEnabled()) {
+        logger.debug("hasSynchronizedWithPrimary has synced with primary {} on queue: {} ", primary,
+            regionName);
+      }
+      hasSynchronizedWithPrimary.set(true);
+    } finally {
+      synchronizeWithPrimaryInProgress.set(false);
+    }
+  }
+
+  Collection<List<EventID>> getChunks(List<EventID> events, int size) {
+    AtomicInteger counter = new AtomicInteger(0);
+    return events.stream().collect(Collectors.groupingBy(event -> counter.getAndIncrement() / size))
+        .values();
+  }
+
+  boolean removeDispatchedEvents(InternalDistributedMember primary, InternalCache cache,
+      List<EventID> chunkEvents) {
+    List<EventID> dispatchedEvents = getDispatchedEventsFromPrimary(primary, cache, chunkEvents);
+
+    if (dispatchedEvents == null) {
+      // failed to get events from current primary, need to retry.
+      return false;
+    }
+
+    for (EventID id : dispatchedEvents) {
+      if (!removeDispatchedEventAfterSyncWithPrimary(id)) {
+        // failed to remove all dispatched events, need to retry
+        return false;
+      }
+    }
+    return true;
+  }
+
+  List<EventID> getDispatchedEventsFromPrimary(InternalDistributedMember primary,
+      InternalCache cache, List<EventID> chunkEvents) {
+    return QueueSynchronizationProcessor.getDispatchedEvents(cache.getDistributionManager(),
+        primary, regionName, chunkEvents);
+  }
+
+  List<EventID> getGIIEvents() {
+    List<EventID> events = new LinkedList<>();
+    for (long i = positionBeforeGII; i < positionAfterGII + 1; i++) {
+      Map.Entry entry = region.getEntry(i);
+      // could be already removed after processing QueueRemovalMessage
+      if (entry != null && entry.getValue() instanceof HAEventWrapper) {
+        HAEventWrapper wrapper = uncheckedCast(entry.getValue());
+        events.add(wrapper.getEventId());
+      }
+    }
+    return events;
+  }
+
+  boolean removeDispatchedEventAfterSyncWithPrimary(EventID id) {
+    boolean interrupted = Thread.interrupted();
+    try {
+      if (logger.isDebugEnabled()) {
+        logger.debug("removing dispatched event after sync with primary on queue {} for {}",
+            regionName, id);
+      }
+      removeDispatchedEvents(id);
+    } catch (RegionDestroyedException ignore) {
+      logger.info(
+          "Queue found destroyed while processing dispatched sequence ID after syn."
+              + " The event ID is {} for HARegion with name={}",
+          id, regionName);
+    } catch (CancelException ignore) {
+      return false;
+    } catch (CacheException e) {
+      logger.error(String.format(
+          "Sync with primary got Exception when removing from the queue. The problem is with event ID, %s for HARegion with name=%s",
+          regionName, id),

Review comment:
       It seems like the arguments for the string format are in the wrong order 
here. Alternately, this might be better worded as "HARegionQueue %s encountered 
an exception when attempting to remove event with ID %s from the queue", which 
would have the arguments in the correct order.
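
       A sketch of the suggested rewording with the arguments matching the format specifiers (assuming the exception variable stays `e` as in the surrounding catch block):

```java
logger.error(String.format(
    "HARegionQueue %s encountered an exception when attempting to remove event with ID %s from the queue",
    regionName, id), e);
```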

##########
File path: 
geode-core/src/test/java/org/apache/geode/internal/cache/ha/QueueRemovalMessageTest.java
##########
@@ -212,4 +213,24 @@ public void removeQueueEventReturnsTrueIfRemovalThrowsRejectedExecutionException
 
     assertThat(queueRemovalMessage.removeQueueEvent(regionName2, regionQueue2, eventID2)).isTrue();
   }
+
+  @Test
+  public void synchronizeQueueWithPrimaryInvokedAfterProcessEachRegionQueue() {
+    addToMessagesList();
+    Iterator iterator = messagesList.iterator();

Review comment:
       The IDE warning for raw use of parameterized class can be removed by 
using `Iterator<Object>` here.

##########
File path: 
geode-core/src/test/java/org/apache/geode/internal/cache/ha/HARegionQueueTest.java
##########
@@ -183,4 +203,347 @@ public void isQueueInitializedWithWaitReturnsFalseIfNotInitializedAfterWait() th
 
     verify(spy).waitForInitialized(time);
   }
+
+  @Test
+  public void getDispatchedOrRemovedEventsReturnsRemovedEvents() {
+    HARegionQueue spy = spy(haRegionQueue);
+    List<EventID> removedEvents;
+    addEvents();
+    doReturn(false).when(spy).isRemoved(id1);
+    doReturn(true).when(spy).isRemoved(id2);
+    doReturn(true).when(spy).isRemoved(id3);
+    doReturn(false).when(spy).isRemoved(id4);
+
+    removedEvents = spy.getDispatchedOrRemovedEvents(eventIDS);
+
+    assertThat(removedEvents.size()).isEqualTo(2);
+    assertThat(removedEvents.contains(id2));
+    assertThat(removedEvents.contains(id3));

Review comment:
       These three assertions can be simplified to 
`assertThat(removedEvents).containsExactlyInAnyOrder(id2, id3);`

##########
File path: 
geode-core/src/test/java/org/apache/geode/internal/cache/ha/HARegionQueueTest.java
##########
@@ -183,4 +203,347 @@ public void isQueueInitializedWithWaitReturnsFalseIfNotInitializedAfterWait() th
 
     verify(spy).waitForInitialized(time);
   }
+
+  @Test
+  public void getDispatchedOrRemovedEventsReturnsRemovedEvents() {
+    HARegionQueue spy = spy(haRegionQueue);
+    List<EventID> removedEvents;
+    addEvents();
+    doReturn(false).when(spy).isRemoved(id1);
+    doReturn(true).when(spy).isRemoved(id2);
+    doReturn(true).when(spy).isRemoved(id3);
+    doReturn(false).when(spy).isRemoved(id4);
+
+    removedEvents = spy.getDispatchedOrRemovedEvents(eventIDS);
+
+    assertThat(removedEvents.size()).isEqualTo(2);
+    assertThat(removedEvents.contains(id2));
+    assertThat(removedEvents.contains(id3));
+  }
+
+  private void addEvents() {
+    eventIDS.add(id1);
+    eventIDS.add(id2);
+    eventIDS.add(id3);
+    eventIDS.add(id4);
+  }
+
+  @Test
+  public void isRemovedReturnsTrueIfDispatchedAndCurrentEventsAreRemoved() {

Review comment:
       If the comments about the naming of `getDispatchedOrRemovedEvents()` and 
`isRemoved()` are applied, this method name should also be changed, along with 
the following 3 test methods.

##########
File path: 
geode-core/src/test/java/org/apache/geode/internal/cache/ha/HARegionQueueTest.java
##########
@@ -183,4 +203,347 @@ public void isQueueInitializedWithWaitReturnsFalseIfNotInitializedAfterWait() th
 
     verify(spy).waitForInitialized(time);
   }
+
+  @Test
+  public void getDispatchedOrRemovedEventsReturnsRemovedEvents() {
+    HARegionQueue spy = spy(haRegionQueue);
+    List<EventID> removedEvents;

Review comment:
       The declaration and assignment of this variable can be moved to the same 
line.
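
       For example:

```java
List<EventID> removedEvents = spy.getDispatchedOrRemovedEvents(eventIDS);
```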

##########
File path: 
geode-core/src/test/java/org/apache/geode/internal/cache/ha/HARegionQueueTest.java
##########
@@ -183,4 +203,347 @@ public void isQueueInitializedWithWaitReturnsFalseIfNotInitializedAfterWait() th
 
     verify(spy).waitForInitialized(time);
   }
+
+  @Test
+  public void getDispatchedOrRemovedEventsReturnsRemovedEvents() {

Review comment:
       If the comments about the naming of `getDispatchedOrRemovedEvents()` and 
`isRemoved()` are applied, this method name should also be changed.

##########
File path: 
geode-core/src/test/java/org/apache/geode/internal/cache/ha/HARegionQueueTest.java
##########
@@ -183,4 +203,347 @@ public void isQueueInitializedWithWaitReturnsFalseIfNotInitializedAfterWait() th
 
     verify(spy).waitForInitialized(time);
   }
+
+  @Test
+  public void getDispatchedOrRemovedEventsReturnsRemovedEvents() {
+    HARegionQueue spy = spy(haRegionQueue);
+    List<EventID> removedEvents;
+    addEvents();
+    doReturn(false).when(spy).isRemoved(id1);
+    doReturn(true).when(spy).isRemoved(id2);
+    doReturn(true).when(spy).isRemoved(id3);
+    doReturn(false).when(spy).isRemoved(id4);
+
+    removedEvents = spy.getDispatchedOrRemovedEvents(eventIDS);
+
+    assertThat(removedEvents.size()).isEqualTo(2);
+    assertThat(removedEvents.contains(id2));
+    assertThat(removedEvents.contains(id3));
+  }
+
+  private void addEvents() {
+    eventIDS.add(id1);
+    eventIDS.add(id2);
+    eventIDS.add(id3);
+    eventIDS.add(id4);
+  }
+
+  @Test
+  public void isRemovedReturnsTrueIfDispatchedAndCurrentEventsAreRemoved() {
+    HARegionQueue spy = spy(haRegionQueue);
+    doReturn(null).when(spy).getDispatchedAndCurrentEvents(id1);
+
+    assertThat(spy.isRemoved(id1)).isTrue();
+  }
+
+  @Test
+  public void isRemovedReturnsFalseIfSequenceIdGreaterThanLastDispatched() {
+    HARegionQueue spy = spy(haRegionQueue);
+    when(id1.getSequenceID()).thenReturn(100L);
+    wrapper.lastDispatchedSequenceId = 99L;
+    doReturn(wrapper).when(spy).getDispatchedAndCurrentEvents(id1);
+
+    assertThat(spy.isRemoved(id1)).isFalse();
+  }
+
+  @Test
+  public void isRemovedReturnsTrueIfSequenceIdEqualsLastDispatched() {
+    HARegionQueue spy = spy(haRegionQueue);
+    when(id1.getSequenceID()).thenReturn(100L);
+    wrapper.lastDispatchedSequenceId = 100L;
+    doReturn(wrapper).when(spy).getDispatchedAndCurrentEvents(id1);
+
+    assertThat(spy.isRemoved(id1)).isTrue();
+  }
+
+  @Test
+  public void isRemovedReturnsTrueIfSequenceIdLessThanLastDispatched() {
+    HARegionQueue spy = spy(haRegionQueue);
+    when(id1.getSequenceID()).thenReturn(90L);
+    wrapper.lastDispatchedSequenceId = 100L;
+    doReturn(wrapper).when(spy).getDispatchedAndCurrentEvents(id1);
+
+    assertThat(spy.isRemoved(id1)).isTrue();
+  }
+
+  @Test
+  public void doNotRunSynchronizationWithPrimaryIfHasDoneSynchronization() {
+    HARegionQueue spy = spy(haRegionQueue);
+    spy.hasSynchronizedWithPrimary.set(true);
+
+    spy.synchronizeQueueWithPrimary(primary, internalCache);
+    verify(spy, never()).runSynchronizationWithPrimary(primary, internalCache);
+  }
+
+  @Test
+  public void doNotRunSynchronizationWithPrimaryIfSynchronizationIsInProgress() {
+    HARegionQueue spy = spy(haRegionQueue);
+    spy.synchronizeWithPrimaryInProgress.set(true);
+
+    spy.synchronizeQueueWithPrimary(primary, internalCache);
+    verify(spy, never()).runSynchronizationWithPrimary(primary, internalCache);
+  }
+
+  @Test
+  public void doNotRunSynchronizationWithPrimaryIfGIINotFinished() {
+    HARegionQueue spy = spy(haRegionQueue);
+
+    spy.synchronizeQueueWithPrimary(primary, internalCache);
+    verify(spy, never()).runSynchronizationWithPrimary(primary, internalCache);
+  }
+
+  @Test
+  public void doNotRunSynchronizationWithPrimaryIfPrimaryHasOlderThanGEODE_1_14_0Version() {
+    HARegionQueue spy = spy(haRegionQueue);
+    spy.doneGIIQueueing.set(true);
+    when(primary.getVersionOrdinal()).thenReturn((short) (KnownVersion.GEODE_1_14_0.ordinal() - 1));
+
+    spy.synchronizeQueueWithPrimary(primary, internalCache);
+    verify(spy, never()).runSynchronizationWithPrimary(primary, internalCache);
+  }
+
+  @Test
+  public void runSynchronizationWithPrimaryIfPrimaryIsGEODE_1_14_0Version() {
+    HARegionQueue spy = spy(haRegionQueue);
+    spy.doneGIIQueueing.set(true);
+    when(primary.getVersionOrdinal()).thenReturn(KnownVersion.GEODE_1_14_0.ordinal());
+    doNothing().when(spy).runSynchronizationWithPrimary(primary, internalCache);
+
+    spy.synchronizeQueueWithPrimary(primary, internalCache);
+    verify(spy).runSynchronizationWithPrimary(primary, internalCache);
+  }
+
+  @Test
+  public void runSynchronizationWithPrimaryIfPrimaryIsLaterThanGEODE_1_14_0Version() {
+    HARegionQueue spy = spy(haRegionQueue);
+    spy.doneGIIQueueing.set(true);
+    when(primary.getVersionOrdinal()).thenReturn((short) (KnownVersion.GEODE_1_14_0.ordinal() + 1));
+    doNothing().when(spy).runSynchronizationWithPrimary(primary, internalCache);
+
+    spy.synchronizeQueueWithPrimary(primary, internalCache);
+    verify(spy).runSynchronizationWithPrimary(primary, internalCache);
+  }
+
+  @Test
+  public void getGIIEventsReturnsCorrectEvents() {
+    HARegionQueue spy = spy(haRegionQueue);
+    List<EventID> giiEvents;
+    spy.positionBeforeGII = 1;
+    spy.positionAfterGII = 4;
+    HAEventWrapper wrapper1 = mock(HAEventWrapper.class);
+    HAEventWrapper wrapper2 = mock(HAEventWrapper.class);
+    when(wrapper1.getEventId()).thenReturn(id1);
+    when(wrapper2.getEventId()).thenReturn(id2);
+    Region.Entry entry1 = mock(Region.Entry.class);
+    Region.Entry entry2 = mock(Region.Entry.class);
+    Region.Entry entry3 = mock(Region.Entry.class);
+    when(entry1.getValue()).thenReturn(wrapper1);
+    when(entry3.getValue()).thenReturn(wrapper2);
+    when(entry2.getValue()).thenReturn(id3);

Review comment:
       The numbering of things is a little confusing here. Is there a reason that `entry3` returns `wrapper2` and `entry2` returns `id3`?
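
       A purely illustrative sketch (the variable names here are hypothetical) of one way to keep the numbering aligned with the HARegion positions being stubbed, so the skipped non-wrapper value and the missing entry stand out:

```java
// Sketch only: entry variable names follow the HARegion positions they stub.
Region.Entry entryAtPosition1 = mock(Region.Entry.class);
Region.Entry entryAtPosition2 = mock(Region.Entry.class);
Region.Entry entryAtPosition4 = mock(Region.Entry.class);

when(entryAtPosition1.getValue()).thenReturn(wrapper1); // HAEventWrapper carrying id1
when(entryAtPosition2.getValue()).thenReturn(id3);      // not an HAEventWrapper, so it is skipped
when(entryAtPosition4.getValue()).thenReturn(wrapper2); // HAEventWrapper carrying id2

when(haRegion.getEntry(1L)).thenReturn(entryAtPosition1);
when(haRegion.getEntry(2L)).thenReturn(entryAtPosition2);
when(haRegion.getEntry(3L)).thenReturn(null);           // no entry at this position
when(haRegion.getEntry(4L)).thenReturn(entryAtPosition4);
```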

##########
File path: 
geode-core/src/test/java/org/apache/geode/internal/cache/ha/HARegionQueueTest.java
##########
@@ -183,4 +203,347 @@ public void isQueueInitializedWithWaitReturnsFalseIfNotInitializedAfterWait() th
 
     verify(spy).waitForInitialized(time);
   }
+
+  @Test
+  public void getDispatchedOrRemovedEventsReturnsRemovedEvents() {
+    HARegionQueue spy = spy(haRegionQueue);
+    List<EventID> removedEvents;
+    addEvents();
+    doReturn(false).when(spy).isRemoved(id1);
+    doReturn(true).when(spy).isRemoved(id2);
+    doReturn(true).when(spy).isRemoved(id3);
+    doReturn(false).when(spy).isRemoved(id4);
+
+    removedEvents = spy.getDispatchedOrRemovedEvents(eventIDS);
+
+    assertThat(removedEvents.size()).isEqualTo(2);
+    assertThat(removedEvents.contains(id2));
+    assertThat(removedEvents.contains(id3));
+  }
+
+  private void addEvents() {
+    eventIDS.add(id1);
+    eventIDS.add(id2);
+    eventIDS.add(id3);
+    eventIDS.add(id4);
+  }
+
+  @Test
+  public void isRemovedReturnsTrueIfDispatchedAndCurrentEventsAreRemoved() {
+    HARegionQueue spy = spy(haRegionQueue);
+    doReturn(null).when(spy).getDispatchedAndCurrentEvents(id1);
+
+    assertThat(spy.isRemoved(id1)).isTrue();
+  }
+
+  @Test
+  public void isRemovedReturnsFalseIfSequenceIdGreaterThanLastDispatched() {
+    HARegionQueue spy = spy(haRegionQueue);
+    when(id1.getSequenceID()).thenReturn(100L);
+    wrapper.lastDispatchedSequenceId = 99L;
+    doReturn(wrapper).when(spy).getDispatchedAndCurrentEvents(id1);
+
+    assertThat(spy.isRemoved(id1)).isFalse();
+  }
+
+  @Test
+  public void isRemovedReturnsTrueIfSequenceIdEqualsLastDispatched() {
+    HARegionQueue spy = spy(haRegionQueue);
+    when(id1.getSequenceID()).thenReturn(100L);
+    wrapper.lastDispatchedSequenceId = 100L;
+    doReturn(wrapper).when(spy).getDispatchedAndCurrentEvents(id1);
+
+    assertThat(spy.isRemoved(id1)).isTrue();
+  }
+
+  @Test
+  public void isRemovedReturnsTrueIfSequenceIdLessThanLastDispatched() {
+    HARegionQueue spy = spy(haRegionQueue);
+    when(id1.getSequenceID()).thenReturn(90L);
+    wrapper.lastDispatchedSequenceId = 100L;
+    doReturn(wrapper).when(spy).getDispatchedAndCurrentEvents(id1);
+
+    assertThat(spy.isRemoved(id1)).isTrue();
+  }
+
+  @Test
+  public void doNotRunSynchronizationWithPrimaryIfHasDoneSynchronization() {
+    HARegionQueue spy = spy(haRegionQueue);
+    spy.hasSynchronizedWithPrimary.set(true);
+
+    spy.synchronizeQueueWithPrimary(primary, internalCache);
+    verify(spy, never()).runSynchronizationWithPrimary(primary, internalCache);
+  }
+
+  @Test
+  public void doNotRunSynchronizationWithPrimaryIfSynchronizationIsInProgress() {
+    HARegionQueue spy = spy(haRegionQueue);
+    spy.synchronizeWithPrimaryInProgress.set(true);
+
+    spy.synchronizeQueueWithPrimary(primary, internalCache);
+    verify(spy, never()).runSynchronizationWithPrimary(primary, internalCache);
+  }
+
+  @Test
+  public void doNotRunSynchronizationWithPrimaryIfGIINotFinished() {
+    HARegionQueue spy = spy(haRegionQueue);
+
+    spy.synchronizeQueueWithPrimary(primary, internalCache);
+    verify(spy, never()).runSynchronizationWithPrimary(primary, internalCache);
+  }
+
+  @Test
+  public void doNotRunSynchronizationWithPrimaryIfPrimaryHasOlderThanGEODE_1_14_0Version() {
+    HARegionQueue spy = spy(haRegionQueue);
+    spy.doneGIIQueueing.set(true);
+    when(primary.getVersionOrdinal()).thenReturn((short) (KnownVersion.GEODE_1_14_0.ordinal() - 1));
+
+    spy.synchronizeQueueWithPrimary(primary, internalCache);
+    verify(spy, never()).runSynchronizationWithPrimary(primary, internalCache);
+  }
+
+  @Test
+  public void runSynchronizationWithPrimaryIfPrimaryIsGEODE_1_14_0Version() {
+    HARegionQueue spy = spy(haRegionQueue);
+    spy.doneGIIQueueing.set(true);
+    when(primary.getVersionOrdinal()).thenReturn(KnownVersion.GEODE_1_14_0.ordinal());
+    doNothing().when(spy).runSynchronizationWithPrimary(primary, internalCache);
+
+    spy.synchronizeQueueWithPrimary(primary, internalCache);
+    verify(spy).runSynchronizationWithPrimary(primary, internalCache);
+  }
+
+  @Test
+  public void runSynchronizationWithPrimaryIfPrimaryIsLaterThanGEODE_1_14_0Version() {
+    HARegionQueue spy = spy(haRegionQueue);
+    spy.doneGIIQueueing.set(true);
+    when(primary.getVersionOrdinal()).thenReturn((short) (KnownVersion.GEODE_1_14_0.ordinal() + 1));
+    doNothing().when(spy).runSynchronizationWithPrimary(primary, internalCache);
+
+    spy.synchronizeQueueWithPrimary(primary, internalCache);
+    verify(spy).runSynchronizationWithPrimary(primary, internalCache);
+  }
+
+  @Test
+  public void getGIIEventsReturnsCorrectEvents() {
+    HARegionQueue spy = spy(haRegionQueue);
+    List<EventID> giiEvents;
+    spy.positionBeforeGII = 1;
+    spy.positionAfterGII = 4;
+    HAEventWrapper wrapper1 = mock(HAEventWrapper.class);
+    HAEventWrapper wrapper2 = mock(HAEventWrapper.class);
+    when(wrapper1.getEventId()).thenReturn(id1);
+    when(wrapper2.getEventId()).thenReturn(id2);
+    Region.Entry entry1 = mock(Region.Entry.class);
+    Region.Entry entry2 = mock(Region.Entry.class);
+    Region.Entry entry3 = mock(Region.Entry.class);
+    when(entry1.getValue()).thenReturn(wrapper1);
+    when(entry3.getValue()).thenReturn(wrapper2);
+    when(entry2.getValue()).thenReturn(id3);
+
+    when(haRegion.getEntry(1L)).thenReturn(entry1);
+    when(haRegion.getEntry(2L)).thenReturn(entry2);
+    when(haRegion.getEntry(3L)).thenReturn(null);
+    when(haRegion.getEntry(4L)).thenReturn(entry3);
+
+    giiEvents = spy.getGIIEvents();
+
+    assertThat(giiEvents.size()).isEqualTo(2);
+    assertThat(giiEvents.contains(id1));
+    assertThat(giiEvents.contains(id2));

Review comment:
       These three assertions can be simplified to `assertThat(giiEvents).containsExactlyInAnyOrder(id1, id2);`
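
       In context, the simplified tail of the test might read as below (a sketch only). Worth noting that the original `assertThat(giiEvents.contains(id1))` style calls never invoke `isTrue()`, so on their own they do not fail when an id is missing; the suggested form does.

```java
giiEvents = spy.getGIIEvents();

// Single assertion that checks both size and membership, and actually
// fails when an expected id is absent.
assertThat(giiEvents).containsExactlyInAnyOrder(id1, id2);
```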




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


> A stray event can resurrect on a client due to a race condition
> ---------------------------------------------------------------
>
>                 Key: GEODE-8465
>                 URL: https://issues.apache.org/jira/browse/GEODE-8465
>             Project: Geode
>          Issue Type: Bug
>          Components: client queues
>    Affects Versions: 1.1.0
>            Reporter: Eric Shu
>            Assignee: Eric Shu
>            Priority: Major
>              Labels: caching-applications, pull-request-available
>
> A member hosting the primary queue could send the QRM for an event before it 
> removes the event from its HARegionQueue. This creates a race in which a newly 
> started member can GII the event from the member holding the primary queue 
> after that member has already sent the QRM for the event to the existing 
> members holding secondary queues.
> The following analysis shows that the race exists.
> {noformat}
> The stray create of Object_5234 on edge 10634 shows this issue with this cause.
> From edge 10634, we know the stray create is for v3 of the object and from 
> bridge 15141.
> [fine 2020/06/17 16:38:49.621 PDT edgegemfire_2_1_host1_10634 <Cache Client 
> Updater Thread on 
> rs-GEM-2944-1535-hydra-client-1(bridgegemfire_2_1_host1_15141:15141)<ec><v35>:41007(version:UNKNOWN[ordinal=125])
>  port 26905> tid=0x541] VersionedThinRegionEntryHeapStringKey2@1b56c386 
> (key=Object_5234; rawValue=VMCachedDeserializable@526996532; version=
> {v3; rv28; 
> mbr=rs-GEM-2944-1535-hydra-client-1(bridgegemfire_2_1_host1_18750)<ec><v27>:41007;
>  ds=2; time=1592434556093}
> ;member=rs-GEM-2944-1535-hydra-client-1(bridgegemfire_2_1_host1_18750)<ec><v27>:41007)
>  dispatching event 
> EntryEventImpl[op=CREATE;region=/DefaultRegion;key=Object_5234;oldValue=null;newValue=VMCachedDeserializable@526996532;callbackArg=Update
>  event originated in pid 
> 10655;originRemote=true;originMember=rs-GEM-2944-1535-hydra-client-1(edgegemfire_2_2_host1_10655:loner):60216:b0d873c4:edgegemfire_2_2_host1_10655;version=
> {v3; rv28; 
> mbr=rs-GEM-2944-1535-hydra-client-1(bridgegemfire_2_1_host1_18750)<ec><v27>:41007;
>  ds=2; time=1592434556093; remote}
> ;id=EventIDid=52bytes;threadID=52;sequenceID=108];isFromServer]
> [fine 2020/06/17 16:38:49.621 PDT edgegemfire_2_1_host1_10634 <Cache Client 
> Updater Thread on 
> rs-GEM-2944-1535-hydra-client-1(bridgegemfire_2_1_host1_15141:15141)<ec><v35>:41007(version:UNKNOWN[ordinal=125])
>  port 26905> tid=0x541] Put entry for region: /DefaultRegion key: Object_5234 
> callbackArgument: Update event originated in pid 10655
> ***This version (v3) is an update. The primary queue at the time is 
> bridgegemfire_2_2_host1_10538. The event is enqueued and dispatched ***
> [fine 2020/06/17 15:55:56.098 PDT bridgegemfire_2_2_host1_10538 <P2P message 
> reader for 
> rs-GEM-2944-1535-hydra-client-1(bridgegemfire_2_1_host1_18750:18750)<ec><v27>:41007(version:UNKNOWN[ordinal=125])
>  unshared ordered uid=358 dom #1 port=43914> tid=0x16d] DefaultRegion: 
> notifying 1 bridge servers of event: 
> EntryEventImpl[op=UPDATE;region=/DefaultRegion;key=Object_5234;oldValue=VMCachedDeserializable@1538889434;newValue=VMCachedDeserializable@2141181294;callbackArg=Update
>  event originated in pid 
> 10655;originRemote=true;originMember=rs-GEM-2944-1535-hydra-client-1(bridgegemfire_2_1_host1_18750:18750)<ec><v27>:41007(version:UNKNOWN[ordinal=125]);version=
> {v3; rv28; 
> mbr=rs-GEM-2944-1535-hydra-client-1(bridgegemfire_2_1_host1_18750:18750)<ec><v27>:41007(version:UNKNOWN[ordinal=125]);
>  ds=2; time=1592434556093; remote}
> ;context=identity(rs-GEM-2944-1535-hydra-client-1(edgegemfire_2_2_host1_10655:10655:loner):60216:b0d873c4:edgegemfire_2_2_host1_10655,connection=1;id=EventIDid=52bytes;threadID=52;sequenceID=108];routing=]
> [fine 2020/06/17 15:55:56.098 PDT bridgegemfire_2_2_host1_10538 <P2P message 
> reader for 
> rs-GEM-2944-1535-hydra-client-1(bridgegemfire_2_1_host1_18750:18750)<ec><v27>:41007(version:UNKNOWN[ordinal=125])
>  unshared ordered uid=358 dom #1 port=43914> tid=0x16d] 
> VMStatsRegionEntryHeapLongKey@38ccc0e5 (key=3867; 
> rawValue=HAEventWrapper[refCount=2; 
> msg=ClientUpdateMessageImpl[op=AFTER_UPDATE;region=/DefaultRegion;key=Object_5234;isObject=1;cbArg=Update
>  event originated in pid 
> 10655;memberId=identity(rs-GEM-2944-1535-hydra-client-1(edgegemfire_2_2_host1_10655:10655:loner):60216:b0d873c4:edgegemfire_2_2_host1_10655,connection=1;eventId=EventIDid=52bytes;threadID=52;sequenceID=108];shouldConflate=false;versionTag=
> {v3; rv28; 
> mbr=rs-GEM-2944-1535-hydra-client-1(bridgegemfire_2_1_host1_18750:18750)<ec><v27>:41007(version:UNKNOWN[ordinal=125]);
>  ds=2; time=1592434556093; remote}
> ;hasCqs=false]]) dispatching event 
> EntryEventImpl[op=CREATE;region=/_gfe_non_durable_client_with_id_rs-GEM-2944-1535-hydra-client-1(edgegemfire_2_1_host1_10634:10634:loner):51062:4bd673c4:edgegemfire_2_1_host1_10634_1_queue;key=3867;oldValue=null;newValue=HAEventWrapper[refCount=2;
>  
> msg=ClientUpdateMessageImpl[op=AFTER_UPDATE;region=/DefaultRegion;key=Object_5234;isObject=1;cbArg=Update
>  event originated in pid 
> 10655;memberId=identity(rs-GEM-2944-1535-hydra-client-1(edgegemfire_2_2_host1_10655:10655:loner):60216:b0d873c4:edgegemfire_2_2_host1_10655,connection=1;eventId=EventIDid=52bytes;threadID=52;sequenceID=108];shouldConflate=false;versionTag=
> {v3; rv28; 
> mbr=rs-GEM-2944-1535-hydra-client-1(bridgegemfire_2_1_host1_18750:18750)<ec><v27>:41007(version:UNKNOWN[ordinal=125]);
>  ds=2; time=1592434556093; remote}
> ;hasCqs=false]];callbackArg=null;originRemote=false;originMember=rs-GEM-2944-1535-hydra-client-1(bridgegemfire_2_2_host1_10538:10538)<ec><v2>:41006]
> ***QRM is sent to the 2 peers at the time with the region ***
> [fine 2020/06/17 15:55:56.213 PDT bridgegemfire_2_2_host1_10538 <Queue 
> Removal Thread> tid=0x83] Sending 
> (QueueRemovalMessage[_gfe_non_durable_client_with_id_rs-GEM-2944-1535-hydra-client-1(edgegemfire_2_1_host1_10634:10634:loner):51062:4bd673c4:edgegemfire_2_1_host1_10634_1_queue,
>  1, EventIDid=52bytes;threadID=52;sequenceID=108], 
> _gfe_non_durable_client_with_id_rs-GEM-2944-1535-hydra-client-1(edgegemfire_2_3_host1_10666:10666:loner):60462:26db73c4:edgegemfire_2_3_host1_10666_1_queue,
>  1, EventIDid=52bytes;threadID=52;sequenceID=108]]) to 2 peers 
> ([rs-GEM-2944-1535-hydra-client-1(bridgegemfire_2_1_host1_18750:18750)<ec><v27>:41007(version:UNKNOWN[ordinal=125])@277,
>  
> rs-GEM-2944-1535-hydra-client-1(bridgegemfire_2_4_host1_23574:23574)<ec><v31>:41011(version:UNKNOWN[ordinal=125])@566])
>  via tcp/ip
> ***the newly started bridgegemfire_2_3_24320 creates the HARegionQueue and 
> gii from the member with primary queue (10538) ***
> [info 2020/06/17 15:56:08.800 PDT <Client Queue Initialization Thread 1> 
> tid=0x76] Initializing region 
> _gfe_non_durable_client_with_id_rs-GEM-2944-1535-hydra-client-1(edgegemfire_2_1_host1_10634:10634:loner):51062:4bd673c4:edgegemfire_2_1_host1_10634_1_queue
> [info 2020/06/17 15:56:09.285 PDT <Client Queue Initialization Thread 1> 
> tid=0x76] 
> _gfe_non_durable_client_with_id_rs-GEM-2944-1535-hydra-client-1(edgegemfire_2_1_host1_10634:10634:loner):51062:4bd673c4:edgegemfire_2_1_host1_10634_1_queue
>  is done getting image from 
> rs-GEM-2944-1535-hydra-client-1(bridgegemfire_2_2_host1_10538:10538)<ec><v2>:41006(version:GEODE
>  1.3.0). isDeltaGII is false
> [debug 2020/06/17 15:56:09.377 PDT <Client Queue Initialization Thread 1> 
> tid=0x76] adding haContainerKey to HARegion at 
> 1:HAEventWrapper[region=/DefaultRegion; key=Object_5234; refCount=1; 
> putInProgress=0; 
> event=EventID[rs-GEM-2944-1535-hydra-client-1(edgegemfire_2_2_host1_10655:loner):60216:b0d873c4:edgegemfire_2_2_host1_10655;threadID=52;sequenceID=108];
>  no message] for 
> _gfe_non_durable_client_with_id_rs-GEM-2944-1535-hydra-client-1(edgegemfire_2_1_host1_10634:10634:loner):51062:4bd673c4:edgegemfire_2_1_host1_10634_1_queue
> [debug 2020/06/17 15:56:09.377 PDT <Client Queue Initialization Thread 1> 
> tid=0x76] invoking listeners: 
> [org.apache.geode.internal.cache.ha.HARegionQueue$1@2e69472b]
> [debug 2020/06/17 15:56:09.377 PDT <Client Queue Initialization Thread 1> 
> tid=0x76] Adding position 1 to available IDs. Region: 
> _gfe_non_durable_client_with_id_rs-GEM-2944-1535-hydra-client-1(edgegemfire_2_1_host1_10634:10634:loner):51062:4bd673c4:edgegemfire_2_1_host1_10634_1_queue
> [debug 2020/06/17 15:56:09.377 PDT <Client Queue Initialization Thread 1> 
> tid=0x76] RegionQueue on 
> _gfe_non_durable_client_with_id_rs-GEM-2944-1535-hydra-client-1(edgegemfire_2_1_host1_10634:10634:loner):51062:4bd673c4:edgegemfire_2_1_host1_10634_1_queue(backup)
>  done putting GII data into queue
> ***Please note that the event was being removed from its primary queue after 
> QRM has been sent and bridge 24320 gii the events from it ***
> [fine 2020/06/17 15:56:09.666 PDT bridgegemfire_2_2_host1_10538 <Client 
> Message Dispatcher for 
> rs-GEM-2944-1535-hydra-client-1(edgegemfire_2_1_host1_10634:10634:loner):51062:4bd673c4:edgegemfire_2_1_host1_10634>
>  tid=0x84] VMStatsRegionEntryHeapLongKey@38ccc0e5 (key=3867; 
> rawValue=REMOVED_PHASE2) dispatching event 
> EntryEventImpl[op=LOCAL_DESTROY;region=/_gfe_non_durable_client_with_id_rs-GEM-2944-1535-hydra-client-1(edgegemfire_2_1_host1_10634:10634:loner):51062:4bd673c4:edgegemfire_2_1_host1_10634_1_queue;key=3867;oldValue=HAEventWrapper[region=/DefaultRegion;
>  key=Object_5234; refCount=1; inContainer=true; putInProgress=false; 
> event=EventIDid=52bytes;threadID=52;sequenceID=108]; no 
> message];newValue=null;callbackArg=null;originRemote=false;originMember=rs-GEM-2944-1535-hydra-client-1(bridgegemfire_2_2_host1_10538:10538)<ec><v2>:41006]
> ***Bridge 24320 now has the event for Object_5234(v3) that has been sent to 
> the client and won't get QRM again ***
> ***Later, bridge 15141 gii from 24320 and got the event ***
> [info 2020/06/17 16:17:36.454 PDT <Client Queue Initialization Thread 2> 
> tid=0x78] Initializing region 
> _gfe_non_durable_client_with_id_rs-GEM-2944-1535-hydra-client-1(edgegemfire_2_1_host1_10634:10634:loner):51062:4bd673c4:edgegemfire_2_1_host1_10634_1_queue
> [info 2020/06/17 16:17:40.187 PDT <Client Queue Initialization Thread 2> 
> tid=0x78] 
> _gfe_non_durable_client_with_id_rs-GEM-2944-1535-hydra-client-1(edgegemfire_2_1_host1_10634:10634:loner):51062:4bd673c4:edgegemfire_2_1_host1_10634_1_queue
>  is done getting image from 
> rs-GEM-2944-1535-hydra-client-1(bridgegemfire_2_3_host1_24320:24320)<ec><v33>:41010.
>  isDeltaGII is false
> [debug 2020/06/17 16:17:40.415 PDT <Client Queue Initialization Thread 2> 
> tid=0x78] 
> _gfe_non_durable_client_with_id_rs-GEM-2944-1535-hydra-client-1(edgegemfire_2_1_host1_10634:10634:loner):51062:4bd673c4:edgegemfire_2_1_host1_10634_1_queue
>  processing queue key 1 and value HAEventWrapper[region=/DefaultRegion; 
> key=Object_5234; refCount=1; putInProgress=0; 
> event=EventID[rs-GEM-2944-1535-hydra-client-1(edgegemfire_2_2_host1_10655:loner):60216:b0d873c4:edgegemfire_2_2_host1_10655;threadID=52;sequenceID=108];
>  no message]
> ***It will send the event to client after it becomes the primary queue holder 
> for client 10634 ***
> [debug 2020/06/17 16:38:49.125 PDT <ServerConnection on port 26905 Thread 5> 
> tid=0xb4] 
> CacheClientProxy[identity(rs-GEM-2944-1535-hydra-client-1(edgegemfire_2_1_host1_10634:10634:loner):51062:4bd673c4:edgegemfire_2_1_host1_10634,connection=1;
>  port=53006; primary=true; version=GEODE 1.3.0]: Queueing marker message. 
> <org.apache.geode.internal.cache.tier.sockets.ClientMarkerMessageImpl@75bf95c8>.
>  The queue contains 57 entries.
> {noformat}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
