This is an automated email from the ASF dual-hosted git repository.

zhouxj pushed a commit to branch feature/GEODE-5890
in repository https://gitbox.apache.org/repos/asf/geode.git

commit d0fb21aaae7425f18d049df6ea270fc34d8b6158
Author: zhouxh <[email protected]>
AuthorDate: Wed Oct 17 17:04:30 2018 -0700

    GEODE-5890: gateway events from the same distributed system should check misorder
---
 .../cache/entries/AbstractRegionEntry.java         |   2 +-
 .../cache/entries/AbstractRegionEntryTest.java     | 145 ++++++++++++++++++++-
 2 files changed, 145 insertions(+), 2 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/entries/AbstractRegionEntry.java b/geode-core/src/main/java/org/apache/geode/internal/cache/entries/AbstractRegionEntry.java
index 81fee86..fdadea0 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/entries/AbstractRegionEntry.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/entries/AbstractRegionEntry.java
@@ -2073,7 +2073,7 @@ public abstract class AbstractRegionEntry implements HashRegionEntry<Object, Obj
     if (tagTime == VersionTag.ILLEGAL_VERSION_TIMESTAMP) {
       return true; // no timestamp received from other system - just apply it
     }
-    if (tagDsid == stampDsid || stampDsid == -1) {
+    if ((tagDsid == stampDsid && tagTime > stampTime) || stampDsid == -1) {
       return true;
     }
     GatewayConflictResolver resolver = event.getRegion().getCache().getGatewayConflictResolver();
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/entries/AbstractRegionEntryTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/entries/AbstractRegionEntryTest.java
index b1684ac..21ed680 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/entries/AbstractRegionEntryTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/entries/AbstractRegionEntryTest.java
@@ -35,13 +35,18 @@ import org.apache.geode.DataSerializer;
 import org.apache.geode.cache.query.QueryException;
 import org.apache.geode.cache.query.internal.index.IndexManager;
 import org.apache.geode.cache.query.internal.index.IndexProtocol;
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
 import org.apache.geode.internal.cache.EntryEventImpl;
+import org.apache.geode.internal.cache.GemFireCacheImpl;
 import org.apache.geode.internal.cache.InternalRegion;
 import org.apache.geode.internal.cache.LocalRegion;
 import org.apache.geode.internal.cache.RegionClearedException;
 import org.apache.geode.internal.cache.RegionEntryContext;
 import org.apache.geode.internal.cache.Token;
+import org.apache.geode.internal.cache.versions.ConcurrentCacheModificationException;
 import org.apache.geode.internal.cache.versions.RegionVersionVector;
+import org.apache.geode.internal.cache.versions.VersionSource;
+import org.apache.geode.internal.cache.versions.VersionStamp;
 import org.apache.geode.internal.cache.versions.VersionTag;
 import org.apache.geode.internal.offheap.MemoryAllocatorImpl;
 import org.apache.geode.internal.offheap.OffHeapMemoryStats;
@@ -113,16 +118,119 @@ public class AbstractRegionEntryTest {
     }
   }
 
