This is an automated email from the ASF dual-hosted git repository.
zhouxj pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new f07d749 GEODE-5890: a cleaner fix to skip calling
GatewayConflictResolver if tag has (#2673)
f07d749 is described below
commit f07d749c40068ab3de5ef4e57bf3aa300a14ba2e
Author: Xiaojian Zhou <[email protected]>
AuthorDate: Mon Oct 22 16:23:15 2018 -0700
GEODE-5890: a cleaner fix to skip calling GatewayConflictResolver if the tag
has the same distributed system id as the stamp (#2673)
---
.../cache/entries/AbstractRegionEntry.java | 10 +-
.../cache/entries/AbstractRegionEntryTest.java | 184 ++++++++++++++++++++-
2 files changed, 185 insertions(+), 9 deletions(-)
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/entries/AbstractRegionEntry.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/entries/AbstractRegionEntry.java
index fdadea0..8d8e201 100644
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/entries/AbstractRegionEntry.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/entries/AbstractRegionEntry.java
@@ -2073,8 +2073,16 @@ public abstract class AbstractRegionEntry implements
HashRegionEntry<Object, Obj
if (tagTime == VersionTag.ILLEGAL_VERSION_TIMESTAMP) {
return true; // no timestamp received from other system - just apply it
}
- if ((tagDsid == stampDsid && tagTime > stampTime) || stampDsid == -1) {
+ // According to GatewayConflictResolver's java doc, it will only be used
on tag with different
+ // distributed system id than stamp's
+ if (stampDsid == -1) {
return true;
+ } else if (tagDsid == stampDsid) {
+ if (tagTime >= stampTime) {
+ return true;
+ } else {
+ throw new ConcurrentCacheModificationException("conflicting WAN event
detected");
+ }
}
GatewayConflictResolver resolver =
event.getRegion().getCache().getGatewayConflictResolver();
if (resolver != null) {
diff --git
a/geode-core/src/test/java/org/apache/geode/internal/cache/entries/AbstractRegionEntryTest.java
b/geode-core/src/test/java/org/apache/geode/internal/cache/entries/AbstractRegionEntryTest.java
index 21ed680..833293b 100644
---
a/geode-core/src/test/java/org/apache/geode/internal/cache/entries/AbstractRegionEntryTest.java
+++
b/geode-core/src/test/java/org/apache/geode/internal/cache/entries/AbstractRegionEntryTest.java
@@ -17,7 +17,11 @@ package org.apache.geode.internal.cache.entries;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
@@ -29,12 +33,16 @@ import java.io.IOException;
import org.assertj.core.api.Assertions;
import org.junit.Assert;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.mockito.Mockito;
import org.apache.geode.DataSerializer;
import org.apache.geode.cache.query.QueryException;
import org.apache.geode.cache.query.internal.index.IndexManager;
import org.apache.geode.cache.query.internal.index.IndexProtocol;
+import org.apache.geode.cache.util.GatewayConflictResolver;
import
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.cache.EntryEventImpl;
import org.apache.geode.internal.cache.GemFireCacheImpl;
@@ -42,6 +50,7 @@ import org.apache.geode.internal.cache.InternalRegion;
import org.apache.geode.internal.cache.LocalRegion;
import org.apache.geode.internal.cache.RegionClearedException;
import org.apache.geode.internal.cache.RegionEntryContext;
+import org.apache.geode.internal.cache.TimestampedEntryEventImpl;
import org.apache.geode.internal.cache.Token;
import
org.apache.geode.internal.cache.versions.ConcurrentCacheModificationException;
import org.apache.geode.internal.cache.versions.RegionVersionVector;
@@ -58,6 +67,9 @@ import
org.apache.geode.internal.util.concurrent.CustomEntryConcurrentHashMap.Ha
public class AbstractRegionEntryTest {
+ @Rule
+ public ExpectedException expectedException = ExpectedException.none();
+
@Test
public void
whenMakeTombstoneHasSetValueThatThrowsExceptionDoesNotChangeValueToTombstone()
throws RegionClearedException {
@@ -118,10 +130,10 @@ public class AbstractRegionEntryTest {
}
}
- @Test(expected = ConcurrentCacheModificationException.class)
+ @Test
public void gatewayEventsFromSameDSShouldThrowCMEIfMisordered() {
// create 2 gateway events with the same dsid, but different timestamp
- // apply them in misorder, it should throw CME
+ // apply them in misorder, it should throw CME before calling resolver
GemFireCacheImpl cache = mock(GemFireCacheImpl.class);
LocalRegion lr = mock(LocalRegion.class);
String value = "value";
@@ -131,29 +143,34 @@ public class AbstractRegionEntryTest {
EntryEventImpl entryEvent1 = new EntryEventImpl();
entryEvent1.setRegion(lr);
when(lr.getCache()).thenReturn(cache);
- when(cache.getGatewayConflictResolver()).thenReturn(null);
+ GatewayConflictResolver resolver = mock(GatewayConflictResolver.class);
+ when(cache.getGatewayConflictResolver()).thenReturn(resolver);
VersionTag tag1 = VersionTag.create(member1);
tag1.setVersionTimeStamp(1);
- tag1.setDistributedSystemId(2);
+ tag1.setDistributedSystemId(3);
tag1.setIsGatewayTag(true);
VersionTag tag2 = VersionTag.create(member1);
tag2.setVersionTimeStamp(2);
- tag2.setDistributedSystemId(2);
+ tag2.setDistributedSystemId(3);
tag2.setIsGatewayTag(true);
((TestableRegionEntry) re).setVersions(tag2);
assertEquals(tag2.getVersionTimeStamp(),
re.getVersionStamp().asVersionTag().getVersionTimeStamp());
+ assertEquals(3, ((TestableRegionEntry) re).getDistributedSystemId());
// apply tag1 with smaller timestamp should throw CME
entryEvent1.setVersionTag(tag1);
+ expectedException.expect(ConcurrentCacheModificationException.class);
+ expectedException.expectMessage("conflicting WAN event detected");
re.processVersionTag(entryEvent1);
+ verify(resolver, never()).onEvent(any(), any());
}
@Test
- public void gatewayEventsFromSameDSShouldCompareTimestamp() {
+ public void gatewayEventsFromSameDSInCorrectOrderOfTimestampShouldPass() {
// create 2 gateway events with the same dsid, but different timestamp
// apply them in correct order, it should pass
GemFireCacheImpl cache = mock(GemFireCacheImpl.class);
@@ -169,7 +186,40 @@ public class AbstractRegionEntryTest {
VersionTag tag1 = VersionTag.create(member1);
tag1.setVersionTimeStamp(1);
- tag1.setDistributedSystemId(2);
+ tag1.setDistributedSystemId(3);
+ tag1.setIsGatewayTag(true);
+
+ VersionTag tag2 = VersionTag.create(member1);
+ tag2.setVersionTimeStamp(2);
+ tag2.setDistributedSystemId(3);
+ tag2.setIsGatewayTag(true);
+
+ ((TestableRegionEntry) re).setVersions(tag1);
+ assertEquals(tag1.getVersionTimeStamp(),
+ re.getVersionStamp().asVersionTag().getVersionTimeStamp());
+ assertEquals(3, ((TestableRegionEntry) re).getDistributedSystemId());
+
+ // apply tag2 should be accepted
+ entryEvent1.setVersionTag(tag2);
+ re.processVersionTag(entryEvent1);
+ }
+
+ @Test
+ public void stampWithoutDSIDShouldAcceptAnyTag() {
+ GemFireCacheImpl cache = mock(GemFireCacheImpl.class);
+ LocalRegion lr = mock(LocalRegion.class);
+ String value = "value";
+ AbstractRegionEntry re = new TestableRegionEntry(lr, value);
+ InternalDistributedMember member1 = mock(InternalDistributedMember.class);
+
+ EntryEventImpl entryEvent1 = new EntryEventImpl();
+ entryEvent1.setRegion(lr);
+ when(lr.getCache()).thenReturn(cache);
+ when(cache.getGatewayConflictResolver()).thenReturn(null);
+
+ VersionTag tag1 = VersionTag.create(member1);
+ tag1.setVersionTimeStamp(1);
+ tag1.setDistributedSystemId(-1);
tag1.setIsGatewayTag(true);
VersionTag tag2 = VersionTag.create(member1);
@@ -180,18 +230,135 @@ public class AbstractRegionEntryTest {
((TestableRegionEntry) re).setVersions(tag1);
assertEquals(tag1.getVersionTimeStamp(),
re.getVersionStamp().asVersionTag().getVersionTimeStamp());
+ assertEquals(-1, ((TestableRegionEntry) re).getDistributedSystemId());
// apply tag2 should be accepted
entryEvent1.setVersionTag(tag2);
re.processVersionTag(entryEvent1);
}
+ @Test
+ public void
applyingGatewayEventsFromDifferentDSShouldAcceptBiggerTimestamp() {
+ // create 2 gateway events:
+ // tag1 with smaller distributed system ids (DSIDs) and bigger timestamp
+ // tag2 with bigger DSID and smaller timestamp
+ // set tag2 into stamp. Apply event with tag1 should pass
+ // i.e. We compare timestamp first, then DSID
+ GemFireCacheImpl cache = mock(GemFireCacheImpl.class);
+ LocalRegion lr = mock(LocalRegion.class);
+ String value = "value";
+ AbstractRegionEntry re = new TestableRegionEntry(lr, value);
+ InternalDistributedMember member1 = mock(InternalDistributedMember.class);
+
+ EntryEventImpl entryEvent1 = new EntryEventImpl();
+ entryEvent1.setRegion(lr);
+ when(lr.getCache()).thenReturn(cache);
+ when(cache.getGatewayConflictResolver()).thenReturn(null);
+
+ VersionTag tag1 = VersionTag.create(member1);
+ tag1.setVersionTimeStamp(2);
+ tag1.setDistributedSystemId(1);
+ tag1.setIsGatewayTag(true);
+
+ VersionTag tag2 = VersionTag.create(member1);
+ tag2.setVersionTimeStamp(1);
+ tag2.setDistributedSystemId(2);
+ tag2.setIsGatewayTag(true);
+
+ ((TestableRegionEntry) re).setVersions(tag2);
+ assertEquals(2, ((TestableRegionEntry) re).getDistributedSystemId());
+
+ // apply tag1 with bigger timestamp should pass
+ entryEvent1.setVersionTag(tag1);
+ re.processVersionTag(entryEvent1);
+ }
+
+ @Test
+ public void
applyingGatewayEventsFromSmallerDSWithSameTimestampShouldThrowCMEIfNoResolver()
{
+ // create 2 gateway events with different distributed system ids (DSIDs),
with same timestamp
+ // set the one with bigger DSID into stamp.
+ // Apply the one with smaller DSID should throw CME
+ GemFireCacheImpl cache = mock(GemFireCacheImpl.class);
+ LocalRegion lr = mock(LocalRegion.class);
+ String value = "value";
+ AbstractRegionEntry re = new TestableRegionEntry(lr, value);
+ InternalDistributedMember member1 = mock(InternalDistributedMember.class);
+
+ EntryEventImpl entryEvent1 = new EntryEventImpl();
+ entryEvent1.setRegion(lr);
+ when(lr.getCache()).thenReturn(cache);
+ when(cache.getGatewayConflictResolver()).thenReturn(null);
+
+ VersionTag tag1 = VersionTag.create(member1);
+ tag1.setVersionTimeStamp(1);
+ tag1.setDistributedSystemId(1);
+ tag1.setIsGatewayTag(true);
+
+ VersionTag tag2 = VersionTag.create(member1);
+ tag2.setVersionTimeStamp(1);
+ tag2.setDistributedSystemId(2);
+ tag2.setIsGatewayTag(true);
+
+ ((TestableRegionEntry) re).setVersions(tag2);
+ assertEquals(2, ((TestableRegionEntry) re).getDistributedSystemId());
+
+ // apply tag1 with the same timestamp but smaller DSID should throw CME
+ entryEvent1.setVersionTag(tag1);
+ expectedException.expect(ConcurrentCacheModificationException.class);
+ expectedException.expectMessage("conflicting WAN event detected");
+ re.processVersionTag(entryEvent1);
+ }
+
+ @Test
+ public void resolverShouldHandleConflictEventsFromDifferentDS() {
+ // create 2 gateway events with different distributed system ids (DSIDs),
with same timestamp
+ // set the one with bigger DSID into stamp.
+ // Usually, applying the one with smaller DSID would throw CME, but since
+ // there is a resolver, the resolver will accept the event
+ GemFireCacheImpl cache = mock(GemFireCacheImpl.class);
+ LocalRegion lr = mock(LocalRegion.class);
+ String value = "value";
+ AbstractRegionEntry re = new TestableRegionEntry(lr, value);
+ InternalDistributedMember member1 = mock(InternalDistributedMember.class);
+
+ EntryEventImpl entryEvent1 = new EntryEventImpl();
+ entryEvent1 = Mockito.spy(entryEvent1);
+ entryEvent1.setRegion(lr);
+ when(lr.getCache()).thenReturn(cache);
+ GatewayConflictResolver resolver = mock(GatewayConflictResolver.class);
+ when(cache.getGatewayConflictResolver()).thenReturn(resolver);
+ doNothing().when(resolver).onEvent(any(), any());
+ TimestampedEntryEventImpl timestampedEvent =
mock(TimestampedEntryEventImpl.class);
+ // when(entryEvent1.getTimestampedEvent(anyInt(), anyInt(), anyLong(),
anyLong()));
+ doReturn(timestampedEvent).when(entryEvent1).getTimestampedEvent(anyInt(),
anyInt(), anyLong(),
+ anyLong());
+ when(timestampedEvent.hasOldValue()).thenReturn(true);
+
+ VersionTag tag1 = VersionTag.create(member1);
+ tag1.setVersionTimeStamp(1);
+ tag1.setDistributedSystemId(1);
+ tag1.setIsGatewayTag(true);
+
+ VersionTag tag2 = VersionTag.create(member1);
+ tag2.setVersionTimeStamp(1);
+ tag2.setDistributedSystemId(2);
+ tag2.setIsGatewayTag(true);
+
+ ((TestableRegionEntry) re).setVersions(tag2);
+ assertEquals(2, ((TestableRegionEntry) re).getDistributedSystemId());
+
+ entryEvent1.setVersionTag(tag1);
+ re.processVersionTag(entryEvent1);
+ verify(resolver, Mockito.times(1)).onEvent(any(), any());
+ }
+
public static class TestableRegionEntry extends AbstractRegionEntry
implements OffHeapRegionEntry, VersionStamp {
private Object value;
private VersionTag tag;
private long timeStamp = 0;
+ private int dsId;
protected TestableRegionEntry(RegionEntryContext context, Object value) {
super(context, value);
@@ -206,6 +373,7 @@ public class AbstractRegionEntryTest {
public void setVersions(VersionTag tag) {
this.tag = tag;
this.timeStamp = tag.getVersionTimeStamp();
+ this.dsId = tag.getDistributedSystemId();
}
@Override
@@ -317,7 +485,7 @@ public class AbstractRegionEntryTest {
@Override
public int getDistributedSystemId() {
- return 2;
+ return this.dsId;
}
@Override