This is an automated email from the ASF dual-hosted git repository.

boglesby pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new 147e6bc  GEODE-4704: Modified ConflationKey to use shadowKey when comparing events
147e6bc is described below

commit 147e6bc6b9009aa8e878141ef30e55145732943d
Author: Barry Oglesby <bogle...@users.noreply.github.com>
AuthorDate: Fri Feb 23 15:41:13 2018 -0800

    GEODE-4704: Modified ConflationKey to use shadowKey when comparing events
---
 .../wan/AbstractGatewaySenderEventProcessor.java   |  15 ++-
 .../internal/cache/wan/GatewaySenderEventImpl.java |   2 +-
 ...rallelGatewaySenderEventProcessorJUnitTest.java | 141 +++++++++++++++++++++
 .../wan/parallel/ParallelGatewaySenderHelper.java  |  79 ++++++++++++
 .../ParallelQueueRemovalMessageJUnitTest.java      |  26 +---
 5 files changed, 239 insertions(+), 24 deletions(-)

diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
index 7e67e9b..9309e43 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
@@ -731,7 +731,7 @@ public abstract class AbstractGatewaySenderEventProcessor 
extends Thread {
     return false;
   }
 
-  private List conflate(List<GatewaySenderEventImpl> events) {
+  public List conflate(List<GatewaySenderEventImpl> events) {
     List<GatewaySenderEventImpl> conflatedEvents = null;
     // Conflate the batch if necessary
     if (this.sender.isBatchConflationEnabled() && events.size() > 1) {
@@ -756,7 +756,7 @@ public abstract class AbstractGatewaySenderEventProcessor 
extends Thread {
           // The event should not be conflated (create or destroy). Add it to
           // the map.
           ConflationKey key = new 
ConflationKey(gsEvent.getRegion().getFullPath(),
-              gsEvent.getKeyToConflate(), gsEvent.getOperation());
+              gsEvent.getKeyToConflate(), gsEvent.getOperation(), 
gsEvent.getShadowKey());
           conflatedEventsMap.put(key, gsEvent);
         }
       }
@@ -1321,10 +1321,17 @@ public abstract class 
AbstractGatewaySenderEventProcessor extends Thread {
 
     private String regionName;
 
+    private long shadowKey;
+
     private ConflationKey(String region, Object key, Operation operation) {
+      this(region, key, operation, -1);
+    }
+
+    private ConflationKey(String region, Object key, Operation operation, long 
shadowKey) {
       this.key = key;
       this.operation = operation;
       this.regionName = region;
+      this.shadowKey = shadowKey;
     }
 
     @Override
@@ -1334,6 +1341,7 @@ public abstract class AbstractGatewaySenderEventProcessor 
extends Thread {
       result = prime * result + key.hashCode();
       result = prime * result + operation.hashCode();
       result = prime * result + regionName.hashCode();
+      result = prime * result + Long.hashCode(this.shadowKey);
       return result;
     }
 
@@ -1358,6 +1366,9 @@ public abstract class AbstractGatewaySenderEventProcessor 
extends Thread {
       if (!this.operation.equals(that.operation)) {
         return false;
       }
+      if (this.shadowKey != that.shadowKey) {
+        return false;
+      }
       return true;
     }
   }
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImpl.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImpl.java
index d314664..3a1ecd0 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImpl.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImpl.java
@@ -757,7 +757,7 @@ public class GatewaySenderEventImpl implements AsyncEvent, 
DataSerializableFixed
         
.append(getValueAsString(true)).append(";valueIsObject=").append(this.valueIsObject)
         
.append(";numberOfParts=").append(this.numberOfParts).append(";callbackArgument=")
         
.append(this.callbackArgument).append(";possibleDuplicate=").append(this.possibleDuplicate)
-        
.append(";creationTime=").append(this.creationTime).append(";shadowKey= ")
+        
.append(";creationTime=").append(this.creationTime).append(";shadowKey=")
         
.append(this.shadowKey).append(";timeStamp=").append(this.versionTimeStamp)
         
.append(";acked=").append(this.isAcked).append(";dispatched=").append(this.isDispatched)
         
.append(";bucketId=").append(this.bucketId).append(";isConcurrencyConflict=")
diff --git 
a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderEventProcessorJUnitTest.java
 
b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderEventProcessorJUnitTest.java
new file mode 100644
index 0000000..533ed38
--- /dev/null
+++ 
b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderEventProcessorJUnitTest.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.wan.parallel;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.cache.Operation;
+import org.apache.geode.internal.cache.GemFireCacheImpl;
+import org.apache.geode.internal.cache.LocalRegion;
+import org.apache.geode.internal.cache.wan.AbstractGatewaySender;
+import org.apache.geode.internal.cache.wan.AbstractGatewaySenderEventProcessor;
+import org.apache.geode.internal.cache.wan.GatewaySenderEventImpl;
+import org.apache.geode.internal.cache.wan.GatewaySenderStats;
+import org.apache.geode.test.fake.Fakes;
+import org.apache.geode.test.junit.categories.UnitTest;
+
+@Category(UnitTest.class)
+public class ParallelGatewaySenderEventProcessorJUnitTest {
+
+  private GemFireCacheImpl cache;
+  private AbstractGatewaySender sender;
+
+  @Before
+  public void setUpGemFire() {
+    createCache();
+    createGatewaySender();
+  }
+
+  private void createCache() {
+    // Mock cache
+    this.cache = Fakes.cache();
+  }
+
+  private void createGatewaySender() {
+    // Mock gateway sender
+    this.sender = ParallelGatewaySenderHelper.createGatewaySender(this.cache);
+    when(this.sender.isBatchConflationEnabled()).thenReturn(true);
+    when(sender.getStatistics()).thenReturn(mock(GatewaySenderStats.class));
+  }
+
+  @Test
+  public void 
validateBatchConflationWithBatchContainingDuplicateConflatableEvents()
+      throws Exception {
+    // Create a ParallelGatewaySenderEventProcessor
+    AbstractGatewaySenderEventProcessor processor =
+        
ParallelGatewaySenderHelper.createParallelGatewaySenderEventProcessor(this.sender);
+
+    // Create a batch of conflatable events with duplicates
+    List<GatewaySenderEventImpl> originalEvents = new ArrayList<>();
+    LocalRegion lr = mock(LocalRegion.class);
+    when(lr.getFullPath()).thenReturn("/dataStoreRegion");
+    Object lastUpdateValue = "Object_13964_5";
+    long lastUpdateSequenceId = 104, lastUpdateShadowKey = 28161;
+    
originalEvents.add(ParallelGatewaySenderHelper.createGatewaySenderEvent(lr, 
Operation.CREATE,
+        "Object_13964", "Object_13964_1", 100, 27709));
+    
originalEvents.add(ParallelGatewaySenderHelper.createGatewaySenderEvent(lr, 
Operation.UPDATE,
+        "Object_13964", "Object_13964_2", 101, 27822));
+    
originalEvents.add(ParallelGatewaySenderHelper.createGatewaySenderEvent(lr, 
Operation.UPDATE,
+        "Object_13964", "Object_13964_3", 102, 27935));
+    
originalEvents.add(ParallelGatewaySenderHelper.createGatewaySenderEvent(lr, 
Operation.UPDATE,
+        "Object_13964", "Object_13964_4", 103, 28048));
+    
originalEvents.add(ParallelGatewaySenderHelper.createGatewaySenderEvent(lr, 
Operation.UPDATE,
+        "Object_13964", lastUpdateValue, lastUpdateSequenceId, 
lastUpdateShadowKey));
+    
originalEvents.add(ParallelGatewaySenderHelper.createGatewaySenderEvent(lr, 
Operation.DESTROY,
+        "Object_13964", null, 102, 28274));
+
+    // Conflate the batch of events
+    List<GatewaySenderEventImpl> conflatedEvents = 
processor.conflate(originalEvents);
+
+    // Verify:
+    // - the batch contains 3 events after conflation
+    // - they are CREATE, UPDATE, and DESTROY
+    // - the UPDATE event is the correct one
+    assertThat(conflatedEvents.size()).isEqualTo(3);
+    GatewaySenderEventImpl gsei1 = conflatedEvents.get(0);
+    assertThat(gsei1.getOperation()).isEqualTo(Operation.CREATE);
+    GatewaySenderEventImpl gsei2 = conflatedEvents.get(1);
+    assertThat(gsei2.getOperation()).isEqualTo(Operation.UPDATE);
+    GatewaySenderEventImpl gsei3 = conflatedEvents.get(2);
+    assertThat(gsei3.getOperation()).isEqualTo(Operation.DESTROY);
+    assertThat(gsei2.getDeserializedValue()).isEqualTo(lastUpdateValue);
+    
assertThat(gsei2.getEventId().getSequenceID()).isEqualTo(lastUpdateSequenceId);
+    assertThat(gsei2.getShadowKey()).isEqualTo(lastUpdateShadowKey);
+  }
+
+  @Test
+  public void 
validateBatchConflationWithBatchContainingDuplicateNonConflatableEvents()
+      throws Exception {
+    // This is a test for GEODE-4704.
+    // A batch containing events like below is conflated. The conflation code should not affect this
+    // batch.
+    // SenderEventImpl[id=EventIDid=57bytes;threadID=0x10018|112;sequenceID=100;bucketId=24];operation=CREATE;region=/dataStoreRegion;key=Object_13964;shadowKey=27709]
+    // SenderEventImpl[id=EventIDid=57bytes;threadID=0x10018|112;sequenceID=101;bucketId=24];operation=CREATE;region=/dataStoreRegion;key=Object_14024;shadowKey=27822]
+    // SenderEventImpl[id=EventIDid=57bytes;threadID=0x10018|112;sequenceID=102;bucketId=24];operation=DESTROY;region=/dataStoreRegion;key=Object_13964;shadowKey=27935]
+    // SenderEventImpl[id=EventIDid=57bytes;threadID=0x10018|112;sequenceID=104;bucketId=24];operation=CREATE;region=/dataStoreRegion;key=Object_14024;shadowKey=28161]
+
+    // Create a ParallelGatewaySenderEventProcessor
+    AbstractGatewaySenderEventProcessor processor =
+        
ParallelGatewaySenderHelper.createParallelGatewaySenderEventProcessor(this.sender);
+
+    // Create a batch of non-conflatable events with one duplicate
+    List<GatewaySenderEventImpl> originalEvents = new ArrayList<>();
+    LocalRegion lr = mock(LocalRegion.class);
+    when(lr.getFullPath()).thenReturn("/dataStoreRegion");
+    
originalEvents.add(ParallelGatewaySenderHelper.createGatewaySenderEvent(lr, 
Operation.CREATE,
+        "Object_13964", "Object_13964", 100, 27709));
+    
originalEvents.add(ParallelGatewaySenderHelper.createGatewaySenderEvent(lr, 
Operation.CREATE,
+        "Object_14024", "Object_13964", 101, 27822));
+    
originalEvents.add(ParallelGatewaySenderHelper.createGatewaySenderEvent(lr, 
Operation.DESTROY,
+        "Object_13964", null, 102, 27935));
+    
originalEvents.add(ParallelGatewaySenderHelper.createGatewaySenderEvent(lr, 
Operation.CREATE,
+        "Object_14024", "Object_14024", 104, 28161));
+
+    // Conflate the batch of events
+    List<GatewaySenderEventImpl> conflatedEvents = 
processor.conflate(originalEvents);
+
+    // Assert no events were conflated incorrectly
+    assertThat(originalEvents).isEqualTo(conflatedEvents);
+  }
+}
diff --git 
a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderHelper.java
 
b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderHelper.java
new file mode 100644
index 0000000..fb1db11
--- /dev/null
+++ 
b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderHelper.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.wan.parallel;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.geode.CancelCriterion;
+import org.apache.geode.cache.Operation;
+import org.apache.geode.internal.cache.EntryEventImpl;
+import org.apache.geode.internal.cache.EnumListenerEvent;
+import org.apache.geode.internal.cache.EventID;
+import org.apache.geode.internal.cache.GemFireCacheImpl;
+import org.apache.geode.internal.cache.KeyInfo;
+import org.apache.geode.internal.cache.LocalRegion;
+import org.apache.geode.internal.cache.RegionQueue;
+import org.apache.geode.internal.cache.wan.AbstractGatewaySender;
+import org.apache.geode.internal.cache.wan.GatewaySenderEventImpl;
+
+public class ParallelGatewaySenderHelper {
+
+  public static ParallelGatewaySenderEventProcessor 
createParallelGatewaySenderEventProcessor(
+      AbstractGatewaySender sender) {
+    ParallelGatewaySenderEventProcessor processor = new 
ParallelGatewaySenderEventProcessor(sender);
+    ConcurrentParallelGatewaySenderQueue queue = new 
ConcurrentParallelGatewaySenderQueue(sender,
+        new ParallelGatewaySenderEventProcessor[] {processor});
+    Set<RegionQueue> queues = new HashSet<>();
+    queues.add(queue);
+    when(sender.getQueues()).thenReturn(queues);
+    return processor;
+  }
+
+  public static AbstractGatewaySender createGatewaySender(GemFireCacheImpl 
cache) {
+    // Mock gateway sender
+    AbstractGatewaySender sender = mock(AbstractGatewaySender.class);
+    when(sender.getCache()).thenReturn(cache);
+    CancelCriterion cancelCriterion = mock(CancelCriterion.class);
+    when(sender.getCancelCriterion()).thenReturn(cancelCriterion);
+    return sender;
+  }
+
+  public static GatewaySenderEventImpl createGatewaySenderEvent(LocalRegion 
lr, Operation operation,
+      Object key, Object value, long sequenceId, long shadowKey) throws 
Exception {
+    when(lr.getKeyInfo(key, value, null)).thenReturn(new KeyInfo(key, null, 
null));
+    EntryEventImpl eei = EntryEventImpl.create(lr, operation, key, value, 
null, false, null);
+    eei.setEventId(new EventID(new byte[16], 1l, sequenceId));
+    GatewaySenderEventImpl gsei =
+        new GatewaySenderEventImpl(getEnumListenerEvent(operation), eei, null);
+    gsei.setShadowKey(shadowKey);
+    return gsei;
+  }
+
+  private static EnumListenerEvent getEnumListenerEvent(Operation operation) {
+    EnumListenerEvent ele = null;
+    if (operation.isCreate()) {
+      ele = EnumListenerEvent.AFTER_CREATE;
+    } else if (operation.isUpdate()) {
+      ele = EnumListenerEvent.AFTER_UPDATE;
+    } else if (operation.isDestroy()) {
+      ele = EnumListenerEvent.AFTER_DESTROY;
+    }
+    return ele;
+  }
+}
diff --git 
a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessageJUnitTest.java
 
b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessageJUnitTest.java
index 5257616..5e0f704 100644
--- 
a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessageJUnitTest.java
+++ 
b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessageJUnitTest.java
@@ -25,21 +25,17 @@ import static org.mockito.Mockito.when;
 
 import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 
-import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
-import org.apache.geode.CancelCriterion;
 import org.apache.geode.cache.AttributesFactory;
 import org.apache.geode.cache.DataPolicy;
 import org.apache.geode.cache.EvictionAction;
@@ -64,7 +60,6 @@ import 
org.apache.geode.internal.cache.PartitionedRegionDataStore;
 import org.apache.geode.internal.cache.PartitionedRegionHelper;
 import org.apache.geode.internal.cache.PartitionedRegionStats;
 import org.apache.geode.internal.cache.ProxyBucketRegion;
-import org.apache.geode.internal.cache.RegionQueue;
 import org.apache.geode.internal.cache.eviction.AbstractEvictionController;
 import org.apache.geode.internal.cache.eviction.EvictionController;
 import org.apache.geode.internal.cache.partitioned.RegionAdvisor;
@@ -117,13 +112,10 @@ public class ParallelQueueRemovalMessageJUnitTest {
 
   private void createGatewaySender() {
     // Mock gateway sender
-    this.sender = mock(AbstractGatewaySender.class);
+    this.sender = ParallelGatewaySenderHelper.createGatewaySender(this.cache);
     when(this.queueRegion.getParallelGatewaySender()).thenReturn(this.sender);
     when(this.sender.getQueues()).thenReturn(null);
     when(this.sender.getDispatcherThreads()).thenReturn(1);
-    when(this.sender.getCache()).thenReturn(this.cache);
-    CancelCriterion cancelCriterion = mock(CancelCriterion.class);
-    when(sender.getCancelCriterion()).thenReturn(cancelCriterion);
   }
 
   private void createRootRegion() {
@@ -226,7 +218,8 @@ public class ParallelQueueRemovalMessageJUnitTest {
     assertFalse(this.bucketRegionQueue.isInitialized());
 
     // Create a real ConcurrentParallelGatewaySenderQueue
-    ParallelGatewaySenderEventProcessor processor = 
createConcurrentParallelGatewaySenderQueue();
+    ParallelGatewaySenderEventProcessor processor =
+        
ParallelGatewaySenderHelper.createParallelGatewaySenderEventProcessor(this.sender);
 
     // Add a mock GatewaySenderEventImpl to the temp queue
     BlockingQueue<GatewaySenderEventImpl> tempQueue =
@@ -248,7 +241,8 @@ public class ParallelQueueRemovalMessageJUnitTest {
     assertEquals(0, this.bucketRegionQueue.size());
 
     // Create a real ConcurrentParallelGatewaySenderQueue
-    ParallelGatewaySenderEventProcessor processor = 
createConcurrentParallelGatewaySenderQueue();
+    ParallelGatewaySenderEventProcessor processor =
+        
ParallelGatewaySenderHelper.createParallelGatewaySenderEventProcessor(this.sender);
 
     // Add an event to the BucketRegionQueue and verify BucketRegionQueue state
     GatewaySenderEventImpl event = this.bucketRegionQueueHelper.addEvent(KEY);
@@ -289,16 +283,6 @@ public class ParallelQueueRemovalMessageJUnitTest {
     return regionToDispatchedKeys;
   }
 
-  private ParallelGatewaySenderEventProcessor 
createConcurrentParallelGatewaySenderQueue() {
-    ParallelGatewaySenderEventProcessor processor = new 
ParallelGatewaySenderEventProcessor(sender);
-    ConcurrentParallelGatewaySenderQueue queue = new 
ConcurrentParallelGatewaySenderQueue(sender,
-        new ParallelGatewaySenderEventProcessor[] {processor});
-    Set<RegionQueue> queues = new HashSet<>();
-    queues.add(queue);
-    when(this.sender.getQueues()).thenReturn(queues);
-    return processor;
-  }
-
   private BlockingQueue<GatewaySenderEventImpl> createTempQueueAndAddEvent(
       ParallelGatewaySenderEventProcessor processor, GatewaySenderEventImpl 
event) {
     ParallelGatewaySenderQueue queue = (ParallelGatewaySenderQueue) 
processor.getQueue();

-- 
To stop receiving notification emails like this one, please contact
bogle...@apache.org.

Reply via email to