This is an automated email from the ASF dual-hosted git repository.
eshu11 pushed a commit to branch feature/GEODE-6802
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/feature/GEODE-6802 by this
push:
new da960f3 GEODE-6802: Execute region synchronization on newly joined
member.
da960f3 is described below
commit da960f389d5c54a23f6bfa33363b637b550bafb4
Author: eshu <[email protected]>
AuthorDate: Thu May 23 16:52:30 2019 -0700
GEODE-6802: Execute region synchronization on newly joined member.
* Region sync will be invoked to avoid data inconsistency on a newly
joined/restarted member
if it received a region synchronization request from other members due
to a timed task.
* Use a flag in RegionVersionHolder to ensure only one such call is
executed.
* Make sure the RVV exception is filled for a persistent member requesting
region sync. (This may
lead to a CommitConflictException on some transactions on persistent
regions due to this
region sync operation, but it should be rare and is acceptable compared
to a data inconsistency
issue.)
---
.../distributed/internal/DistributionAdvisor.java | 27 +++--
.../geode/internal/cache/DistributedRegion.java | 19 +++
.../internal/cache/InitialImageOperation.java | 33 ++++-
.../internal/cache/persistence/DiskStoreID.java | 5 +
.../cache/versions/RegionVersionHolder.java | 21 +++-
.../cache/versions/RegionVersionVector.java | 43 ++++---
.../internal/cache/versions/VersionSource.java | 4 +
.../internal/cache/DistributedRegionTest.java | 53 ++++++++
.../internal/cache/InitialImageOperationTest.java | 32 ++++-
.../cache/versions/RegionVersionHolderTest.java | 41 +++++++
.../cache/versions/RegionVersionVectorTest.java | 135 +++++++++++++++++++++
11 files changed, 377 insertions(+), 36 deletions(-)
diff --git
a/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionAdvisor.java
b/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionAdvisor.java
index 669f59a..78d33a4 100644
---
a/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionAdvisor.java
+++
b/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionAdvisor.java
@@ -265,6 +265,14 @@ public class DistributionAdvisor {
if (isDebugEnabled) {
logger.debug("da.syncForCrashedMember will sync region in cache's timer
for region: {}", dr);
}
+ CacheProfile cacheProfile = (CacheProfile) profile;
+ PersistentMemberID persistentId = getPersistentID(cacheProfile);
+ VersionSource lostVersionID;
+ if (persistentId != null) {
+ lostVersionID = persistentId.getVersionMember();
+ } else {
+ lostVersionID = id;
+ }
// schedule the synchronization for execution in the future based on the
client health monitor
// interval. This allows client caches to retry an operation that might
otherwise be recovered
// through the sync operation. Without associated event information this
could cause the
@@ -289,11 +297,9 @@ public class DistributionAdvisor {
}
}
}
- CacheProfile cp = (CacheProfile) profile;
- PersistentMemberID persistentId = cp.persistentID;
if (dr.getDataPolicy().withPersistence() && persistentId == null) {
// Fix for 46704. The lost member may be a replicate
- // or an empty accessor. We don't need to to a synchronization
+ // or an empty accessor. We don't need to do a synchronization
// in that case, because those members send their writes to
// a persistent member.
if (isDebugEnabled) {
@@ -303,17 +309,16 @@ public class DistributionAdvisor {
}
return;
}
-
-
- VersionSource lostVersionID;
- if (persistentId != null) {
- lostVersionID = persistentId.getVersionMember();
- } else {
- lostVersionID = id;
- }
dr.synchronizeForLostMember(id, lostVersionID);
}
}, delay);
+ if (dr.getConcurrencyChecksEnabled()) {
+ dr.setRegionSynchronizeScheduled(lostVersionID);
+ }
+ }
+
+ private PersistentMemberID getPersistentID(CacheProfile cp) {
+ return cp.persistentID;
}
/** find the region for a delta-gii operation (synch) */
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
index eb6f0e5..7602758 100755
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
@@ -117,6 +117,7 @@ import
org.apache.geode.internal.cache.tx.RemoteFetchVersionMessage.FetchVersion
import org.apache.geode.internal.cache.tx.RemoteInvalidateMessage;
import org.apache.geode.internal.cache.tx.RemotePutMessage;
import
org.apache.geode.internal.cache.versions.ConcurrentCacheModificationException;
+import org.apache.geode.internal.cache.versions.RegionVersionHolder;
import org.apache.geode.internal.cache.versions.RegionVersionVector;
import org.apache.geode.internal.cache.versions.VersionSource;
import org.apache.geode.internal.cache.versions.VersionTag;
@@ -1298,6 +1299,24 @@ public class DistributedRegion extends LocalRegion
implements InternalDistribute
op.synchronizeWith(target, versionMember, lostMember);
}
+
+ public void setRegionSynchronizeScheduled(VersionSource lostMemberVersionID)
{
+ RegionVersionHolder regionVersionHolder =
+ getVersionVector().getHolderForMember(lostMemberVersionID);
+ if (regionVersionHolder != null) {
+ regionVersionHolder.setRegionSynchronizeScheduled();
+ }
+ }
+
+ public boolean setRegionSynchronizedWithIfNotScheduled(VersionSource
lostMemberVersionID) {
+ RegionVersionHolder regionVersionHolder =
+ getVersionVector().getHolderForMember(lostMemberVersionID);
+ if (regionVersionHolder != null) {
+ return regionVersionHolder.setRegionSynchronizeScheduledOrDoneIfNot();
+ }
+ return false;
+ }
+
/** remove any partial entries received in a failed GII */
void cleanUpAfterFailedGII(boolean recoverFromDisk) {
DiskRegion dskRgn = getDiskRegion();
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/InitialImageOperation.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/InitialImageOperation.java
index 81624f1..449148e 100644
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/InitialImageOperation.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/InitialImageOperation.java
@@ -808,7 +808,6 @@ public class InitialImageOperation {
boolean processChunk(List entries, InternalDistributedMember sender, Version
remoteVersion)
throws IOException, ClassNotFoundException {
final boolean isDebugEnabled = logger.isDebugEnabled();
- final boolean isTraceEnabled = logger.isTraceEnabled();
// one volatile read of test flag
int slow = slowImageProcessing;
@@ -896,8 +895,8 @@ public class InitialImageOperation {
if (diskRegion != null) {
// verify if entry from GII is the same as the one from recovery
RegionEntry regionEntry = this.entries.getEntry(entry.key);
- if (isTraceEnabled) {
- logger.trace("processChunk:entry={},tag={},re={}", entry, tag,
regionEntry);
+ if (isDebugEnabled) {
+ logger.debug("processChunk:entry={},tag={},re={}", entry, tag,
regionEntry);
}
// re will be null if the gii chunk gives us a create
if (regionEntry != null) {
@@ -974,8 +973,8 @@ public class InitialImageOperation {
if (tag != null) {
tag.replaceNullIDs(sender);
}
- if (isTraceEnabled) {
- logger.trace(
+ if (isDebugEnabled) {
+ logger.debug(
"processChunk:initialImagePut:key={},lastModified={},tmpValue={},wasRecovered={},tag={}",
entry.key, lastModified, tmpValue, wasRecovered, tag);
}
@@ -1619,12 +1618,15 @@ public class InitialImageOperation {
final boolean lclAbortTest = abortTest;
if (lclAbortTest)
abortTest = false;
-
+ DistributedRegion targetRegion = null;
boolean sendFailureMessage = true;
try {
Assert.assertTrue(this.regionPath != null, "Region path is null.");
final DistributedRegion rgn =
(DistributedRegion) getGIIRegion(dm, this.regionPath,
this.targetReinitialized);
+ if (lostMemberID != null) {
+ targetRegion = rgn;
+ }
if (rgn == null) {
return;
}
@@ -1878,6 +1880,16 @@ public class InitialImageOperation {
sendFailureMessage(dm, rex);
} // !success
+ if (lostMemberID != null && targetRegion != null) {
+ if (lostMemberVersionID == null) {
+ lostMemberVersionID = lostMemberID;
+ }
+ // check to see if the region in this cache needs to synchronize
with others
+ // it is possible that the cache is recover/restart of a member and
not
+ // scheduled to synchronize with others
+ synchronizeIfNotScheduled(targetRegion, lostMemberID,
lostMemberVersionID);
+ }
+
if (internalAfterSentImageReply != null
&&
regionPath.endsWith(internalAfterSentImageReply.getRegionName())) {
internalAfterSentImageReply.run();
@@ -1891,6 +1903,15 @@ public class InitialImageOperation {
null, null);
}
+ void synchronizeIfNotScheduled(DistributedRegion region,
+ InternalDistributedMember lostMember, VersionSource lostVersionSource)
{
+ if (region.setRegionSynchronizedWithIfNotScheduled(lostVersionSource)) {
+ // if region synchronization has not been scheduled or performed,
+ // we do synchronization with others right away as we received the
synchronization request
+ // indicating timed task has been triggered on other nodes
+ region.synchronizeForLostMember(lostMember, lostVersionSource);
+ }
+ }
/**
* Serialize the entries into byte[] chunks, calling proc for each one.
proc args: the byte[]
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/persistence/DiskStoreID.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/persistence/DiskStoreID.java
index 601d248..0aaafea 100644
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/persistence/DiskStoreID.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/persistence/DiskStoreID.java
@@ -146,4 +146,9 @@ public class DiskStoreID implements
VersionSource<DiskStoreID>, Serializable {
return Long.toHexString(mostSig).substring(8);
}
+ @Override
+ public boolean isDiskStoreId() {
+ return true;
+ }
+
}
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/versions/RegionVersionHolder.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/versions/RegionVersionHolder.java
index f378e32..de3cfee 100755
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/versions/RegionVersionHolder.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/versions/RegionVersionHolder.java
@@ -63,6 +63,8 @@ public class RegionVersionHolder<T> implements Cloneable,
DataSerializable {
private List<RVVException> exceptions;
boolean isDepartedMember;
+ private transient boolean regionSynchronizeScheduledOrDone;
+
// non final for tests
@MutableForTesting
public static int BIT_SET_WIDTH = 64 * 16; // should be a multiple of 4
64-bit longs
@@ -140,8 +142,6 @@ public class RegionVersionHolder<T> implements Cloneable,
DataSerializable {
return getExceptions().toString();
}
-
- /* test only method */
public void setVersion(long ver) {
this.version = ver;
}
@@ -383,7 +383,7 @@ public class RegionVersionHolder<T> implements Cloneable,
DataSerializable {
/**
* Add an exception that is older than this.bitSetVersion.
*/
- protected synchronized void addException(long previousVersion, long
nextVersion) {
+ synchronized void addException(long previousVersion, long nextVersion) {
if (this.exceptions == null) {
this.exceptions = new LinkedList<RVVException>();
}
@@ -791,4 +791,19 @@ public class RegionVersionHolder<T> implements Cloneable,
DataSerializable {
return canon;
}
+ private synchronized boolean isRegionSynchronizeScheduledOrDone() {
+ return regionSynchronizeScheduledOrDone;
+ }
+
+ public synchronized void setRegionSynchronizeScheduled() {
+ regionSynchronizeScheduledOrDone = true;
+ }
+
+ public synchronized boolean setRegionSynchronizeScheduledOrDoneIfNot() {
+ if (!isRegionSynchronizeScheduledOrDone()) {
+ regionSynchronizeScheduledOrDone = true;
+ return true;
+ }
+ return false;
+ }
}
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/versions/RegionVersionVector.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/versions/RegionVersionVector.java
index 66a2a6b..b645f55 100644
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/versions/RegionVersionVector.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/versions/RegionVersionVector.java
@@ -242,7 +242,14 @@ public abstract class RegionVersionVector<T extends
VersionSource<?>>
liveHolders = new HashMap<T, RegionVersionHolder<T>>(this.memberToVersion);
RegionVersionHolder<T> holder = liveHolders.get(mbr);
if (holder == null) {
- holder = new RegionVersionHolder<T>(-1);
+ if (mbr.isDiskStoreId() && mbr.equals(myId)) {
+ // For region recovered from disk, we may have local exceptions needs
to be
+ // brought back during region synchronization
+ holder = localExceptions.clone();
+ holder.setVersion(localVersion.get());
+ } else {
+ holder = new RegionVersionHolder<T>(-1);
+ }
} else {
holder = holder.clone();
}
@@ -804,26 +811,34 @@ public abstract class RegionVersionVector<T extends
VersionSource<?>>
* @return true if this vector has seen the given version
*/
public boolean contains(T id, long version) {
- if (id.equals(this.myId)) {
- if (isForSynchronization()) {
- // a sync vector only has one holder & no valid version for the
vector's owner
+ RegionVersionHolder<T> holder = this.memberToVersion.get(id);
+ // For region synchronization.
+ if (isForSynchronization()) {
+ if (holder == null) {
+ // we only care about missing changes from a particular member, and
this
+ // vector is known to contain that member's version holder
return true;
}
+ if (id.equals(this.myId)) {
+ if (!myId.isDiskStoreId()) {
+ // a sync vector only has one holder if not recovered from
persistence,
+ // no valid version for the vector's owner
+ return true;
+ }
+ }
+ return holder.contains(version);
+ }
+
+ // Regular GII
+ if (id.equals(this.myId)) {
if (getCurrentVersion() < version) {
return false;
} else {
return !localExceptions.hasExceptionFor(version);
}
}
- RegionVersionHolder<T> holder = this.memberToVersion.get(id);
if (holder == null) {
- if (this.singleMember) {
- // we only care about missing changes from a particular member, and
this
- // vector is known to contain that member's version holder
- return true;
- } else {
- return false;
- }
+ return false;
} else {
return holder.contains(version);
}
@@ -1151,7 +1166,7 @@ public abstract class RegionVersionVector<T extends
VersionSource<?>>
if (cId != null) {
return cId;
}
- if (id instanceof InternalDistributedMember) {
+ if (!id.isDiskStoreId()) {
InternalDistributedSystem system =
InternalDistributedSystem.getConnectedInstance();
if (system != null) {
can = (T)
system.getDistributionManager().getCanonicalId((InternalDistributedMember) id);
@@ -1456,7 +1471,7 @@ public abstract class RegionVersionVector<T extends
VersionSource<?>>
}
public static RegionVersionVector<?> create(VersionSource<?> versionMember,
LocalRegion owner) {
- if (versionMember instanceof DiskStoreID) {
+ if (versionMember.isDiskStoreId()) {
return new DiskRegionVersionVector((DiskStoreID) versionMember, owner);
} else {
return new VMRegionVersionVector((InternalDistributedMember)
versionMember, owner);
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/versions/VersionSource.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/versions/VersionSource.java
index e52dfe4..7d349d9 100644
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/versions/VersionSource.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/versions/VersionSource.java
@@ -31,4 +31,8 @@ import org.apache.geode.internal.DataSerializableFixedID;
public interface VersionSource<T> extends DataSerializableFixedID,
Comparable<T> {
void writeEssentialData(DataOutput out) throws IOException;
+
+ default boolean isDiskStoreId() {
+ return false;
+ }
}
diff --git
a/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedRegionTest.java
b/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedRegionTest.java
index 59454c3..143072f 100644
---
a/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedRegionTest.java
+++
b/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedRegionTest.java
@@ -24,11 +24,25 @@ import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import org.junit.Before;
import org.junit.Test;
+import org.apache.geode.internal.cache.versions.RegionVersionHolder;
+import org.apache.geode.internal.cache.versions.RegionVersionVector;
+import org.apache.geode.internal.cache.versions.VersionSource;
public class DistributedRegionTest {
+ private RegionVersionVector vector;
+ private RegionVersionHolder holder;
+ private VersionSource lostMemberVersionID;
+
+ @Before
+ public void setup() {
+ vector = mock(RegionVersionVector.class);
+ holder = mock(RegionVersionHolder.class);
+ lostMemberVersionID = mock(VersionSource.class);
+ }
@Test
public void shouldBeMockable() throws Exception {
@@ -93,4 +107,43 @@ public class DistributedRegionTest {
assertThat(distributedRegion.lockWhenRegionIsInitializing()).isFalse();
verify(distributedRegion, never()).lockFailedInitialImageReadLock();
}
+
+ @Test
+ public void
versionHolderInvokesSetRegionSynchronizeScheduledIfVectorContainsLostMemberID()
{
+ DistributedRegion distributedRegion = mock(DistributedRegion.class);
+ when(distributedRegion.getVersionVector()).thenReturn(vector);
+ when(vector.getHolderForMember(lostMemberVersionID)).thenReturn(holder);
+
doCallRealMethod().when(distributedRegion).setRegionSynchronizeScheduled(lostMemberVersionID);
+
+ distributedRegion.setRegionSynchronizeScheduled(lostMemberVersionID);
+
+ verify(holder).setRegionSynchronizeScheduled();
+ }
+
+ @Test
+ public void
versionHolderInvokesSetRegionSynchronizeScheduledOrDoneIfNotIfVectorContainsLostMemberID()
{
+ DistributedRegion distributedRegion = mock(DistributedRegion.class);
+ when(distributedRegion.getVersionVector()).thenReturn(vector);
+ when(vector.getHolderForMember(lostMemberVersionID)).thenReturn(holder);
+ doCallRealMethod().when(distributedRegion)
+ .setRegionSynchronizedWithIfNotScheduled(lostMemberVersionID);
+ when(holder.setRegionSynchronizeScheduledOrDoneIfNot()).thenReturn(true);
+
+
assertThat(distributedRegion.setRegionSynchronizedWithIfNotScheduled(lostMemberVersionID))
+ .isTrue();
+
+ verify(holder).setRegionSynchronizeScheduledOrDoneIfNot();
+ }
+
+ @Test
+ public void
setRegionSynchronizedWithIfNotScheduledReturnsFalseIfVectorDoesNotContainLostMemberID()
{
+ DistributedRegion distributedRegion = mock(DistributedRegion.class);
+ when(distributedRegion.getVersionVector()).thenReturn(vector);
+ when(vector.getHolderForMember(lostMemberVersionID)).thenReturn(holder);
+
+
assertThat(distributedRegion.setRegionSynchronizedWithIfNotScheduled(lostMemberVersionID))
+ .isFalse();
+
+ verify(holder, never()).setRegionSynchronizeScheduledOrDoneIfNot();
+ }
}
diff --git
a/geode-core/src/test/java/org/apache/geode/internal/cache/InitialImageOperationTest.java
b/geode-core/src/test/java/org/apache/geode/internal/cache/InitialImageOperationTest.java
index 64a09bb..26663a5 100644
---
a/geode-core/src/test/java/org/apache/geode/internal/cache/InitialImageOperationTest.java
+++
b/geode-core/src/test/java/org/apache/geode/internal/cache/InitialImageOperationTest.java
@@ -17,6 +17,7 @@ package org.apache.geode.internal.cache;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -27,6 +28,8 @@ import org.junit.Test;
import org.apache.geode.cache.CacheClosedException;
import org.apache.geode.cache.Scope;
import org.apache.geode.distributed.internal.ClusterDistributionManager;
+import
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.cache.versions.VersionSource;
public class InitialImageOperationTest {
@@ -34,6 +37,10 @@ public class InitialImageOperationTest {
private String path;
private LocalRegion region;
private InternalCache cache;
+ private InitialImageOperation.RequestImageMessage message;
+ private DistributedRegion distributedRegion;
+ private InternalDistributedMember lostMember;
+ private VersionSource versionSource;
@Before
public void setUp() {
@@ -42,6 +49,10 @@ public class InitialImageOperationTest {
cache = mock(InternalCache.class);
dm = mock(ClusterDistributionManager.class);
region = mock(LocalRegion.class);
+ message = spy(new InitialImageOperation.RequestImageMessage());
+ distributedRegion = mock(DistributedRegion.class);
+ lostMember = mock(InternalDistributedMember.class);
+ versionSource = mock(VersionSource.class);
when(dm.getExistingCache()).thenReturn(cache);
when(cache.getRegion(path)).thenReturn(region);
@@ -57,8 +68,6 @@ public class InitialImageOperationTest {
@Test
public void
processRequestImageMessageWillSendFailureMessageIfGotCancelException() {
- InitialImageOperation.RequestImageMessage message =
- spy(new InitialImageOperation.RequestImageMessage());
message.regionPath = "regionPath";
when(dm.getExistingCache()).thenThrow(new CacheClosedException());
@@ -66,4 +75,23 @@ public class InitialImageOperationTest {
verify(message).sendFailureMessage(eq(dm), eq(null));
}
+
+ @Test
+ public void
synchronizeForLostMemberIsInvokedIfRegionHasNotScheduledOrDoneSynchronization()
{
+
when(distributedRegion.setRegionSynchronizedWithIfNotScheduled(versionSource)).thenReturn(true);
+
+ message.synchronizeIfNotScheduled(distributedRegion, lostMember,
versionSource);
+
+ verify(distributedRegion).synchronizeForLostMember(lostMember,
versionSource);
+ }
+
+ @Test
+ public void
synchronizeForLostMemberIsNotInvokedIfRegionHasScheduledOrDoneSynchronization()
{
+
when(distributedRegion.setRegionSynchronizedWithIfNotScheduled(versionSource))
+ .thenReturn(false);
+
+ message.synchronizeIfNotScheduled(distributedRegion, lostMember,
versionSource);
+
+ verify(distributedRegion, never()).synchronizeForLostMember(lostMember,
versionSource);
+ }
}
diff --git
a/geode-core/src/test/java/org/apache/geode/internal/cache/versions/RegionVersionHolderTest.java
b/geode-core/src/test/java/org/apache/geode/internal/cache/versions/RegionVersionHolderTest.java
new file mode 100644
index 0000000..b8d3d53
--- /dev/null
+++
b/geode-core/src/test/java/org/apache/geode/internal/cache/versions/RegionVersionHolderTest.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
+ * or implied. See the License for the specific language governing permissions
and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.versions;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import org.junit.Test;
+
+import org.apache.geode.internal.cache.persistence.DiskStoreID;
+
+public class RegionVersionHolderTest {
+ @Test
+ public void setRegionSynchronizeScheduledCanSetSyncScheduledOrDone() {
+ DiskStoreID server = new DiskStoreID(0, 0);
+ RegionVersionHolder holder = new RegionVersionHolder(server);
+
+ holder.setRegionSynchronizeScheduled();
+ assertThat(holder.setRegionSynchronizeScheduledOrDoneIfNot()).isFalse();
+ }
+
+ @Test
+ public void
setRegionSynchronizeScheduledOrDoneIfNotReturnsTrueIfSyncScheduledNotSet() {
+ DiskStoreID server = new DiskStoreID(0, 0);
+ RegionVersionHolder holder = new RegionVersionHolder(server);
+
+ assertThat(holder.setRegionSynchronizeScheduledOrDoneIfNot()).isTrue();
+ assertThat(holder.setRegionSynchronizeScheduledOrDoneIfNot()).isFalse();
+ }
+}
diff --git
a/geode-core/src/test/java/org/apache/geode/internal/cache/versions/RegionVersionVectorTest.java
b/geode-core/src/test/java/org/apache/geode/internal/cache/versions/RegionVersionVectorTest.java
index 1d052a3..ce328ca 100644
---
a/geode-core/src/test/java/org/apache/geode/internal/cache/versions/RegionVersionVectorTest.java
+++
b/geode-core/src/test/java/org/apache/geode/internal/cache/versions/RegionVersionVectorTest.java
@@ -100,7 +100,142 @@ public class RegionVersionVectorTest {
assertFalse(singletonRVV.contains(server2, 6));
assertFalse(singletonRVV.contains(server2, 7));
assertFalse(singletonRVV.contains(server2, 9));
+ }
+
+ @Test
+ public void
testSynchronizationVectorContainsAllVersionsForSameOwnerAsTargetAndNonTarget() {
+ final String local = getIPLiteral();
+ InternalDistributedMember server1 = new InternalDistributedMember(local,
101);
+ InternalDistributedMember server2 = new InternalDistributedMember(local,
102);
+ InternalDistributedMember server3 = new InternalDistributedMember(local,
103);
+
+ RegionVersionVector rv1 = new VMRegionVersionVector(server1);
+ rv1.updateLocalVersion(10);
+ rv1.recordVersion(server2, 1);
+ rv1.recordVersion(server2, 5);
+ rv1.recordVersion(server2, 8);
+ rv1.recordVersion(server3, 1);
+ rv1.recordVersion(server3, 3);
+ RegionVersionVector singletonRVV = rv1.getCloneForTransmission(server1);
+ assertTrue(singletonRVV.isForSynchronization());
+ assertEquals(singletonRVV.getOwnerId(), server1);
+ assertTrue(singletonRVV.getMemberToVersion().containsKey(server1));
+ assertFalse(singletonRVV.getMemberToVersion().containsKey(server2));
+ assertFalse(singletonRVV.getMemberToVersion().containsKey(server3));
+
+ assertTrue(singletonRVV.contains(server1, 1));
+ assertTrue(singletonRVV.contains(server1, 11));
+
+ assertTrue(singletonRVV.contains(server3, 1));
+ assertTrue(singletonRVV.contains(server3, 11));
+
+ assertTrue(singletonRVV.contains(server2, 1));
+ assertTrue(singletonRVV.contains(server2, 5));
+ assertTrue(singletonRVV.contains(server2, 8));
+ assertTrue(singletonRVV.contains(server2, 2));
+ assertTrue(singletonRVV.contains(server2, 3));
+ assertTrue(singletonRVV.contains(server2, 4));
+ assertTrue(singletonRVV.contains(server2, 6));
+ assertTrue(singletonRVV.contains(server2, 7));
+ assertTrue(singletonRVV.contains(server2, 9));
+ }
+
+ /**
+ * server1 will simulate doing a sync with another server for operations
performed
+ * by server2. server3 is another server in the cluster that we don't care
about
+ * servers have version source as dist store id
+ */
+ @Test
+ public void
testSynchronizationVectorWithDiskStoreIdContainsAllVersionsForNonTarget() {
+ DiskStoreID server1 = new DiskStoreID(0, 0);
+ DiskStoreID server2 = new DiskStoreID(0, 1);
+ DiskStoreID server3 = new DiskStoreID(1, 0);
+
+ RegionVersionVector rv1 = new DiskRegionVersionVector(server1);
+ rv1.updateLocalVersion(10);
+ rv1.recordVersion(server2, 1);
+ rv1.recordVersion(server2, 5);
+ rv1.recordVersion(server2, 8);
+ rv1.recordVersion(server3, 1);
+ rv1.recordVersion(server3, 3);
+ RegionVersionVector singletonRVV = rv1.getCloneForTransmission(server2);
+ assertTrue(singletonRVV.isForSynchronization());
+ assertEquals(singletonRVV.getOwnerId(), server1);
+ assertTrue(singletonRVV.getMemberToVersion().containsKey(server2));
+ assertFalse(singletonRVV.getMemberToVersion().containsKey(server3));
+
+ assertTrue(singletonRVV.contains(server1, 1));
+ assertTrue(singletonRVV.contains(server1, 11));
+
+ assertTrue(singletonRVV.contains(server3, 1));
+ assertTrue(singletonRVV.contains(server3, 11));
+ }
+
+ @Test
+ public void
testSynchronizationVectorWithDiskStoreIdContainsVersionsForTarget() {
+ DiskStoreID server1 = new DiskStoreID(0, 0);
+ DiskStoreID server2 = new DiskStoreID(0, 1);
+ DiskStoreID server3 = new DiskStoreID(1, 0);
+
+ RegionVersionVector rv1 = new DiskRegionVersionVector(server1);
+ rv1.updateLocalVersion(10);
+ rv1.recordVersion(server2, 1);
+ rv1.recordVersion(server2, 5);
+ rv1.recordVersion(server2, 8);
+ rv1.recordVersion(server3, 1);
+ rv1.recordVersion(server3, 3);
+ RegionVersionVector singletonRVV = rv1.getCloneForTransmission(server2);
+ assertTrue(singletonRVV.isForSynchronization());
+ assertEquals(singletonRVV.getOwnerId(), server1);
+ assertTrue(singletonRVV.getMemberToVersion().containsKey(server2));
+ assertFalse(singletonRVV.getMemberToVersion().containsKey(server3));
+
+ assertTrue(singletonRVV.contains(server2, 1));
+ assertTrue(singletonRVV.contains(server2, 5));
+ assertTrue(singletonRVV.contains(server2, 8));
+
+ assertFalse(singletonRVV.contains(server2, 2));
+ assertFalse(singletonRVV.contains(server2, 3));
+ assertFalse(singletonRVV.contains(server2, 4));
+ assertFalse(singletonRVV.contains(server2, 6));
+ assertFalse(singletonRVV.contains(server2, 7));
+ assertFalse(singletonRVV.contains(server2, 9));
+ }
+ @Test
+ public void
testSynchronizationVectorWithDiskStoreIdContainsVersionsForTargetAsOriginator()
{
+ DiskStoreID server1 = new DiskStoreID(0, 0);
+ DiskStoreID server2 = new DiskStoreID(0, 1);
+ DiskStoreID server3 = new DiskStoreID(1, 0);
+
+ RegionVersionVector rv1 = new DiskRegionVersionVector(server1);
+ RegionVersionHolder localExceptions = rv1.getLocalExceptions();
+ localExceptions.addException(2, 5);
+ localExceptions.addException(7, 9);
+ rv1.updateLocalVersion(10);
+ rv1.recordVersion(server2, 1);
+ rv1.recordVersion(server2, 5);
+ rv1.recordVersion(server2, 8);
+ rv1.recordVersion(server3, 1);
+ rv1.recordVersion(server3, 3);
+ RegionVersionVector singletonRVV = rv1.getCloneForTransmission(server1);
+ assertTrue(singletonRVV.isForSynchronization());
+ assertEquals(singletonRVV.getOwnerId(), server1);
+ assertTrue(singletonRVV.getMemberToVersion().containsKey(server1));
+ assertFalse(singletonRVV.getMemberToVersion().containsKey(server2));
+ assertFalse(singletonRVV.getMemberToVersion().containsKey(server3));
+
+ assertTrue(singletonRVV.contains(server1, 1));
+ assertTrue(singletonRVV.contains(server1, 2));
+ assertTrue(singletonRVV.contains(server1, 5));
+ assertTrue(singletonRVV.contains(server1, 6));
+ assertTrue(singletonRVV.contains(server1, 7));
+ assertTrue(singletonRVV.contains(server1, 9));
+ assertTrue(singletonRVV.contains(server1, 10));
+
+ assertFalse(singletonRVV.contains(server1, 3));
+ assertFalse(singletonRVV.contains(server1, 4));
+ assertFalse(singletonRVV.contains(server1, 8));
}
@Test