This is an automated email from the ASF dual-hosted git repository.

zhouxj pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new f07d749  GEODE-5890: a cleaner fix to skip calling 
GatewayConflictResolver if tag has (#2673)
f07d749 is described below

commit f07d749c40068ab3de5ef4e57bf3aa300a14ba2e
Author: Xiaojian Zhou <[email protected]>
AuthorDate: Mon Oct 22 16:23:15 2018 -0700

    GEODE-5890: a cleaner fix to skip calling GatewayConflictResolver if tag 
has (#2673)
    
    the same distributed system id.
---
 .../cache/entries/AbstractRegionEntry.java         |  10 +-
 .../cache/entries/AbstractRegionEntryTest.java     | 184 ++++++++++++++++++++-
 2 files changed, 185 insertions(+), 9 deletions(-)

diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/entries/AbstractRegionEntry.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/entries/AbstractRegionEntry.java
index fdadea0..8d8e201 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/entries/AbstractRegionEntry.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/entries/AbstractRegionEntry.java
@@ -2073,8 +2073,16 @@ public abstract class AbstractRegionEntry implements 
HashRegionEntry<Object, Obj
     if (tagTime == VersionTag.ILLEGAL_VERSION_TIMESTAMP) {
       return true; // no timestamp received from other system - just apply it
     }
-    if ((tagDsid == stampDsid && tagTime > stampTime) || stampDsid == -1) {
+    // According to GatewayConflictResolver's java doc, it will only be used 
on tag with different
+    // distributed system id than stamp's
+    if (stampDsid == -1) {
       return true;
+    } else if (tagDsid == stampDsid) {
+      if (tagTime >= stampTime) {
+        return true;
+      } else {
+        throw new ConcurrentCacheModificationException("conflicting WAN event 
detected");
+      }
     }
     GatewayConflictResolver resolver = 
event.getRegion().getCache().getGatewayConflictResolver();
     if (resolver != null) {
diff --git 
a/geode-core/src/test/java/org/apache/geode/internal/cache/entries/AbstractRegionEntryTest.java
 
b/geode-core/src/test/java/org/apache/geode/internal/cache/entries/AbstractRegionEntryTest.java
index 21ed680..833293b 100644
--- 
a/geode-core/src/test/java/org/apache/geode/internal/cache/entries/AbstractRegionEntryTest.java
+++ 
b/geode-core/src/test/java/org/apache/geode/internal/cache/entries/AbstractRegionEntryTest.java
@@ -17,7 +17,11 @@ package org.apache.geode.internal.cache.entries;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.verify;
@@ -29,12 +33,16 @@ import java.io.IOException;
 
 import org.assertj.core.api.Assertions;
 import org.junit.Assert;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.mockito.Mockito;
 
 import org.apache.geode.DataSerializer;
 import org.apache.geode.cache.query.QueryException;
 import org.apache.geode.cache.query.internal.index.IndexManager;
 import org.apache.geode.cache.query.internal.index.IndexProtocol;
+import org.apache.geode.cache.util.GatewayConflictResolver;
 import 
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
 import org.apache.geode.internal.cache.EntryEventImpl;
 import org.apache.geode.internal.cache.GemFireCacheImpl;
@@ -42,6 +50,7 @@ import org.apache.geode.internal.cache.InternalRegion;
 import org.apache.geode.internal.cache.LocalRegion;
 import org.apache.geode.internal.cache.RegionClearedException;
 import org.apache.geode.internal.cache.RegionEntryContext;
+import org.apache.geode.internal.cache.TimestampedEntryEventImpl;
 import org.apache.geode.internal.cache.Token;
 import 
org.apache.geode.internal.cache.versions.ConcurrentCacheModificationException;
 import org.apache.geode.internal.cache.versions.RegionVersionVector;
@@ -58,6 +67,9 @@ import 
org.apache.geode.internal.util.concurrent.CustomEntryConcurrentHashMap.Ha
 
 public class AbstractRegionEntryTest {
 
+  @Rule
+  public ExpectedException expectedException = ExpectedException.none();
+
   @Test
   public void 
whenMakeTombstoneHasSetValueThatThrowsExceptionDoesNotChangeValueToTombstone()
       throws RegionClearedException {
@@ -118,10 +130,10 @@ public class AbstractRegionEntryTest {
     }
   }
 
-  @Test(expected = ConcurrentCacheModificationException.class)
+  @Test
   public void gatewayEventsFromSameDSShouldThrowCMEIfMisordered() {
     // create 2 gateway events with the same dsid, but different timestamp
-    // apply them in misorder, it should throw CME
+    // apply them in misorder, it should throw CME before calling resolver
     GemFireCacheImpl cache = mock(GemFireCacheImpl.class);
     LocalRegion lr = mock(LocalRegion.class);
     String value = "value";
@@ -131,29 +143,34 @@ public class AbstractRegionEntryTest {
     EntryEventImpl entryEvent1 = new EntryEventImpl();
     entryEvent1.setRegion(lr);
     when(lr.getCache()).thenReturn(cache);
-    when(cache.getGatewayConflictResolver()).thenReturn(null);
+    GatewayConflictResolver resolver = mock(GatewayConflictResolver.class);
+    when(cache.getGatewayConflictResolver()).thenReturn(resolver);
 
     VersionTag tag1 = VersionTag.create(member1);
     tag1.setVersionTimeStamp(1);
-    tag1.setDistributedSystemId(2);
+    tag1.setDistributedSystemId(3);
     tag1.setIsGatewayTag(true);
 
     VersionTag tag2 = VersionTag.create(member1);
     tag2.setVersionTimeStamp(2);
-    tag2.setDistributedSystemId(2);
+    tag2.setDistributedSystemId(3);
     tag2.setIsGatewayTag(true);
 
     ((TestableRegionEntry) re).setVersions(tag2);
     assertEquals(tag2.getVersionTimeStamp(),
         re.getVersionStamp().asVersionTag().getVersionTimeStamp());
+    assertEquals(3, ((TestableRegionEntry) re).getDistributedSystemId());
 
     // apply tag1 with smaller timestamp should throw CME
     entryEvent1.setVersionTag(tag1);
+    expectedException.expect(ConcurrentCacheModificationException.class);
+    expectedException.expectMessage("conflicting WAN event detected");
     re.processVersionTag(entryEvent1);
+    verify(resolver, never()).onEvent(any(), any());
   }
 
   @Test
-  public void gatewayEventsFromSameDSShouldCompareTimestamp() {
+  public void gatewayEventsFromSameDSInCorrectOrderOfTimestampShouldPass() {
     // create 2 gateway events with the same dsid, but different timestamp
     // apply them in correct order, it should pass
     GemFireCacheImpl cache = mock(GemFireCacheImpl.class);
@@ -169,7 +186,40 @@ public class AbstractRegionEntryTest {
 
     VersionTag tag1 = VersionTag.create(member1);
     tag1.setVersionTimeStamp(1);
-    tag1.setDistributedSystemId(2);
+    tag1.setDistributedSystemId(3);
+    tag1.setIsGatewayTag(true);
+
+    VersionTag tag2 = VersionTag.create(member1);
+    tag2.setVersionTimeStamp(2);
+    tag2.setDistributedSystemId(3);
+    tag2.setIsGatewayTag(true);
+
+    ((TestableRegionEntry) re).setVersions(tag1);
+    assertEquals(tag1.getVersionTimeStamp(),
+        re.getVersionStamp().asVersionTag().getVersionTimeStamp());
+    assertEquals(3, ((TestableRegionEntry) re).getDistributedSystemId());
+
+    // apply tag2 should be accepted
+    entryEvent1.setVersionTag(tag2);
+    re.processVersionTag(entryEvent1);
+  }
+
+  @Test
+  public void stampWithoutDSIDShouldAcceptAnyTag() {
+    GemFireCacheImpl cache = mock(GemFireCacheImpl.class);
+    LocalRegion lr = mock(LocalRegion.class);
+    String value = "value";
+    AbstractRegionEntry re = new TestableRegionEntry(lr, value);
+    InternalDistributedMember member1 = mock(InternalDistributedMember.class);
+
+    EntryEventImpl entryEvent1 = new EntryEventImpl();
+    entryEvent1.setRegion(lr);
+    when(lr.getCache()).thenReturn(cache);
+    when(cache.getGatewayConflictResolver()).thenReturn(null);
+
+    VersionTag tag1 = VersionTag.create(member1);
+    tag1.setVersionTimeStamp(1);
+    tag1.setDistributedSystemId(-1);
     tag1.setIsGatewayTag(true);
 
     VersionTag tag2 = VersionTag.create(member1);
@@ -180,18 +230,135 @@ public class AbstractRegionEntryTest {
     ((TestableRegionEntry) re).setVersions(tag1);
     assertEquals(tag1.getVersionTimeStamp(),
         re.getVersionStamp().asVersionTag().getVersionTimeStamp());
+    assertEquals(-1, ((TestableRegionEntry) re).getDistributedSystemId());
 
     // apply tag2 should be accepted
     entryEvent1.setVersionTag(tag2);
     re.processVersionTag(entryEvent1);
   }
 
+  @Test
+  public void 
applyingGatewayEventsFromDifferentDSShouldAcceptBiggerTimestamp() {
+    // create 2 gateway events:
+    // tag1 with smaller distributed system ids (DSIDs) and bigger timestamp
+    // tag2 with bigger DSID and smaller timestamp
+    // set tag2 into stamp. Apply event with tag1 should pass
+    // i.e. We compare timestamp first, then DSID
+    GemFireCacheImpl cache = mock(GemFireCacheImpl.class);
+    LocalRegion lr = mock(LocalRegion.class);
+    String value = "value";
+    AbstractRegionEntry re = new TestableRegionEntry(lr, value);
+    InternalDistributedMember member1 = mock(InternalDistributedMember.class);
+
+    EntryEventImpl entryEvent1 = new EntryEventImpl();
+    entryEvent1.setRegion(lr);
+    when(lr.getCache()).thenReturn(cache);
+    when(cache.getGatewayConflictResolver()).thenReturn(null);
+
+    VersionTag tag1 = VersionTag.create(member1);
+    tag1.setVersionTimeStamp(2);
+    tag1.setDistributedSystemId(1);
+    tag1.setIsGatewayTag(true);
+
+    VersionTag tag2 = VersionTag.create(member1);
+    tag2.setVersionTimeStamp(1);
+    tag2.setDistributedSystemId(2);
+    tag2.setIsGatewayTag(true);
+
+    ((TestableRegionEntry) re).setVersions(tag2);
+    assertEquals(2, ((TestableRegionEntry) re).getDistributedSystemId());
+
+    // apply tag1 with bigger timestamp should pass
+    entryEvent1.setVersionTag(tag1);
+    re.processVersionTag(entryEvent1);
+  }
+
+  @Test
+  public void 
applyingGatewayEventsFromSmallerDSWithSameTimestampShouldThrowCMEIfNoResolver() 
{
+    // create 2 gateway events with different distributed system ids (DSIDs), 
with same timestamp
+    // set the one with bigger DSID into stamp.
+    // Apply the one with smaller DSID show throw CME
+    GemFireCacheImpl cache = mock(GemFireCacheImpl.class);
+    LocalRegion lr = mock(LocalRegion.class);
+    String value = "value";
+    AbstractRegionEntry re = new TestableRegionEntry(lr, value);
+    InternalDistributedMember member1 = mock(InternalDistributedMember.class);
+
+    EntryEventImpl entryEvent1 = new EntryEventImpl();
+    entryEvent1.setRegion(lr);
+    when(lr.getCache()).thenReturn(cache);
+    when(cache.getGatewayConflictResolver()).thenReturn(null);
+
+    VersionTag tag1 = VersionTag.create(member1);
+    tag1.setVersionTimeStamp(1);
+    tag1.setDistributedSystemId(1);
+    tag1.setIsGatewayTag(true);
+
+    VersionTag tag2 = VersionTag.create(member1);
+    tag2.setVersionTimeStamp(1);
+    tag2.setDistributedSystemId(2);
+    tag2.setIsGatewayTag(true);
+
+    ((TestableRegionEntry) re).setVersions(tag2);
+    assertEquals(2, ((TestableRegionEntry) re).getDistributedSystemId());
+
+    // apply tag1 with smaller timestamp should throw CME
+    entryEvent1.setVersionTag(tag1);
+    expectedException.expect(ConcurrentCacheModificationException.class);
+    expectedException.expectMessage("conflicting WAN event detected");
+    re.processVersionTag(entryEvent1);
+  }
+
+  @Test
+  public void resolverShouldHandleConflictEventsFromDifferentDS() {
+    // create 2 gateway events with different distributed system ids (DSIDs), 
with same timestamp
+    // set the one with bigger DSID into stamp.
+    // Usually, apply the one with smaller DSID should throw CME, but since 
there's resolver
+    // resolver will accept the event
+    GemFireCacheImpl cache = mock(GemFireCacheImpl.class);
+    LocalRegion lr = mock(LocalRegion.class);
+    String value = "value";
+    AbstractRegionEntry re = new TestableRegionEntry(lr, value);
+    InternalDistributedMember member1 = mock(InternalDistributedMember.class);
+
+    EntryEventImpl entryEvent1 = new EntryEventImpl();
+    entryEvent1 = Mockito.spy(entryEvent1);
+    entryEvent1.setRegion(lr);
+    when(lr.getCache()).thenReturn(cache);
+    GatewayConflictResolver resolver = mock(GatewayConflictResolver.class);
+    when(cache.getGatewayConflictResolver()).thenReturn(resolver);
+    doNothing().when(resolver).onEvent(any(), any());
+    TimestampedEntryEventImpl timestampedEvent = 
mock(TimestampedEntryEventImpl.class);
+    // when(entryEvent1.getTimestampedEvent(anyInt(), anyInt(), anyLong(), 
anyLong()));
+    doReturn(timestampedEvent).when(entryEvent1).getTimestampedEvent(anyInt(), 
anyInt(), anyLong(),
+        anyLong());
+    when(timestampedEvent.hasOldValue()).thenReturn(true);
+
+    VersionTag tag1 = VersionTag.create(member1);
+    tag1.setVersionTimeStamp(1);
+    tag1.setDistributedSystemId(1);
+    tag1.setIsGatewayTag(true);
+
+    VersionTag tag2 = VersionTag.create(member1);
+    tag2.setVersionTimeStamp(1);
+    tag2.setDistributedSystemId(2);
+    tag2.setIsGatewayTag(true);
+
+    ((TestableRegionEntry) re).setVersions(tag2);
+    assertEquals(2, ((TestableRegionEntry) re).getDistributedSystemId());
+
+    entryEvent1.setVersionTag(tag1);
+    re.processVersionTag(entryEvent1);
+    verify(resolver, Mockito.times(1)).onEvent(any(), any());
+  }
+
   public static class TestableRegionEntry extends AbstractRegionEntry
       implements OffHeapRegionEntry, VersionStamp {
 
     private Object value;
     private VersionTag tag;
     private long timeStamp = 0;
+    private int dsId;
 
     protected TestableRegionEntry(RegionEntryContext context, Object value) {
       super(context, value);
@@ -206,6 +373,7 @@ public class AbstractRegionEntryTest {
     public void setVersions(VersionTag tag) {
       this.tag = tag;
       this.timeStamp = tag.getVersionTimeStamp();
+      this.dsId = tag.getDistributedSystemId();
     }
 
     @Override
@@ -317,7 +485,7 @@ public class AbstractRegionEntryTest {
 
     @Override
     public int getDistributedSystemId() {
-      return 2;
+      return this.dsId;
     }
 
     @Override

Reply via email to