This is an automated email from the ASF dual-hosted git repository. zhouxj pushed a commit to branch feature/GEODE-5890 in repository https://gitbox.apache.org/repos/asf/geode.git
commit d0fb21aaae7425f18d049df6ea270fc34d8b6158 Author: zhouxh <[email protected]> AuthorDate: Wed Oct 17 17:04:30 2018 -0700 GEODE-5890: gateway events from the same distributed system should check misorder --- .../cache/entries/AbstractRegionEntry.java | 2 +- .../cache/entries/AbstractRegionEntryTest.java | 145 ++++++++++++++++++++- 2 files changed, 145 insertions(+), 2 deletions(-) diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/entries/AbstractRegionEntry.java b/geode-core/src/main/java/org/apache/geode/internal/cache/entries/AbstractRegionEntry.java index 81fee86..fdadea0 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/entries/AbstractRegionEntry.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/entries/AbstractRegionEntry.java @@ -2073,7 +2073,7 @@ public abstract class AbstractRegionEntry implements HashRegionEntry<Object, Obj if (tagTime == VersionTag.ILLEGAL_VERSION_TIMESTAMP) { return true; // no timestamp received from other system - just apply it } - if (tagDsid == stampDsid || stampDsid == -1) { + if ((tagDsid == stampDsid && tagTime > stampTime) || stampDsid == -1) { return true; } GatewayConflictResolver resolver = event.getRegion().getCache().getGatewayConflictResolver(); diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/entries/AbstractRegionEntryTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/entries/AbstractRegionEntryTest.java index b1684ac..21ed680 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/entries/AbstractRegionEntryTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/entries/AbstractRegionEntryTest.java @@ -35,13 +35,18 @@ import org.apache.geode.DataSerializer; import org.apache.geode.cache.query.QueryException; import org.apache.geode.cache.query.internal.index.IndexManager; import org.apache.geode.cache.query.internal.index.IndexProtocol; +import org.apache.geode.distributed.internal.membership.InternalDistributedMember; import org.apache.geode.internal.cache.EntryEventImpl; +import org.apache.geode.internal.cache.GemFireCacheImpl; import org.apache.geode.internal.cache.InternalRegion; import org.apache.geode.internal.cache.LocalRegion; import org.apache.geode.internal.cache.RegionClearedException; import org.apache.geode.internal.cache.RegionEntryContext; import org.apache.geode.internal.cache.Token; +import org.apache.geode.internal.cache.versions.ConcurrentCacheModificationException; import org.apache.geode.internal.cache.versions.RegionVersionVector; +import org.apache.geode.internal.cache.versions.VersionSource; +import org.apache.geode.internal.cache.versions.VersionStamp; import org.apache.geode.internal.cache.versions.VersionTag; import org.apache.geode.internal.offheap.MemoryAllocatorImpl; import org.apache.geode.internal.offheap.OffHeapMemoryStats; @@ -113,16 +118,119 @@ public class AbstractRegionEntryTest { } } + @Test(expected = ConcurrentCacheModificationException.class) + public void gatewayEventsFromSameDSShouldThrowCMEIfMisordered() { + // create 2 gateway events with the same dsid, but different timestamp + // apply them in misorder, it should throw CME + GemFireCacheImpl cache = mock(GemFireCacheImpl.class); + LocalRegion lr = mock(LocalRegion.class); + String value = "value"; + AbstractRegionEntry re = new TestableRegionEntry(lr, value); + InternalDistributedMember member1 = mock(InternalDistributedMember.class); + + EntryEventImpl entryEvent1 = new EntryEventImpl(); + entryEvent1.setRegion(lr); + when(lr.getCache()).thenReturn(cache); + when(cache.getGatewayConflictResolver()).thenReturn(null); + + VersionTag tag1 = VersionTag.create(member1); + tag1.setVersionTimeStamp(1); + tag1.setDistributedSystemId(2); + tag1.setIsGatewayTag(true); + + VersionTag tag2 = VersionTag.create(member1); + tag2.setVersionTimeStamp(2); + tag2.setDistributedSystemId(2); + tag2.setIsGatewayTag(true); + + ((TestableRegionEntry) re).setVersions(tag2); + assertEquals(tag2.getVersionTimeStamp(), + re.getVersionStamp().asVersionTag().getVersionTimeStamp()); + + // apply tag1 with smaller timestamp should throw CME + entryEvent1.setVersionTag(tag1); + re.processVersionTag(entryEvent1); + } + + @Test + public void gatewayEventsFromSameDSShouldCompareTimestamp() { + // create 2 gateway events with the same dsid, but different timestamp + // apply them in correct order, it should pass + GemFireCacheImpl cache = mock(GemFireCacheImpl.class); + LocalRegion lr = mock(LocalRegion.class); + String value = "value"; + AbstractRegionEntry re = new TestableRegionEntry(lr, value); + InternalDistributedMember member1 = mock(InternalDistributedMember.class); + + EntryEventImpl entryEvent1 = new EntryEventImpl(); + entryEvent1.setRegion(lr); + when(lr.getCache()).thenReturn(cache); + when(cache.getGatewayConflictResolver()).thenReturn(null); + + VersionTag tag1 = VersionTag.create(member1); + tag1.setVersionTimeStamp(1); + tag1.setDistributedSystemId(2); + tag1.setIsGatewayTag(true); + + VersionTag tag2 = VersionTag.create(member1); + tag2.setVersionTimeStamp(2); + tag2.setDistributedSystemId(2); + tag2.setIsGatewayTag(true); + + ((TestableRegionEntry) re).setVersions(tag1); + assertEquals(tag1.getVersionTimeStamp(), + re.getVersionStamp().asVersionTag().getVersionTimeStamp()); + + // apply tag2 should be accepted + entryEvent1.setVersionTag(tag2); + re.processVersionTag(entryEvent1); + } + public static class TestableRegionEntry extends AbstractRegionEntry - implements OffHeapRegionEntry { + implements OffHeapRegionEntry, VersionStamp { private Object value; + private VersionTag tag; + private long timeStamp = 0; protected TestableRegionEntry(RegionEntryContext context, Object value) { super(context, value); } @Override + public void setVersionTimeStamp(long timeStamp) { + this.timeStamp = timeStamp; + } + + @Override + public void setVersions(VersionTag tag) { + this.tag = tag; + this.timeStamp = tag.getVersionTimeStamp(); + } + + @Override + public void setMemberID(VersionSource memberID) { + + } + + @Override + public VersionTag asVersionTag() { + return tag; + } + + @Override + public void processVersionTag(InternalRegion region, VersionTag tag, boolean isTombstoneFromGII, + boolean hasDelta, VersionSource versionSource, + InternalDistributedMember sender, boolean checkConflicts) { + + } + + @Override + public VersionStamp getVersionStamp() { + return this; + } + + @Override protected Object getValueField() { return this.value; } @@ -186,5 +294,40 @@ public class AbstractRegionEntryTest { public boolean setAddress(long expectedAddr, long newAddr) { return false; } + + @Override + public int getEntryVersion() { + return 0; + } + + @Override + public long getRegionVersion() { + return 0; + } + + @Override + public long getVersionTimeStamp() { + return this.timeStamp; + } + + @Override + public VersionSource getMemberID() { + return null; + } + + @Override + public int getDistributedSystemId() { + return 2; + } + + @Override + public short getRegionVersionHighBytes() { + return 0; + } + + @Override + public int getRegionVersionLowBytes() { + return 0; + } } }
