This is an automated email from the ASF dual-hosted git repository. boglesby pushed a commit to branch feature/GEODE-6854 in repository https://gitbox.apache.org/repos/asf/geode.git
commit babc6507f3592a7eb62cf7ac0ade0d2e75bee76f Author: Barry Oglesby <[email protected]> AuthorDate: Tue Jun 11 09:41:19 2019 -0700 GEODE-6854: Skipped events already contained in the batch during conflation --- .../wan/AbstractGatewaySenderEventProcessor.java | 30 ++++++++-- .../internal/cache/wan/GatewaySenderEventImpl.java | 43 ++++++++++++++ ...rallelGatewaySenderEventProcessorJUnitTest.java | 69 +++++++++++++++++++++- .../wan/parallel/ParallelGatewaySenderHelper.java | 8 ++- 4 files changed, 142 insertions(+), 8 deletions(-) diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java index 87d3021..058a1b7 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java @@ -772,6 +772,9 @@ public abstract class AbstractGatewaySenderEventProcessor extends LoggingThread List<GatewaySenderEventImpl> conflatedEvents = null; // Conflate the batch if necessary if (this.sender.isBatchConflationEnabled() && events.size() > 1) { + if (logger.isDebugEnabled()) { + logEvents("original", events); + } Map<ConflationKey, GatewaySenderEventImpl> conflatedEventsMap = new LinkedHashMap<ConflationKey, GatewaySenderEventImpl>(); conflatedEvents = new ArrayList<GatewaySenderEventImpl>(); @@ -783,12 +786,16 @@ public abstract class AbstractGatewaySenderEventProcessor extends LoggingThread ConflationKey key = new ConflationKey(gsEvent.getRegion().getFullPath(), gsEvent.getKeyToConflate(), gsEvent.getOperation()); - // Attempt to remove the key. If the entry is removed, that means a - // duplicate key was found. If not, this is a no-op. - conflatedEventsMap.remove(key); + // Get the entry at that key + GatewaySenderEventImpl existingEvent = conflatedEventsMap.get(key); + if (!gsEvent.equals(existingEvent)) { + // Attempt to remove the key. If the entry is removed, that means a + // duplicate key was found. If not, this is a no-op. + conflatedEventsMap.remove(key); - // Add the key to the end of the map. - conflatedEventsMap.put(key, gsEvent); + // Add the key to the end of the map. + conflatedEventsMap.put(key, gsEvent); + } } else { // The event should not be conflated (create or destroy). Add it to // the map. @@ -806,12 +813,25 @@ public abstract class AbstractGatewaySenderEventProcessor extends LoggingThread // Increment the events conflated from batches statistic this.sender.getStatistics() .incEventsConflatedFromBatches(events.size() - conflatedEvents.size()); + if (logger.isDebugEnabled()) { + logEvents("conflated", conflatedEvents); + } } else { conflatedEvents = events; } return conflatedEvents; } + private void logEvents(String message, List<GatewaySenderEventImpl> events) { + StringBuilder builder = new StringBuilder(); + builder.append("The batch contains the following ").append(events.size()).append(" ") + .append(message).append(" events:"); + for (GatewaySenderEventImpl event : events) { + builder.append("\t\n").append(event.toSmallString()); + } + logger.debug(builder); + } + private List<GatewaySenderEventImpl> addPDXEvent() throws IOException { List<GatewaySenderEventImpl> pdxEventsToBeDispatched = new ArrayList<GatewaySenderEventImpl>(); diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImpl.java index 8db48b4..1247d03 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImpl.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImpl.java @@ -19,6 +19,7 @@ import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.io.InputStream; +import java.util.Arrays; import org.apache.geode.DataSerializer; import org.apache.geode.InternalGemFireError; @@ -44,6 +45,7 @@ import org.apache.geode.internal.cache.LocalRegion; import org.apache.geode.internal.cache.Token; import org.apache.geode.internal.cache.WrappedCallbackArgument; import org.apache.geode.internal.cache.tier.sockets.Message; +import org.apache.geode.internal.lang.ObjectUtils; import org.apache.geode.internal.offheap.OffHeapHelper; import org.apache.geode.internal.offheap.ReferenceCountHelper; import org.apache.geode.internal.offheap.Releasable; @@ -774,6 +776,15 @@ public class GatewaySenderEventImpl return buffer.toString(); } + public String toSmallString() { + StringBuffer buffer = new StringBuffer(); + buffer.append("GatewaySenderEventImpl[").append("id=").append(this.id).append(";operation=") + .append(getOperation()).append(";region=").append(this.regionPath).append(";key=") + .append(this.key).append(";shadowKey=").append(this.shadowKey).append(";bucketId=") + .append(this.bucketId).append("]"); + return buffer.toString(); + } + public static boolean isSerializingValue() { return ((Boolean) isSerializingValue.get()).booleanValue(); } @@ -1179,6 +1190,38 @@ public class GatewaySenderEventImpl return this.shadowKey; } + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + + if (obj == null || !(obj instanceof GatewaySenderEventImpl)) { + return false; + } + + GatewaySenderEventImpl that = (GatewaySenderEventImpl) obj; + + return this.shadowKey.equals(that.shadowKey) + && this.id.equals(that.id) + && this.bucketId == that.bucketId + && this.action == that.action + && this.regionPath.equals(that.regionPath) + && this.key.equals(that.key) + && Arrays.equals(this.value, that.value); + } + + public int hashCode() { + int hashCode = 17; + hashCode = 37 * hashCode + ObjectUtils.hashCode(this.shadowKey); + hashCode = 37 * hashCode + ObjectUtils.hashCode(this.id); + hashCode = 37 * hashCode + this.bucketId; + hashCode = 37 * hashCode + this.action; + hashCode = 37 * hashCode + ObjectUtils.hashCode(this.regionPath); + hashCode = 37 * hashCode + ObjectUtils.hashCode(this.key); + hashCode = 37 * hashCode + (this.value == null ? 0 : Arrays.hashCode(this.value)); + return hashCode; + } + @Override public Version[] getSerializationVersions() { return new Version[] {Version.GEODE_1_9_0}; diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderEventProcessorJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderEventProcessorJUnitTest.java index 558df5c..1eb9920 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderEventProcessorJUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderEventProcessorJUnitTest.java @@ -125,7 +125,7 @@ public class ParallelGatewaySenderEventProcessorJUnitTest { AbstractGatewaySenderEventProcessor processor = ParallelGatewaySenderHelper.createParallelGatewaySenderEventProcessor(this.sender); - // Create a batch of non-conflatable events with one duplicate + // Create a batch of non-conflatable events with one duplicate (not including the shadowKey) List<GatewaySenderEventImpl> originalEvents = new ArrayList<>(); LocalRegion lr = mock(LocalRegion.class); when(lr.getFullPath()).thenReturn("/dataStoreRegion"); @@ -139,7 +139,7 @@ public class ParallelGatewaySenderEventProcessorJUnitTest { originalEvents.add(ParallelGatewaySenderHelper.createGatewaySenderEvent(lr, Operation.CREATE, "Object_13964", "Object_13964", 100, 27709)); originalEvents.add(ParallelGatewaySenderHelper.createGatewaySenderEvent(lr, Operation.CREATE, - "Object_14024", "Object_13964", 101, 27822)); + "Object_14024", "Object_14024", 101, 27822)); originalEvents.add(ParallelGatewaySenderHelper.createGatewaySenderEvent(lr, Operation.DESTROY, "Object_13964", null, 102, 27935)); originalEvents.add(ParallelGatewaySenderHelper.createGatewaySenderEvent(lr, Operation.CREATE, @@ -151,4 +151,69 @@ public class ParallelGatewaySenderEventProcessorJUnitTest { // Assert no events were conflated incorrectly assertThat(originalEvents).isEqualTo(conflatedEvents); } + + @Test + public void validateBatchConflationWithBatchContainingDuplicateConflatableAndNonConflatableEvents() + throws Exception { + // This is a test for GEODE-6854. + // A batch containing events like below is conflated incorrectly. The conflation code should + // remove the duplicates from this batch, but not change their order. + // SenderEventImpl[id=EventID[threadID=104;sequenceID=2;bucketId=89];action=1;operation=UPDATE;region=/dataStoreRegion;key=Object_6079;shadowKey=16587] + // SenderEventImpl[id=EventID[threadID=104;sequenceID=3;bucketId=89];action=2;operation=DESTROY;region=/dataStoreRegion;key=Object_6079;shadowKey=16700] + // SenderEventImpl[id=EventID[threadID=112;sequenceID=9;bucketId=89];action=1;operation=PUTALL_UPDATE;region=/dataStoreRegion;key=Object_7731;shadowKey=16813] + // SenderEventImpl[id=EventID[threadID=112;sequenceID=12;bucketId=89];action=1;operation=PUTALL_UPDATE;region=/dataStoreRegion;key=Object_6591;shadowKey=16926] + // SenderEventImpl[id=EventID[threadID=104;sequenceID=3;bucketId=89];action=2;operation=DESTROY;region=/dataStoreRegion;key=Object_6079;shadowKey=16700] + // SenderEventImpl[id=EventID[threadID=112;sequenceID=9;bucketId=89];action=1;operation=PUTALL_UPDATE;region=/dataStoreRegion;key=Object_7731;shadowKey=16813] + + // Create a ParallelGatewaySenderEventProcessor + AbstractGatewaySenderEventProcessor processor = + ParallelGatewaySenderHelper.createParallelGatewaySenderEventProcessor(this.sender); + + // Create mock region + LocalRegion lr = mock(LocalRegion.class); + when(lr.getFullPath()).thenReturn("/dataStoreRegion"); + InternalCache cache = mock(InternalCache.class); + InternalDistributedSystem ids = mock(InternalDistributedSystem.class); + when(lr.getCache()).thenReturn(cache); + when(cache.getDistributedSystem()).thenReturn(ids); + + // Create a batch of conflatable and non-conflatable events with one duplicate conflatable event + // and one duplicate non-conflatable event (including the shadowKey) + List<GatewaySenderEventImpl> originalEvents = new ArrayList<>(); + originalEvents.add(ParallelGatewaySenderHelper.createGatewaySenderEvent(lr, Operation.UPDATE, + "Object_6079", "Object_6079", 104, 2, 89, 16587)); + originalEvents.add(ParallelGatewaySenderHelper.createGatewaySenderEvent(lr, Operation.DESTROY, + "Object_6079", null, 104, 3, 89, 16700)); + originalEvents + .add(ParallelGatewaySenderHelper.createGatewaySenderEvent(lr, Operation.PUTALL_UPDATE, + "Object_7731", "Object_7731", 112, 9, 89, 16813)); + originalEvents + .add(ParallelGatewaySenderHelper.createGatewaySenderEvent(lr, Operation.PUTALL_UPDATE, + "Object_6591", "Object_6591", 112, 12, 89, 16926)); + originalEvents.add(ParallelGatewaySenderHelper.createGatewaySenderEvent(lr, Operation.DESTROY, + "Object_6079", null, 104, 3, 89, 16700)); + originalEvents + .add(ParallelGatewaySenderHelper.createGatewaySenderEvent(lr, Operation.PUTALL_UPDATE, + "Object_7731", "Object_7731", 112, 9, 89, 16813)); + logEvents("original", originalEvents); + + // Conflate the batch of events + List<GatewaySenderEventImpl> conflatedEvents = processor.conflate(originalEvents); + logEvents("conflated", conflatedEvents); + assertThat(conflatedEvents.size()).isEqualTo(4); + assertThat(originalEvents.get(0)).isEqualTo(conflatedEvents.get(0)); + assertThat(originalEvents.get(1)).isEqualTo(conflatedEvents.get(1)); + assertThat(originalEvents.get(2)).isEqualTo(conflatedEvents.get(2)); + assertThat(originalEvents.get(3)).isEqualTo(conflatedEvents.get(3)); + } + + private void logEvents(String message, List<GatewaySenderEventImpl> events) { + StringBuilder builder = new StringBuilder(); + builder.append("The list contains the following ").append(events.size()).append(" ") + .append(message).append(" events:"); + for (GatewaySenderEventImpl event : events) { + builder.append("\t\n").append(event.toSmallString()); + } + System.out.println(builder); + } } diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderHelper.java b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderHelper.java index 53529af..e75b9cf 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderHelper.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderHelper.java @@ -81,9 +81,15 @@ public class ParallelGatewaySenderHelper { public static GatewaySenderEventImpl createGatewaySenderEvent(LocalRegion lr, Operation operation, Object key, Object value, long sequenceId, long shadowKey) throws Exception { + return createGatewaySenderEvent(lr, operation, key, value, 1l, sequenceId, 0, shadowKey); + } + + public static GatewaySenderEventImpl createGatewaySenderEvent(LocalRegion lr, Operation operation, + Object key, Object value, long threadId, long sequenceId, int bucketId, long shadowKey) + throws Exception { when(lr.getKeyInfo(key, value, null)).thenReturn(new KeyInfo(key, null, null)); EntryEventImpl eei = EntryEventImpl.create(lr, operation, key, value, null, false, null); - eei.setEventId(new EventID(new byte[16], 1l, sequenceId)); + eei.setEventId(new EventID(new byte[16], threadId, sequenceId, bucketId)); GatewaySenderEventImpl gsei = new GatewaySenderEventImpl(getEnumListenerEvent(operation), eei, null); gsei.setShadowKey(shadowKey);
