This is an automated email from the ASF dual-hosted git repository.
eshu11 pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new 8a7b1fc GEODE-8465: secondary HARegionQueue to sync with primary
queue (#5496)
8a7b1fc is described below
commit 8a7b1fc7bc91d772d51282ea527c7e58711569c1
Author: Eric Shu <[email protected]>
AuthorDate: Wed Sep 9 11:39:44 2020 -0700
GEODE-8465: secondary HARegionQueue to sync with primary queue (#5496)
A secondary HARegionQueue will try to sync with the primary after GII
to remove any events that have already been dispatched from the primary.
---
.../codeAnalysis/sanctionedDataSerializables.txt | 8 +
.../org/apache/geode/internal/DSFIDFactory.java | 5 +
.../geode/internal/cache/ha/HARegionQueue.java | 219 +++++++++++-
.../internal/cache/ha/QueueRemovalMessage.java | 4 +
.../cache/ha/QueueSynchronizationProcessor.java | 252 +++++++++++++
.../geode/internal/cache/ha/HARegionQueueTest.java | 391 ++++++++++++++++++++-
.../internal/cache/ha/QueueRemovalMessageTest.java | 21 ++
.../ha/QueueSynchronizationProcessorTest.java | 142 ++++++++
.../serialization/DataSerializableFixedID.java | 5 +-
9 files changed, 1029 insertions(+), 18 deletions(-)
diff --git
a/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt
b/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt
index 733737e..cbc995d 100644
---
a/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt
+++
b/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt
@@ -1308,6 +1308,14 @@ org/apache/geode/internal/cache/ha/QueueRemovalMessage,2
fromData,119
toData,125
+org/apache/geode/internal/cache/ha/QueueSynchronizationProcessor$QueueSynchronizationMessage,2
+fromData,77
+toData,86
+
+org/apache/geode/internal/cache/ha/QueueSynchronizationProcessor$QueueSynchronizationReplyMessage,2
+fromData,76
+toData,80
+
org/apache/geode/internal/cache/ha/ThreadIdentifier,2
fromData,19
toData,19
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/DSFIDFactory.java
b/geode-core/src/main/java/org/apache/geode/internal/DSFIDFactory.java
index bd613db..504e7d1 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/DSFIDFactory.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/DSFIDFactory.java
@@ -274,6 +274,7 @@ import
org.apache.geode.internal.cache.control.SerializableRegionRedundancyStatu
import
org.apache.geode.internal.cache.control.SerializableRestoreRedundancyResultsImpl;
import
org.apache.geode.internal.cache.ha.HARegionQueue.DispatchedAndCurrentEvents;
import org.apache.geode.internal.cache.ha.QueueRemovalMessage;
+import org.apache.geode.internal.cache.ha.QueueSynchronizationProcessor;
import org.apache.geode.internal.cache.locks.TXLockBatch;
import org.apache.geode.internal.cache.locks.TXLockIdImpl;
import org.apache.geode.internal.cache.locks.TXLockUpdateParticipantsMessage;
@@ -581,6 +582,10 @@ public class DSFIDFactory implements
DataSerializableFixedID {
serializer.registerDSFID(ADD_CACHESERVER_PROFILE_UPDATE,
AddCacheServerProfileMessage.class);
serializer.registerDSFID(REMOVE_CACHESERVER_PROFILE_UPDATE,
RemoveCacheServerProfileMessage.class);
+ serializer.registerDSFID(QUEUE_SYNCHRONIZATION_MESSAGE,
+ QueueSynchronizationProcessor.QueueSynchronizationMessage.class);
+ serializer.registerDSFID(QUEUE_SYNCHRONIZATION_REPLY_MESSAGE,
+ QueueSynchronizationProcessor.QueueSynchronizationReplyMessage.class);
serializer.registerDSFID(SERVER_INTEREST_REGISTRATION_MESSAGE,
ServerInterestRegistrationMessage.class);
serializer.registerDSFID(FILTER_PROFILE_UPDATE,
FilterProfile.OperationMessage.class);
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/ha/HARegionQueue.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/ha/HARegionQueue.java
index c2f4a42..e2c7dc5 100755
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/ha/HARegionQueue.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/ha/HARegionQueue.java
@@ -24,6 +24,7 @@ import java.io.DataOutput;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.ConcurrentModificationException;
import java.util.HashMap;
@@ -42,8 +43,11 @@ import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
import org.apache.logging.log4j.Logger;
@@ -80,6 +84,7 @@ import
org.apache.geode.distributed.internal.DistributionManager;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
import
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.Assert;
+import org.apache.geode.internal.SystemTimer;
import org.apache.geode.internal.cache.CacheServerImpl;
import org.apache.geode.internal.cache.CachedDeserializable;
import org.apache.geode.internal.cache.Conflatable;
@@ -204,7 +209,7 @@ public class HARegionQueue implements RegionQueue {
* via GII , then there should not be any violation. If there is data
arriving thru GII, such
* violations can be expected , but should be analyzed thoroughly.
*/
- protected boolean puttingGIIDataInQueue;
+ protected final AtomicBoolean puttingGIIDataInQueue = new AtomicBoolean();
/**
* flag indicating whether interest has been registered for this queue. This
is used to prevent
@@ -321,6 +326,13 @@ public class HARegionQueue implements RegionQueue {
*/
protected long maxQueueSizeHitCount = 0;
+ final AtomicBoolean hasSynchronizedWithPrimary = new AtomicBoolean();
+ final AtomicBoolean scheduleSynchronizationWithPrimaryInProgress = new
AtomicBoolean();
+ final AtomicBoolean doneGIIQueueing = new AtomicBoolean();
+ volatile long doneGIIQueueingTime;
+ volatile long positionBeforeGII = 0;
+ volatile long positionAfterGII = 0;
+
/**
* Processes the given string and returns a string which is allowed for
region names
*
@@ -488,7 +500,7 @@ public class HARegionQueue implements RegionQueue {
// data, then the relevant data structures have to
// be populated
if (!entrySet.isEmpty()) {
- this.puttingGIIDataInQueue = true;
+ puttingGIIDataInQueue.set(true);
final boolean isDebugEnabled = logger.isDebugEnabled();
try {
Region.Entry entry = null;
@@ -533,7 +545,13 @@ public class HARegionQueue implements RegionQueue {
}
this.tailKey.set(max);
} finally {
- this.puttingGIIDataInQueue = false;
+ puttingGIIDataInQueue.set(false);
+ positionAfterGII = tailKey.get();
+ if (positionAfterGII > 0) {
+ positionBeforeGII = 1;
+ }
+ doneGIIQueueing.set(true);
+ doneGIIQueueingTime = System.currentTimeMillis();
if (isDebugEnabled) {
logger.debug("{} done putting GII data into queue", this);
}
@@ -671,14 +689,14 @@ public class HARegionQueue implements RegionQueue {
long sequenceID = eventId.getSequenceID();
// Check from Events Map if the put operation should proceed or not
DispatchedAndCurrentEvents dace = (DispatchedAndCurrentEvents)
this.eventsMap.get(ti);
- if (dace != null && dace.isGIIDace && this.puttingGIIDataInQueue) {
+ if (dace != null && dace.isGIIDace && puttingGIIDataInQueue.get()) {
// we only need to retain DACE for which there are no entries in the
queue.
// for other thread identifiers we build up a new DACE
dace = null;
}
if (dace != null) {
// check the last dispatched sequence Id
- if (this.puttingGIIDataInQueue || (sequenceID >
dace.lastDispatchedSequenceId)) {
+ if (puttingGIIDataInQueue.get() || (sequenceID >
dace.lastDispatchedSequenceId)) {
// Asif:Insert the Event into the Region with proper locking.
// It is possible that by the time put operation proceeds , the
// Last dispatched id has changed so it is possible that the object at
@@ -686,7 +704,7 @@ public class HARegionQueue implements RegionQueue {
// also does not get added
if (!dace.putObject(event, sequenceID)) {
// dace encountered a DESTROYED token - stop adding GII data
- if (!this.puttingGIIDataInQueue) {
+ if (!puttingGIIDataInQueue.get()) {
this.put(object);
}
} else {
@@ -1689,8 +1707,7 @@ public class HARegionQueue implements RegionQueue {
/**
- * Used for testing purposes only. Returns the set of current counters for
the given
- * ThreadIdentifier
+ * Returns the set of current counters for the given ThreadIdentifier
*
* @param id - the EventID object
* @return - the current counters set
@@ -2962,7 +2979,7 @@ public class HARegionQueue implements RegionQueue {
Long oldPosition = null;
final boolean isDebugEnabled_BS =
logger.isTraceEnabled(LogMarker.BRIDGE_SERVER_VERBOSE);
if (isDebugEnabled_BS && this.lastSequenceIDPut >= sequenceID
- && !owningQueue.puttingGIIDataInQueue) {
+ && !owningQueue.puttingGIIDataInQueue.get()) {
logger.trace(LogMarker.BRIDGE_SERVER_VERBOSE,
"HARegionQueue::DACE:putObject: Given sequence ID is already
present ({}).\nThis may be a recovered operation via P2P or a
GetInitialImage.\nlastSequenceIDPut = {} ; event = {};\n",
sequenceID, lastSequenceIDPut, event);
@@ -2977,7 +2994,7 @@ public class HARegionQueue implements RegionQueue {
logger.trace("HARegionQueue.putObject: adding {}", event);
}
this.lastSequenceIDPut = sequenceID;
- } else if (!owningQueue.puttingGIIDataInQueue) {
+ } else if (!owningQueue.puttingGIIDataInQueue.get()) {
if (isDebugEnabled_BS) {
logger.trace(LogMarker.BRIDGE_SERVER_VERBOSE,
"{} eliding event with ID {}, because it is not greater than
the last sequence ID ({}). The rejected event has key <{}> and value <{}>",
@@ -2999,7 +3016,7 @@ public class HARegionQueue implements RegionQueue {
return false;
}
- if (sequenceID > lastDispatchedSequenceId ||
owningQueue.puttingGIIDataInQueue) {
+ if (sequenceID > lastDispatchedSequenceId ||
owningQueue.puttingGIIDataInQueue.get()) {
// Insert the object into the Region
Long position = owningQueue.tailKey.incrementAndGet();
@@ -3720,7 +3737,7 @@ public class HARegionQueue implements RegionQueue {
if (logger.isDebugEnabled()) {
logger.debug(caller + " decremented Event ID hash code: " +
haContainerKey.hashCode()
+ "; System ID hash code: " +
System.identityHashCode(haContainerKey)
- + "; Wrapper details: " + haContainerKey);
+ + "; Wrapper details: " + haContainerKey + "; for queue: " +
regionName);
}
if (haContainerKey.decAndGetReferenceCount() == 0L) {
HARegionQueue.this.haContainer.remove(haContainerKey);
@@ -3943,4 +3960,182 @@ public class HARegionQueue implements RegionQueue {
public Queue getGiiQueue() {
return this.giiQueue;
}
+
+ List<EventID> getDispatchedEvents(List<EventID> eventIds) {
+ List<EventID> dispatchedEvents = new LinkedList<>();
+ for (EventID eventId : eventIds) {
+ if (isDispatched(eventId)) {
+ dispatchedEvents.add(eventId);
+ }
+ }
+ return dispatchedEvents;
+ }
+
+ boolean isDispatched(EventID eventId) {
+ DispatchedAndCurrentEvents wrapper =
getDispatchedAndCurrentEvents(eventId);
+ if (wrapper != null && eventId.getSequenceID() >
wrapper.lastDispatchedSequenceId) {
+ return false;
+ }
+ return true;
+ }
+
+ DispatchedAndCurrentEvents getDispatchedAndCurrentEvents(EventID eventId) {
+ ThreadIdentifier tid = getThreadIdentifier(eventId);
+ return (DispatchedAndCurrentEvents) eventsMap.get(tid);
+ }
+
+ public synchronized void
synchronizeQueueWithPrimary(InternalDistributedMember primary,
+ InternalCache cache) {
+ if (scheduleSynchronizationWithPrimaryInProgress.get() ||
hasSynchronizedWithPrimary.get()
+ || !doneGIIQueueing.get()) {
+ // Order of the check is important here as timer scheduled thread
+ // setting hasSynchronizedWithPrimary to true first before setting
+ // scheduleSynchronizationWithPrimaryInProgress to false in
doSynchronizationWithPrimary.
+ return;
+ }
+ if (primary.getVersionOrdinal() < KnownVersion.GEODE_1_14_0.ordinal()) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Don't send to primary with version older than
KnownVersion.GEODE_1_14_0");
+ }
+ return;
+ }
+ if (!hasSynchronizedWithPrimary.get() &&
!scheduleSynchronizationWithPrimaryInProgress.get()) {
+ scheduleSynchronizationWithPrimaryInProgress.set(true);
+ }
+ long delay = getDelay();
+ scheduleSynchronizationWithPrimary(primary, cache, delay);
+ }
+
+ long getDelay() {
+ // Wait for in-flight events during gii to be distributed to the member
with primary queue.
+ long waitTime = 15 * 1000;
+ long currentTime = getCurrentTime();
+ long elapsed = currentTime - doneGIIQueueingTime;
+ return elapsed < waitTime ? waitTime - elapsed : 0L;
+ }
+
+ long getCurrentTime() {
+ return System.currentTimeMillis();
+ }
+
+ synchronized void
scheduleSynchronizationWithPrimary(InternalDistributedMember primary,
+ InternalCache cache, long delay) {
+ cache.getCCPTimer().schedule(new SystemTimer.SystemTimerTask() {
+ @Override
+ public void run2() {
+ doSynchronizationWithPrimary(primary, cache);
+ }
+ }, delay);
+ }
+
+ synchronized void doSynchronizationWithPrimary(InternalDistributedMember
primary,
+ InternalCache cache) {
+ int maxChunkSize = 1000;
+ try {
+ List<EventID> giiEvents = getGIIEvents();
+ if (giiEvents.size() == 0) {
+ hasSynchronizedWithPrimary.set(true);
+ return;
+ }
+ Collection<List<EventID>> chunks = null;
+
+ if (giiEvents.size() > maxChunkSize) {
+ chunks = getChunks(giiEvents, maxChunkSize);
+ }
+
+ if (chunks == null) {
+ if (!removeDispatchedEvents(primary, cache, giiEvents)) {
+ return;
+ }
+ } else {
+ for (List<EventID> chunk : chunks) {
+ if (!removeDispatchedEvents(primary, cache, chunk)) {
+ return;
+ }
+ }
+ }
+ if (logger.isDebugEnabled()) {
+ logger.debug("HARegionQueue {} has synced with primary on {}",
regionName, primary);
+ }
+ hasSynchronizedWithPrimary.set(true);
+ } finally {
+ scheduleSynchronizationWithPrimaryInProgress.set(false);
+ }
+ }
+
+ Collection<List<EventID>> getChunks(List<EventID> events, int size) {
+ AtomicInteger counter = new AtomicInteger(0);
+ return events.stream().collect(Collectors.groupingBy(event ->
counter.getAndIncrement() / size))
+ .values();
+ }
+
+ boolean removeDispatchedEvents(InternalDistributedMember primary,
InternalCache cache,
+ List<EventID> chunkEvents) {
+ List<EventID> dispatchedEvents = getDispatchedEventsFromPrimary(primary,
cache, chunkEvents);
+
+ if (dispatchedEvents == null) {
+ // failed to get events from current primary, need to retry.
+ return false;
+ }
+
+ for (EventID id : dispatchedEvents) {
+ if (!removeDispatchedEventAfterSyncWithPrimary(id)) {
+ // failed to remove all dispatched events, need to retry
+ return false;
+ }
+ }
+ return true;
+ }
+
+ List<EventID> getDispatchedEventsFromPrimary(InternalDistributedMember
primary,
+ InternalCache cache, List<EventID> chunkEvents) {
+ return
QueueSynchronizationProcessor.getDispatchedEvents(cache.getDistributionManager(),
+ primary, regionName, chunkEvents);
+ }
+
+ List<EventID> getGIIEvents() {
+ List<EventID> events = new LinkedList<>();
+ for (long i = positionBeforeGII; i < positionAfterGII + 1; i++) {
+ Map.Entry<?, ?> entry = region.getEntry(i);
+ // could be already removed after processing QueueRemovalMessage
+ if (entry != null && entry.getValue() instanceof HAEventWrapper) {
+ HAEventWrapper wrapper = (HAEventWrapper) entry.getValue();
+ events.add(wrapper.getEventId());
+ }
+ }
+ return events;
+ }
+
+ boolean removeDispatchedEventAfterSyncWithPrimary(EventID id) {
+ boolean interrupted = Thread.interrupted();
+ try {
+ if (logger.isDebugEnabled()) {
+ logger.debug(
+ "After sync with primary for HARegionQueue {}, removing dispatched
event with ID {}",
+ regionName, id);
+ }
+ removeDispatchedEvents(id);
+ } catch (RegionDestroyedException ignore) {
+ logger.info(
+ "HARegionQueue {} was found to be destroyed when attempting to
remove dispatched event with ID {} after sync",
+ regionName, id);
+ } catch (CancelException ignore) {
+ return false;
+ } catch (CacheException e) {
+ logger.error(String.format(
+ "HARegionQueue %s encountered an exception when attempting to remove
event with ID %s from the queue",
+ regionName, id), e);
+ } catch (InterruptedException ignore) {
+ Thread.currentThread().interrupt();
+ return false;
+ } catch (RejectedExecutionException ignore) {
+ interrupted = true;
+ } finally {
+ if (interrupted) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ return true;
+ }
+
}
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/ha/QueueRemovalMessage.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/ha/QueueRemovalMessage.java
index f789411..4c967be 100755
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/ha/QueueRemovalMessage.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/ha/QueueRemovalMessage.java
@@ -98,6 +98,10 @@ public class QueueRemovalMessage extends
PooledDistributionMessage {
boolean succeed = processRegionQueue(iterator, regionName, size, hrq);
if (!succeed) {
return;
+ } else {
+ if (hrq != null) {
+ hrq.synchronizeQueueWithPrimary(getSender(), cache);
+ }
}
}
}
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/ha/QueueSynchronizationProcessor.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/ha/QueueSynchronizationProcessor.java
new file mode 100644
index 0000000..52cb741
--- /dev/null
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/ha/QueueSynchronizationProcessor.java
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
+ * or implied. See the License for the specific language governing permissions
and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.ha;
+
+import static org.apache.geode.util.internal.UncheckedUtils.uncheckedCast;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.logging.log4j.Logger;
+
+import org.apache.geode.DataSerializer;
+import org.apache.geode.distributed.internal.ClusterDistributionManager;
+import org.apache.geode.distributed.internal.DistributionManager;
+import org.apache.geode.distributed.internal.DistributionMessage;
+import org.apache.geode.distributed.internal.MessageWithReply;
+import org.apache.geode.distributed.internal.PooledDistributionMessage;
+import org.apache.geode.distributed.internal.ReplyException;
+import org.apache.geode.distributed.internal.ReplyMessage;
+import org.apache.geode.distributed.internal.ReplyProcessor21;
+import
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.cache.EventID;
+import org.apache.geode.internal.cache.HARegion;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.LocalRegion;
+import org.apache.geode.internal.serialization.DeserializationContext;
+import org.apache.geode.internal.serialization.SerializationContext;
+import org.apache.geode.logging.internal.log4j.api.LogService;
+
+/**
+ * Synchronize queue with primary holder
+ */
+public class QueueSynchronizationProcessor extends ReplyProcessor21 {
+ private static final Logger logger = LogService.getLogger();
+
+ QueueSynchronizationReplyMessage reply;
+
+ QueueSynchronizationProcessor(DistributionManager dm,
InternalDistributedMember primary) {
+ super(dm, primary);
+ }
+
+ static List<EventID> getDispatchedEvents(final DistributionManager dm,
+ final InternalDistributedMember primary, String regionName,
List<EventID> events) {
+ QueueSynchronizationProcessor processor = new
QueueSynchronizationProcessor(dm, primary);
+
+ QueueSynchronizationMessage message = new QueueSynchronizationMessage();
+ message.setEventIdList(events);
+ message.setProcessorId(processor.getProcessorId());
+ message.setRegionName(regionName);
+
+ message.setRecipient(primary);
+ dm.putOutgoing(message);
+
+ try {
+ processor.waitForRepliesUninterruptibly();
+ } catch (ReplyException e) {
+ e.handleCause();
+ }
+
+ if (processor.reply != null) {
+ return processor.reply.getDispatchedEvents();
+ }
+ // Failed to get reply.
+ return null;
+ }
+
+ @Override
+ public void process(DistributionMessage msg) {
+ try {
+ reply = (QueueSynchronizationReplyMessage) msg;
+ } finally {
+ super.process(msg);
+ }
+ }
+
+ // -------------------------------------------------------------------------
+ // QueueSynchronizationMessage
+ // This message is sent to the the primary queue holder after receiving
QueueRemovalMessage.
+ // It contains the list of eventIDs that has been GIIed but not yet removed.
+ // The primary queue holder will check and sent back a list of event ids
that has been dispatched.
+ // -------------------------------------------------------------------------
+ public static class QueueSynchronizationMessage extends
PooledDistributionMessage implements
+ MessageWithReply {
+ private List<EventID> eventIds;
+
+ private int processorId;
+ private String regionName;
+
+ public QueueSynchronizationMessage() {}
+
+ void setEventIdList(List<EventID> message) {
+ eventIds = message;
+ }
+
+ void setRegionName(String regionName) {
+ this.regionName = regionName;
+ }
+
+ void setProcessorId(int processorId) {
+ this.processorId = processorId;
+ }
+
+ @Override
+ protected void process(ClusterDistributionManager dm) {
+ final QueueSynchronizationReplyMessage replyMessage =
+ createQueueSynchronizationReplyMessage();
+ ReplyException replyException = null;
+
+ final InternalCache cache = dm.getCache();
+ try {
+ if (cache != null) {
+ List<EventID> dispatched = getDispatchedEvents(cache);
+ replyMessage.setEventIds(dispatched);
+ replyMessage.setSuccess();
+ }
+ } catch (RuntimeException | Error e) {
+ replyException = new ReplyException(e);
+ throw e;
+ } finally {
+ replyMessage.setProcessorId(processorId);
+ replyMessage.setRecipient(getSender());
+ if (replyException != null) {
+ replyMessage.setException(replyException);
+ }
+ dm.putOutgoing(replyMessage);
+ }
+ }
+
+ QueueSynchronizationReplyMessage createQueueSynchronizationReplyMessage() {
+ return new QueueSynchronizationReplyMessage();
+ }
+
+ List<EventID> getDispatchedEvents(InternalCache cache) {
+ LocalRegion region = (LocalRegion) cache.getRegion(regionName);
+ if (region == null) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("processing QueueSynchronizationMessage region {} does
not exist.",
+ regionName);
+ }
+ return null;
+ }
+ HARegionQueue haRegionQueue = ((HARegion) region).getOwner();
+ return haRegionQueue.getDispatchedEvents(eventIds);
+ }
+
+ @Override
+ public int getDSFID() {
+ return QUEUE_SYNCHRONIZATION_MESSAGE;
+ }
+
+ @Override
+ public int getProcessorId() {
+ return processorId;
+ }
+
+ @Override
+ public void toData(DataOutput out,
+ SerializationContext context) throws IOException {
+ super.toData(out, context);
+ DataSerializer.writeString(regionName, out);
+ DataSerializer.writeInteger(processorId, out);
+ int numberOfIds = eventIds.size();
+ DataSerializer.writeInteger(numberOfIds, out);
+ for (EventID eventId : eventIds) {
+ DataSerializer.writeObject(eventId, out);
+ }
+ }
+
+ @Override
+ public void fromData(DataInput in,
+ DeserializationContext context)
+ throws IOException, ClassNotFoundException {
+ super.fromData(in, context);
+ eventIds = new LinkedList<>();
+
+ regionName = DataSerializer.readString(in);
+ processorId = DataSerializer.readInteger(in);
+ int size = DataSerializer.readInteger(in);
+ for (int i = 0; i < size; i++) {
+ eventIds.add(uncheckedCast(DataSerializer.readObject(in)));
+ }
+ }
+ }
+
+ // -------------------------------------------------------------------------
+ // QueueSynchronizationReplyMessage
+ // -------------------------------------------------------------------------
+ public static class QueueSynchronizationReplyMessage extends ReplyMessage {
+ private List<EventID> events;
+ private boolean succeed = false;
+
+ public QueueSynchronizationReplyMessage() {}
+
+ void setEventIds(List<EventID> events) {
+ this.events = events;
+ }
+
+ List<EventID> getDispatchedEvents() {
+ return events;
+ }
+
+ void setSuccess() {
+ succeed = true;
+ }
+
+ @Override
+ public int getDSFID() {
+ return QUEUE_SYNCHRONIZATION_REPLY_MESSAGE;
+ }
+
+ @Override
+ public void toData(DataOutput out, SerializationContext context) throws
IOException {
+ super.toData(out, context);
+ DataSerializer.writeBoolean(succeed, out);
+ if (succeed) {
+ DataSerializer.writeInteger(events.size(), out);
+ for (EventID eventId : events) {
+ DataSerializer.writeObject(eventId, out);
+ }
+ }
+ }
+
+ @Override
+ public void fromData(DataInput in, DeserializationContext context)
+ throws IOException, ClassNotFoundException {
+ super.fromData(in, context);
+ succeed = DataSerializer.readBoolean(in);
+ if (succeed) {
+ events = new LinkedList<>();
+ int size = DataSerializer.readInteger(in);
+ for (int i = 0; i < size; i++) {
+ events.add(uncheckedCast(DataSerializer.readObject(in)));
+ }
+ }
+ }
+ }
+}
diff --git
a/geode-core/src/test/java/org/apache/geode/internal/cache/ha/HARegionQueueTest.java
b/geode-core/src/test/java/org/apache/geode/internal/cache/ha/HARegionQueueTest.java
index f490675..b671fd7 100644
---
a/geode-core/src/test/java/org/apache/geode/internal/cache/ha/HARegionQueueTest.java
+++
b/geode-core/src/test/java/org/apache/geode/internal/cache/ha/HARegionQueueTest.java
@@ -15,10 +15,12 @@
package org.apache.geode.internal.cache.ha;
import static junit.framework.TestCase.assertEquals;
+import static org.apache.geode.util.internal.UncheckedUtils.uncheckedCast;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
@@ -27,7 +29,10 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.io.IOException;
+import java.util.Collection;
import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import org.junit.Before;
@@ -35,25 +40,42 @@ import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.apache.geode.CancelCriterion;
+import org.apache.geode.cache.CacheClosedException;
+import org.apache.geode.cache.EntryNotFoundException;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionDestroyedException;
+import
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.cache.EventID;
import org.apache.geode.internal.cache.HARegion;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.tier.sockets.CacheClientNotifier;
import org.apache.geode.internal.cache.tier.sockets.ClientUpdateMessageImpl;
import org.apache.geode.internal.cache.tier.sockets.HAEventWrapper;
+import org.apache.geode.internal.serialization.KnownVersion;
import org.apache.geode.internal.statistics.StatisticsClock;
import
org.apache.geode.internal.util.concurrent.StoppableReentrantReadWriteLock;
import org.apache.geode.test.junit.categories.ClientSubscriptionTest;
@Category({ClientSubscriptionTest.class})
public class HARegionQueueTest {
-
private HARegionQueue haRegionQueue;
+ private final HARegion haRegion = mock(HARegion.class);
+ private final InternalCache internalCache = mock(InternalCache.class);
+ private final List<EventID> eventIDs = new LinkedList<>();
+ private final EventID id1 = mock(EventID.class);
+ private final EventID id2 = mock(EventID.class);
+ private final EventID id3 = mock(EventID.class);
+ private final EventID id4 = mock(EventID.class);
+ private final Collection<List<EventID>> chunks = new LinkedList<>();
+ private final List<EventID> chunk1 = new LinkedList<>();
+ private final List<EventID> chunk2 = new LinkedList<>();
+ private final InternalDistributedMember primary =
mock(InternalDistributedMember.class);
+ private final HARegionQueue.DispatchedAndCurrentEvents wrapper =
+ new HARegionQueue.DispatchedAndCurrentEvents();
+ private final long delay = 1L;
@Before
public void setup() throws IOException, ClassNotFoundException,
InterruptedException {
- InternalCache internalCache = mock(InternalCache.class);
-
StoppableReentrantReadWriteLock giiLock =
mock(StoppableReentrantReadWriteLock.class);
when(giiLock.readLock())
.thenReturn(mock(StoppableReentrantReadWriteLock.StoppableReadLock.class));
@@ -64,7 +86,6 @@ public class HARegionQueueTest {
when(rwLock.readLock())
.thenReturn(mock(StoppableReentrantReadWriteLock.StoppableReadLock.class));
- HARegion haRegion = mock(HARegion.class);
HashMap map = new HashMap();
when(haRegion.put(any(), any())).then((invocationOnMock) -> {
return map.put(invocationOnMock.getArgument(0),
invocationOnMock.getArgument(1));
@@ -183,4 +204,366 @@ public class HARegionQueueTest {
verify(spy).waitForInitialized(time);
}
+
+ /**
+  * Stubs {@code isDispatched} so that only id2 and id3 are reported as dispatched and expects
+  * {@code getDispatchedEvents} to return exactly those two IDs.
+  */
+ @Test
+ public void getDispatchedEventsReturnsDispatchedEvents() {
+   HARegionQueue queueSpy = spy(haRegionQueue);
+   addEvents();
+   doReturn(true).when(queueSpy).isDispatched(id2);
+   doReturn(true).when(queueSpy).isDispatched(id3);
+   doReturn(false).when(queueSpy).isDispatched(id1);
+   doReturn(false).when(queueSpy).isDispatched(id4);
+
+   List<EventID> result = queueSpy.getDispatchedEvents(eventIDs);
+
+   assertThat(result).containsExactlyInAnyOrder(id2, id3);
+ }
+
+ /** Appends id1 through id4, in that order, to the shared {@code eventIDs} list. */
+ private void addEvents() {
+   for (EventID id : new EventID[] {id1, id2, id3, id4}) {
+     eventIDs.add(id);
+   }
+ }
+
+ @Test
+ public void isDispatchedReturnsTrueIfDispatchedAndCurrentEventsAreRemoved() {
+ HARegionQueue spy = spy(haRegionQueue);
+ doReturn(null).when(spy).getDispatchedAndCurrentEvents(id1);
+
+ assertThat(spy.isDispatched(id1)).isTrue();
+ }
+
+ @Test
+ public void isDispatchedReturnsFalseIfSequenceIdGreaterThanLastDispatched() {
+ HARegionQueue spy = spy(haRegionQueue);
+ when(id1.getSequenceID()).thenReturn(100L);
+ wrapper.lastDispatchedSequenceId = 99L;
+ doReturn(wrapper).when(spy).getDispatchedAndCurrentEvents(id1);
+
+ assertThat(spy.isDispatched(id1)).isFalse();
+ }
+
+ @Test
+ public void isDispatchedReturnsTrueIfSequenceIdEqualsLastDispatched() {
+ HARegionQueue spy = spy(haRegionQueue);
+ when(id1.getSequenceID()).thenReturn(100L);
+ wrapper.lastDispatchedSequenceId = 100L;
+ doReturn(wrapper).when(spy).getDispatchedAndCurrentEvents(id1);
+
+ assertThat(spy.isDispatched(id1)).isTrue();
+ }
+
+ @Test
+ public void isDispatchedReturnsTrueIfSequenceIdLessThanLastDispatched() {
+ HARegionQueue spy = spy(haRegionQueue);
+ when(id1.getSequenceID()).thenReturn(90L);
+ wrapper.lastDispatchedSequenceId = 100L;
+ doReturn(wrapper).when(spy).getDispatchedAndCurrentEvents(id1);
+
+ assertThat(spy.isDispatched(id1)).isTrue();
+ }
+
+ /**
+  * Scheduling must be skipped once the queue has already synchronized with the primary.
+  *
+  * <p>
+  * The other preconditions used by the positive scheduling tests (GII queueing finished, primary
+  * at GEODE_1_14_0) are satisfied here, so the already-synchronized flag is the only thing that
+  * can prevent scheduling. Without them the verification would pass regardless of the flag.
+  */
+ @Test
+ public void doNotScheduleSynchronizationWithPrimaryIfHasDoneSynchronization() {
+   HARegionQueue queueSpy = spy(haRegionQueue);
+   queueSpy.doneGIIQueueing.set(true);
+   when(primary.getVersionOrdinal()).thenReturn(KnownVersion.GEODE_1_14_0.ordinal());
+   queueSpy.hasSynchronizedWithPrimary.set(true);
+   doReturn(delay).when(queueSpy).getDelay();
+   // Keep the spy from really scheduling work should the check under test ever regress.
+   doNothing().when(queueSpy).scheduleSynchronizationWithPrimary(primary, internalCache, delay);
+
+   queueSpy.synchronizeQueueWithPrimary(primary, internalCache);
+
+   verify(queueSpy, never()).scheduleSynchronizationWithPrimary(primary, internalCache, delay);
+ }
+
+ /**
+  * Scheduling must be skipped while a synchronization is already marked as in progress.
+  *
+  * <p>
+  * The other preconditions used by the positive scheduling tests (GII queueing finished, primary
+  * at GEODE_1_14_0) are satisfied here, so the in-progress flag is the only thing that can
+  * prevent scheduling. Without them the verification would pass regardless of the flag.
+  */
+ @Test
+ public void doNotScheduleSynchronizationWithPrimaryIfSynchronizationIsInProgress() {
+   HARegionQueue queueSpy = spy(haRegionQueue);
+   queueSpy.doneGIIQueueing.set(true);
+   when(primary.getVersionOrdinal()).thenReturn(KnownVersion.GEODE_1_14_0.ordinal());
+   queueSpy.scheduleSynchronizationWithPrimaryInProgress.set(true);
+   doReturn(delay).when(queueSpy).getDelay();
+   // Keep the spy from really scheduling work should the check under test ever regress.
+   doNothing().when(queueSpy).scheduleSynchronizationWithPrimary(primary, internalCache, delay);
+
+   queueSpy.synchronizeQueueWithPrimary(primary, internalCache);
+
+   verify(queueSpy, never()).scheduleSynchronizationWithPrimary(primary, internalCache, delay);
+ }
+
+ /**
+  * Scheduling must be skipped while GII queueing has not finished.
+  *
+  * <p>
+  * The primary's version is stubbed to GEODE_1_14_0 so that the unfinished GII is the only thing
+  * that can prevent scheduling; an unstubbed mock would report ordinal 0, which by itself is
+  * treated as too old by the older-version test in this class.
+  */
+ @Test
+ public void doNotScheduleSynchronizationWithPrimaryIfGIINotFinished() {
+   HARegionQueue queueSpy = spy(haRegionQueue);
+   queueSpy.doneGIIQueueing.set(false);
+   when(primary.getVersionOrdinal()).thenReturn(KnownVersion.GEODE_1_14_0.ordinal());
+   doReturn(delay).when(queueSpy).getDelay();
+   // Keep the spy from really scheduling work should the check under test ever regress.
+   doNothing().when(queueSpy).scheduleSynchronizationWithPrimary(primary, internalCache, delay);
+
+   queueSpy.synchronizeQueueWithPrimary(primary, internalCache);
+
+   verify(queueSpy, never()).scheduleSynchronizationWithPrimary(primary, internalCache, delay);
+ }
+
+ @Test
+ public void
doNotScheduleSynchronizationWithPrimaryIfPrimaryHasOlderThanGEODE_1_14_0Version()
{
+ HARegionQueue spy = spy(haRegionQueue);
+ spy.doneGIIQueueing.set(true);
+ when(primary.getVersionOrdinal()).thenReturn((short)
(KnownVersion.GEODE_1_14_0.ordinal() - 1));
+ doReturn(delay).when(spy).getDelay();
+
+ spy.synchronizeQueueWithPrimary(primary, internalCache);
+ verify(spy, never()).scheduleSynchronizationWithPrimary(primary,
internalCache, delay);
+ }
+
+ /**
+  * With GII queueing finished and the primary exactly at GEODE_1_14_0, synchronization is
+  * expected to be scheduled with the delay returned by {@code getDelay}.
+  */
+ @Test
+ public void scheduleSynchronizationWithPrimaryIfPrimaryIsGEODE_1_14_0Version() {
+   HARegionQueue queueSpy = spy(haRegionQueue);
+   when(primary.getVersionOrdinal()).thenReturn(KnownVersion.GEODE_1_14_0.ordinal());
+   queueSpy.doneGIIQueueing.set(true);
+   doNothing().when(queueSpy).scheduleSynchronizationWithPrimary(primary, internalCache, delay);
+   doReturn(delay).when(queueSpy).getDelay();
+
+   queueSpy.synchronizeQueueWithPrimary(primary, internalCache);
+
+   verify(queueSpy).scheduleSynchronizationWithPrimary(primary, internalCache, delay);
+ }
+
+ @Test
+ public void
scheduleSynchronizationWithPrimaryIfPrimaryIsLaterThanGEODE_1_14_0Version() {
+ HARegionQueue spy = spy(haRegionQueue);
+ spy.doneGIIQueueing.set(true);
+ when(primary.getVersionOrdinal()).thenReturn((short)
(KnownVersion.GEODE_1_14_0.ordinal() + 1));
+ doReturn(delay).when(spy).getDelay();
+ doNothing().when(spy).scheduleSynchronizationWithPrimary(primary,
internalCache, delay);
+
+ spy.synchronizeQueueWithPrimary(primary, internalCache);
+ verify(spy).scheduleSynchronizationWithPrimary(primary, internalCache,
delay);
+ }
+
+ /**
+  * Sets the GII positions to 1 and 4 and populates the region mock so that positions 1 and 4
+  * hold HAEventWrappers (for id1 and id2), position 2 holds a value that is not an
+  * HAEventWrapper, and position 3 has no entry. Only id1 and id2 are expected back.
+  */
+ @Test
+ public void getGIIEventsReturnsCorrectEvents() {
+   HARegionQueue queueSpy = spy(haRegionQueue);
+   queueSpy.positionBeforeGII = 1;
+   queueSpy.positionAfterGII = 4;
+
+   HAEventWrapper firstWrapper = mock(HAEventWrapper.class);
+   when(firstWrapper.getEventId()).thenReturn(id1);
+   HAEventWrapper secondWrapper = mock(HAEventWrapper.class);
+   when(secondWrapper.getEventId()).thenReturn(id2);
+
+   Region.Entry<Object, Object> wrapperEntryAt1 = uncheckedCast(mock(Region.Entry.class));
+   when(wrapperEntryAt1.getValue()).thenReturn(firstWrapper);
+   Region.Entry<Object, Object> plainEntryAt2 = uncheckedCast(mock(Region.Entry.class));
+   when(plainEntryAt2.getValue()).thenReturn(new Object());
+   Region.Entry<Object, Object> wrapperEntryAt4 = uncheckedCast(mock(Region.Entry.class));
+   when(wrapperEntryAt4.getValue()).thenReturn(secondWrapper);
+
+   when(haRegion.getEntry(1L)).thenReturn(wrapperEntryAt1);
+   when(haRegion.getEntry(2L)).thenReturn(plainEntryAt2);
+   when(haRegion.getEntry(3L)).thenReturn(null);
+   when(haRegion.getEntry(4L)).thenReturn(wrapperEntryAt4);
+
+   List<EventID> giiEvents = queueSpy.getGIIEvents();
+
+   assertThat(giiEvents).containsExactlyInAnyOrder(id1, id2);
+ }
+
+ /**
+  * With no GII events, neither chunking nor removal of dispatched events is expected to happen.
+  */
+ @Test
+ public void doSynchronizationWithPrimaryReturnsIfNoGIIEvents() {
+   HARegionQueue queueSpy = spy(haRegionQueue);
+   final int maxChunkSize = 1000;
+   // NOTE(review): the flag is already true before the call, so the isTrue() assertion below
+   // cannot tell whether doSynchronizationWithPrimary set it - confirm this pre-set is intended.
+   queueSpy.hasSynchronizedWithPrimary.set(true);
+   doReturn(new LinkedList<EventID>()).when(queueSpy).getGIIEvents();
+
+   queueSpy.doSynchronizationWithPrimary(primary, internalCache);
+
+   verify(queueSpy, never()).getChunks(eventIDs, maxChunkSize);
+   verify(queueSpy, never()).removeDispatchedEvents(primary, internalCache, eventIDs);
+   assertThat(queueSpy.hasSynchronizedWithPrimary).isTrue();
+   assertThat(queueSpy.scheduleSynchronizationWithPrimaryInProgress).isFalse();
+ }
+
+ /**
+  * With four GII events and a successful removal, the events are expected to be removed in a
+  * single call (no chunking) and the queue marked as synchronized with the primary.
+  */
+ @Test
+ public void doSynchronizationWithPrimaryRemoveDispatchedEvents() {
+   HARegionQueue queueSpy = spy(haRegionQueue);
+   final int maxChunkSize = 1000;
+   addEvents();
+   doReturn(true).when(queueSpy).removeDispatchedEvents(primary, internalCache, eventIDs);
+   doReturn(eventIDs).when(queueSpy).getGIIEvents();
+
+   queueSpy.doSynchronizationWithPrimary(primary, internalCache);
+
+   verify(queueSpy).removeDispatchedEvents(primary, internalCache, eventIDs);
+   verify(queueSpy, never()).getChunks(eventIDs, maxChunkSize);
+   assertThat(queueSpy.hasSynchronizedWithPrimary).isTrue();
+   assertThat(queueSpy.scheduleSynchronizationWithPrimaryInProgress).isFalse();
+ }
+
+ /**
+  * With four GII events and a removal that reports failure, the queue is expected to remain
+  * unmarked as synchronized with the primary.
+  */
+ @Test
+ public void hasSynchronizedWithPrimaryNotSetIfRemoveDispatchedEventsFails() {
+   HARegionQueue queueSpy = spy(haRegionQueue);
+   final int maxChunkSize = 1000;
+   addEvents();
+   doReturn(false).when(queueSpy).removeDispatchedEvents(primary, internalCache, eventIDs);
+   doReturn(eventIDs).when(queueSpy).getGIIEvents();
+
+   queueSpy.doSynchronizationWithPrimary(primary, internalCache);
+
+   verify(queueSpy).removeDispatchedEvents(primary, internalCache, eventIDs);
+   verify(queueSpy, never()).getChunks(eventIDs, maxChunkSize);
+   assertThat(queueSpy.hasSynchronizedWithPrimary).isFalse();
+   assertThat(queueSpy.scheduleSynchronizationWithPrimaryInProgress).isFalse();
+ }
+
+ /**
+  * With 1100 GII events, the events are expected to be split via {@code getChunks} and each
+  * chunk removed separately; when every chunk removal succeeds the queue is marked as
+  * synchronized with the primary.
+  */
+ @Test
+ public void hasSynchronizedWithPrimaryRemoveChunksIfManyGIIEvents() {
+   HARegionQueue queueSpy = spy(haRegionQueue);
+   final int maxChunkSize = 1000;
+   for (int added = 0; added < 1100; added++) {
+     eventIDs.add(mock(EventID.class));
+   }
+   createChunks();
+   doReturn(eventIDs).when(queueSpy).getGIIEvents();
+   doReturn(chunks).when(queueSpy).getChunks(eventIDs, maxChunkSize);
+   doReturn(true).when(queueSpy).removeDispatchedEvents(primary, internalCache, chunk1);
+   doReturn(true).when(queueSpy).removeDispatchedEvents(primary, internalCache, chunk2);
+
+   queueSpy.doSynchronizationWithPrimary(primary, internalCache);
+
+   verify(queueSpy).getChunks(eventIDs, maxChunkSize);
+   verify(queueSpy).removeDispatchedEvents(primary, internalCache, chunk1);
+   verify(queueSpy).removeDispatchedEvents(primary, internalCache, chunk2);
+   assertThat(queueSpy.hasSynchronizedWithPrimary).isTrue();
+   assertThat(queueSpy.scheduleSynchronizationWithPrimaryInProgress).isFalse();
+ }
+
+ /**
+  * Builds two single-element chunks (id1 in chunk1, id2 in chunk2) and registers them, in that
+  * order, in the shared {@code chunks} collection.
+  */
+ private void createChunks() {
+   chunk1.add(id1);
+   chunks.add(chunk1);
+
+   chunk2.add(id2);
+   chunks.add(chunk2);
+ }
+
+ /**
+  * With 1100 GII events split into two chunks, a failed removal of the second chunk is expected
+  * to leave the queue unmarked as synchronized with the primary.
+  */
+ @Test
+ public void hasSynchronizedWithPrimaryNotSetIfRemoveChunksFails() {
+   HARegionQueue queueSpy = spy(haRegionQueue);
+   final int maxChunkSize = 1000;
+   for (int added = 0; added < 1100; added++) {
+     eventIDs.add(mock(EventID.class));
+   }
+   createChunks();
+   doReturn(eventIDs).when(queueSpy).getGIIEvents();
+   doReturn(chunks).when(queueSpy).getChunks(eventIDs, maxChunkSize);
+   doReturn(true).when(queueSpy).removeDispatchedEvents(primary, internalCache, chunk1);
+   doReturn(false).when(queueSpy).removeDispatchedEvents(primary, internalCache, chunk2);
+
+   queueSpy.doSynchronizationWithPrimary(primary, internalCache);
+
+   verify(queueSpy).getChunks(eventIDs, maxChunkSize);
+   verify(queueSpy).removeDispatchedEvents(primary, internalCache, chunk1);
+   verify(queueSpy).removeDispatchedEvents(primary, internalCache, chunk2);
+   assertThat(queueSpy.hasSynchronizedWithPrimary).isFalse();
+   assertThat(queueSpy.scheduleSynchronizationWithPrimaryInProgress).isFalse();
+ }
+
+ /**
+  * Six event IDs chunked with a maximum chunk size of 2 are expected to yield
+  * {@code size / maxChunkSize} chunks, each holding exactly {@code maxChunkSize} IDs.
+  */
+ @Test
+ public void getChunksReturnsEqualSizedChunks() {
+   HARegionQueue queueSpy = spy(haRegionQueue);
+   int maxChunkSize = 2;
+   addEvents();
+   // Two more events on top of the four from addEvents().
+   eventIDs.add(mock(EventID.class));
+   eventIDs.add(mock(EventID.class));
+
+   Collection<List<EventID>> result = queueSpy.getChunks(eventIDs, maxChunkSize);
+
+   assertThat(result.size()).isEqualTo(eventIDs.size() / maxChunkSize);
+   result.forEach(chunk -> assertThat(chunk.size()).isEqualTo(maxChunkSize));
+ }
+
+ @Test
+ public void removeDispatchedEventAfterSyncWithPrimaryRemovesEvents() throws
Exception {
+ HARegionQueue spy = spy(haRegionQueue);
+ doNothing().when(spy).removeDispatchedEvents(id1);
+
+ assertThat(spy.removeDispatchedEventAfterSyncWithPrimary(id1)).isTrue();
+ verify(spy).removeDispatchedEvents(id1);
+ }
+
+ @Test
+ public void removeDispatchedEventReturnsTrueIfRemovalThrowsCacheException()
throws Exception {
+ HARegionQueue spy = spy(haRegionQueue);
+ doThrow(new
EntryNotFoundException("")).when(spy).removeDispatchedEvents(id1);
+
+ assertThat(spy.removeDispatchedEventAfterSyncWithPrimary(id1)).isTrue();
+ verify(spy).removeDispatchedEvents(id1);
+ }
+
+ @Test
+ public void
removeDispatchedEventReturnsTrueIfRemovalThrowsRegionDestroyedException()
+ throws Exception {
+ HARegionQueue spy = spy(haRegionQueue);
+ doThrow(new RegionDestroyedException("",
"")).when(spy).removeDispatchedEvents(id1);
+
+ assertThat(spy.removeDispatchedEventAfterSyncWithPrimary(id1)).isTrue();
+ verify(spy).removeDispatchedEvents(id1);
+ }
+
+ @Test
+ public void
removeDispatchedEventReturnsFalseIfRemovalThrowsCancelException() throws
Exception {
+ HARegionQueue spy = spy(haRegionQueue);
+ doThrow(new CacheClosedException()).when(spy).removeDispatchedEvents(id1);
+
+ assertThat(spy.removeDispatchedEventAfterSyncWithPrimary(id1)).isFalse();
+ verify(spy).removeDispatchedEvents(id1);
+ }
+
+ @Test
+ public void
removeDispatchedEventsReturnsFalseIfNotGettingEventsFromPrimary() {
+ HARegionQueue spy = spy(haRegionQueue);
+ doReturn(null).when(spy).getDispatchedEventsFromPrimary(primary,
internalCache, eventIDs);
+
+ assertThat(spy.removeDispatchedEvents(primary, internalCache,
eventIDs)).isFalse();
+ }
+
+ @Test
+ public void removeDispatchedEventsReturnsTrueIfRemovedDispatchedEvents() {
+ HARegionQueue spy = spy(haRegionQueue);
+ List<EventID> dispatched = new LinkedList<>();
+ dispatched.add(id1);
+ dispatched.add(id3);
+ doReturn(dispatched).when(spy).getDispatchedEventsFromPrimary(primary,
internalCache, eventIDs);
+ doReturn(true).when(spy).removeDispatchedEventAfterSyncWithPrimary(id1);
+ doReturn(true).when(spy).removeDispatchedEventAfterSyncWithPrimary(id3);
+
+ assertThat(spy.removeDispatchedEvents(primary, internalCache,
eventIDs)).isTrue();
+ }
+
+ @Test
+ public void
removeDispatchedEventsReturnsFalseIfFailedToRemoveDispatchedEvents() {
+ HARegionQueue spy = spy(haRegionQueue);
+ List<EventID> dispatched = new LinkedList<>();
+ dispatched.add(id1);
+ dispatched.add(id3);
+ doReturn(dispatched).when(spy).getDispatchedEventsFromPrimary(primary,
internalCache, eventIDs);
+ doReturn(true).when(spy).removeDispatchedEventAfterSyncWithPrimary(id1);
+ doReturn(false).when(spy).removeDispatchedEventAfterSyncWithPrimary(id3);
+
+ assertThat(spy.removeDispatchedEvents(primary, internalCache,
eventIDs)).isFalse();
+ }
+
+ /**
+  * With GII queueing done at time 0 and the current time stubbed to 5000, the delay is expected
+  * to be the 15000 wait time minus the elapsed time.
+  */
+ @Test
+ public void getDelayReturnsTimeToWait() {
+   HARegionQueue queueSpy = spy(haRegionQueue);
+   final long waitTime = 15000L;
+   final long giiDoneAt = 0L;
+   final long now = 5000L;
+   queueSpy.doneGIIQueueingTime = giiDoneAt;
+   doReturn(now).when(queueSpy).getCurrentTime();
+
+   long expectedDelay = waitTime - (now - giiDoneAt);
+
+   assertThat(queueSpy.getDelay()).isEqualTo(expectedDelay);
+ }
+
+ /**
+  * With the current time stubbed to one past the 15000 wait time measured from the GII
+  * completion time, the delay is expected to be zero.
+  */
+ @Test
+ public void getDelayReturnsZeroIfExceedWaitTime() {
+   HARegionQueue queueSpy = spy(haRegionQueue);
+   final long waitTime = 15000L;
+   final long giiDoneAt = 0L;
+   final long now = waitTime + giiDoneAt + 1;
+   doReturn(now).when(queueSpy).getCurrentTime();
+   queueSpy.doneGIIQueueingTime = giiDoneAt;
+
+   assertThat(queueSpy.getDelay()).isEqualTo(0);
+ }
}
diff --git
a/geode-core/src/test/java/org/apache/geode/internal/cache/ha/QueueRemovalMessageTest.java
b/geode-core/src/test/java/org/apache/geode/internal/cache/ha/QueueRemovalMessageTest.java
index 07f99b7..3febda3 100644
---
a/geode-core/src/test/java/org/apache/geode/internal/cache/ha/QueueRemovalMessageTest.java
+++
b/geode-core/src/test/java/org/apache/geode/internal/cache/ha/QueueRemovalMessageTest.java
@@ -38,6 +38,7 @@ import org.apache.geode.cache.CacheClosedException;
import org.apache.geode.cache.EntryNotFoundException;
import org.apache.geode.cache.RegionDestroyedException;
import org.apache.geode.distributed.internal.ClusterDistributionManager;
+import
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.cache.EventID;
import org.apache.geode.internal.cache.HARegion;
import org.apache.geode.internal.cache.InternalCache;
@@ -212,4 +213,24 @@ public class QueueRemovalMessageTest {
assertThat(queueRemovalMessage.removeQueueEvent(regionName2, regionQueue2,
eventID2)).isTrue();
}
+
+ /**
+  * Processes the prepared message list and expects, for both region queues, that the queue was
+  * processed, that its events were removed, and that it was asked to synchronize with the
+  * message sender. The verifications below are not order-sensitive.
+  */
+ @Test
+ public void synchronizeQueueWithPrimaryInvokedAfterProcessEachRegionQueue() {
+   addToMessagesList();
+   InternalDistributedMember primaryMember = mock(InternalDistributedMember.class);
+   doReturn(primaryMember).when(queueRemovalMessage).getSender();
+   Iterator<Object> messages = messagesList.iterator();
+
+   queueRemovalMessage.processRegionQueues(cache, messages);
+
+   // First region queue.
+   verify(queueRemovalMessage).processRegionQueue(messages, regionName1, region1EventSize,
+       regionQueue1);
+   verify(queueRemovalMessage).removeQueueEvent(regionName1, regionQueue1, eventID1);
+   verify(regionQueue1).synchronizeQueueWithPrimary(primaryMember, cache);
+
+   // Second region queue.
+   verify(queueRemovalMessage).processRegionQueue(messages, regionName2, region2EventSize,
+       regionQueue2);
+   verify(queueRemovalMessage).removeQueueEvent(regionName2, regionQueue2, eventID2);
+   verify(queueRemovalMessage).removeQueueEvent(regionName2, regionQueue2, eventID3);
+   verify(regionQueue2).synchronizeQueueWithPrimary(primaryMember, cache);
+ }
}
diff --git
a/geode-core/src/test/java/org/apache/geode/internal/cache/ha/QueueSynchronizationProcessorTest.java
b/geode-core/src/test/java/org/apache/geode/internal/cache/ha/QueueSynchronizationProcessorTest.java
new file mode 100644
index 0000000..e27cb42
--- /dev/null
+++
b/geode-core/src/test/java/org/apache/geode/internal/cache/ha/QueueSynchronizationProcessorTest.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.ha;
+
+import static org.apache.geode.util.internal.UncheckedUtils.uncheckedCast;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.LinkedList;
+import java.util.List;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import org.apache.geode.distributed.internal.ClusterDistributionManager;
+import
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.cache.EventID;
+import org.apache.geode.internal.cache.HARegion;
+import org.apache.geode.internal.cache.InternalCache;
+
+public class QueueSynchronizationProcessorTest {
+ @Rule
+ public ExpectedException expectedException = ExpectedException.none();
+
+ private final ClusterDistributionManager manager =
+ mock(ClusterDistributionManager.class, RETURNS_DEEP_STUBS);
+ private final InternalDistributedMember primary =
mock(InternalDistributedMember.class);
+ private final InternalCache cache = mock(InternalCache.class);
+ private QueueSynchronizationProcessor.QueueSynchronizationMessage message;
+ private QueueSynchronizationProcessor.QueueSynchronizationReplyMessage
replyMessage;
+ private final List<EventID> eventIDs = new LinkedList<>();
+ private final EventID id1 = mock(EventID.class);
+ private final EventID id2 = mock(EventID.class);
+ private final int processorId = 11;
+ private final List<EventID> dispatched = new LinkedList<>();
+
+ @Before
+ public void setup() {
+ when((manager.getCache())).thenReturn(cache);
+ eventIDs.add(id1);
+ eventIDs.add(id2);
+ }
+
+ @Test
+ public void processMessageSetsReply() {
+ QueueSynchronizationProcessor processor =
+ spy(new QueueSynchronizationProcessor(manager, primary));
+ replyMessage =
mock(QueueSynchronizationProcessor.QueueSynchronizationReplyMessage.class);
+
+ processor.process(replyMessage);
+
+ assertThat(processor.reply).isEqualTo(replyMessage);
+ }
+
+ @Test
+ public void processQueueSynchronizationMessageSendsReply() {
+ message = spy(new
QueueSynchronizationProcessor.QueueSynchronizationMessage());
+ setupMessage();
+
+ message.process(manager);
+
+ verify(replyMessage).setEventIds(dispatched);
+ verify(replyMessage).setSuccess();
+ verifyReplyMessageSent();
+ }
+
+ private void setupMessage() {
+ message.setEventIdList(eventIDs);
+ message.setProcessorId(processorId);
+ replyMessage =
mock(QueueSynchronizationProcessor.QueueSynchronizationReplyMessage.class);
+ dispatched.add(id2);
+
doReturn(replyMessage).when(message).createQueueSynchronizationReplyMessage();
+ doReturn(dispatched).when(message).getDispatchedEvents(cache);
+ doReturn(primary).when(message).getSender();
+ }
+
+ private void verifyReplyMessageSent() {
+ verify(replyMessage).setProcessorId(processorId);
+ verify(replyMessage).setRecipient(primary);
+ verify(manager).putOutgoing(replyMessage);
+ }
+
+ @Test
+ public void processQueueSynchronizationMessageCanSendReplyWithException() {
+ expectedException.expect(RuntimeException.class);
+ message = spy(new
QueueSynchronizationProcessor.QueueSynchronizationMessage());
+ setupMessage();
+ RuntimeException runtimeException = new RuntimeException();
+ doThrow(runtimeException).when(message).getDispatchedEvents(cache);
+
+ message.process(manager);
+
+ verify(replyMessage).setException(any());
+ verifyReplyMessageSent();
+ }
+
+ @Test
+ public void getDispatchedEventsReturnsNullIfQueueIsNull() {
+ message = spy(new
QueueSynchronizationProcessor.QueueSynchronizationMessage());
+ String regionName = "queueName";
+ message.setRegionName(regionName);
+ when(cache.getRegion(regionName)).thenReturn(null);
+
+ assertThat(message.getDispatchedEvents(cache)).isNull();
+ }
+
+ @Test
+ public void getDispatchedEventsReturns() {
+ message = spy(new
QueueSynchronizationProcessor.QueueSynchronizationMessage());
+ String regionName = "queueName";
+ message.setRegionName(regionName);
+ message.setEventIdList(eventIDs);
+ HARegion region = mock(HARegion.class);
+ HARegionQueue queue = mock(HARegionQueue.class);
+ when(cache.getRegion(regionName)).thenReturn(uncheckedCast(region));
+ when(region.getOwner()).thenReturn(queue);
+ when(queue.getDispatchedEvents(eventIDs)).thenReturn(dispatched);
+
+ assertThat(message.getDispatchedEvents(cache)).isEqualTo(dispatched);
+ }
+}
diff --git
a/geode-serialization/src/main/java/org/apache/geode/internal/serialization/DataSerializableFixedID.java
b/geode-serialization/src/main/java/org/apache/geode/internal/serialization/DataSerializableFixedID.java
index fe00977..ff1571c 100644
---
a/geode-serialization/src/main/java/org/apache/geode/internal/serialization/DataSerializableFixedID.java
+++
b/geode-serialization/src/main/java/org/apache/geode/internal/serialization/DataSerializableFixedID.java
@@ -229,8 +229,9 @@ public interface DataSerializableFixedID extends
SerializationVersions, BasicSer
byte LATEST_LAST_ACCESS_TIME_MESSAGE = -20;
byte REMOVE_CACHESERVER_PROFILE_UPDATE = -19;
-
- // IDs -18 through -10 unused
+ byte QUEUE_SYNCHRONIZATION_MESSAGE = -18;
+ byte QUEUE_SYNCHRONIZATION_REPLY_MESSAGE = -17;
+ // IDs -16 through -10 unused
byte PR_REMOVE_ALL_MESSAGE = -9;
byte REMOVE_ALL_MESSAGE = -8;