+  @Test(expected = ConcurrentCacheModificationException.class)
+  public void gatewayEventsFromSameDSShouldThrowCMEIfMisordered() {
+    // create 2 gateway events with the same dsid, but different timestamp
+    // apply them in misorder, it should throw CME
+    GemFireCacheImpl cache = mock(GemFireCacheImpl.class);
+    LocalRegion lr = mock(LocalRegion.class);
+    String value = "value";
+    AbstractRegionEntry re = new TestableRegionEntry(lr, value);
+    InternalDistributedMember member1 = mock(InternalDistributedMember.class);
+
+    EntryEventImpl entryEvent1 = new EntryEventImpl();
+    entryEvent1.setRegion(lr);
+    when(lr.getCache()).thenReturn(cache);
+    when(cache.getGatewayConflictResolver()).thenReturn(null);
+
+    VersionTag tag1 = VersionTag.create(member1);
+    tag1.setVersionTimeStamp(1);
+    tag1.setDistributedSystemId(2);
+    tag1.setIsGatewayTag(true);
+
+    VersionTag tag2 = VersionTag.create(member1);
+    tag2.setVersionTimeStamp(2);
+    tag2.setDistributedSystemId(2);
+    tag2.setIsGatewayTag(true);
+
+    ((TestableRegionEntry) re).setVersions(tag2);
+    assertEquals(tag2.getVersionTimeStamp(),
+        re.getVersionStamp().asVersionTag().getVersionTimeStamp());
+
+    // apply tag1 with smaller timestamp should throw CME
+    entryEvent1.setVersionTag(tag1);
+    re.processVersionTag(entryEvent1);
+  }
+
+  @Test
+  public void gatewayEventsFromSameDSShouldCompareTimestamp() {
+    // create 2 gateway events with the same dsid, but different timestamp
+    // apply them in correct order, it should pass
+    GemFireCacheImpl cache = mock(GemFireCacheImpl.class);
+    LocalRegion lr = mock(LocalRegion.class);
+    String value = "value";
+    AbstractRegionEntry re = new TestableRegionEntry(lr, value);
+    InternalDistributedMember member1 = mock(InternalDistributedMember.class);
+
+    EntryEventImpl entryEvent1 = new EntryEventImpl();
+    entryEvent1.setRegion(lr);
+    when(lr.getCache()).thenReturn(cache);
+    when(cache.getGatewayConflictResolver()).thenReturn(null);
+
+    VersionTag tag1 = VersionTag.create(member1);
+    tag1.setVersionTimeStamp(1);
+    tag1.setDistributedSystemId(2);
+    tag1.setIsGatewayTag(true);
+
+    VersionTag tag2 = VersionTag.create(member1);
+    tag2.setVersionTimeStamp(2);
+    tag2.setDistributedSystemId(2);
+    tag2.setIsGatewayTag(true);
+
+    ((TestableRegionEntry) re).setVersions(tag1);
+    assertEquals(tag1.getVersionTimeStamp(),
+        re.getVersionStamp().asVersionTag().getVersionTimeStamp());
+
+    // apply tag2 should be accepted
+    entryEvent1.setVersionTag(tag2);
+    re.processVersionTag(entryEvent1);
+  }
+
   public static class TestableRegionEntry extends AbstractRegionEntry
-      implements OffHeapRegionEntry {
+      implements OffHeapRegionEntry, VersionStamp {
 
     private Object value;
+    private VersionTag tag;
+    private long timeStamp = 0;
 
     protected TestableRegionEntry(RegionEntryContext context, Object value) {
       super(context, value);
     }
 
     @Override
+    public void setVersionTimeStamp(long timeStamp) {
+      this.timeStamp = timeStamp;
+    }
+
+    @Override
+    public void setVersions(VersionTag tag) {
+      this.tag = tag;
+      this.timeStamp = tag.getVersionTimeStamp();
+    }
+
+    @Override
+    public void setMemberID(VersionSource memberID) {
+
+    }
+
+    @Override
+    public VersionTag asVersionTag() {
+      return tag;
+    }
+
+    @Override
+    public void processVersionTag(InternalRegion region, VersionTag tag, boolean isTombstoneFromGII,
+        boolean hasDelta, VersionSource versionSource,
+        InternalDistributedMember sender, boolean checkConflicts) {
+
+    }
+
+    @Override
+    public VersionStamp getVersionStamp() {
+      return this;
+    }
+
+    @Override
     protected Object getValueField() {
       return this.value;
     }
@@ -186,5 +294,40 @@ public class AbstractRegionEntryTest {
     public boolean setAddress(long expectedAddr, long newAddr) {
       return false;
     }
+
+    @Override
+    public int getEntryVersion() {
+      return 0;
+    }
+
+    @Override
+    public long getRegionVersion() {
+      return 0;
+    }
+
+    @Override
+    public long getVersionTimeStamp() {
+      return this.timeStamp;
+    }
+
+    @Override
+    public VersionSource getMemberID() {
+      return null;
+    }
+
+    @Override
+    public int getDistributedSystemId() {
+      return 2;
+    }
+
+    @Override
+    public short getRegionVersionHighBytes() {
+      return 0;
+    }
+
+    @Override
+    public int getRegionVersionLowBytes() {
+      return 0;
+    }
   }
 }

Reply via email to