This is an automated email from the ASF dual-hosted git repository. zhouxj pushed a commit to branch feature/GEM-2228-883 in repository https://gitbox.apache.org/repos/asf/geode.git
commit 9cb0131de13bc6a4987875f5a5efbbb05e73d988 Author: zhouxh <[email protected]> AuthorDate: Wed Sep 26 14:45:22 2018 -0700 GEM-2228-883: The previous fix for GEODE-3967 caused an event with CME to be dispatched. This has the side effect of causing data inconsistency across WAN. The enhanced fix is to enqueue the CME event but not to dispatch it. --- .../codeAnalysis/sanctionedDataSerializables.txt | 8 ++++-- .../wan/AbstractGatewaySenderEventProcessor.java | 14 ++++++++++ .../internal/cache/wan/GatewaySenderEventImpl.java | 32 ++++++++++++++++++++-- 3 files changed, 48 insertions(+), 6 deletions(-) diff --git a/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt b/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt index f5b8733..c599221 100644 --- a/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt +++ b/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt @@ -1984,9 +1984,11 @@ org/apache/geode/internal/cache/wan/GatewaySenderEventCallbackArgument,2 fromData,63 toData,87 -org/apache/geode/internal/cache/wan/GatewaySenderEventImpl,2 -fromData,183 -toData,133 +org/apache/geode/internal/cache/wan/GatewaySenderEventImpl,4 +fromData,17 +fromDataPre_GEODE_1_8_0_0,183 +toData,17 +toDataPre_GEODE_1_8_0_0,133 org/apache/geode/internal/cache/wan/GatewaySenderQueueEntrySynchronizationOperation$GatewaySenderQueueEntrySynchronizationEntry,2 fromData,20 diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java index ea2f603..71cdac2 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java +++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java @@ -562,6 +562,20 @@ public abstract class AbstractGatewaySenderEventProcessor extends Thread } } } + + // filter out the events with CME + Iterator<GatewaySenderEventImpl> cmeItr = filteredList.iterator(); + while (cmeItr.hasNext()) { + GatewaySenderEventImpl event = cmeItr.next(); + if (event.isConcurrencyConflict()) { + cmeItr.remove(); + logger.debug("The CME event: {} is removed from Gateway Sender queue: {}", event, + sender); + statistics.incEventsNotQueued(); + continue; + } + } + /* * if (filteredList.isEmpty()) { eventQueueRemove(events.size()); continue; } */ diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImpl.java index 7c74957..6ac2275 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImpl.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImpl.java @@ -33,6 +33,7 @@ import org.apache.geode.internal.DataSerializableFixedID; import org.apache.geode.internal.InternalDataSerializer; import org.apache.geode.internal.Version; import org.apache.geode.internal.VersionedDataInputStream; +import org.apache.geode.internal.VersionedDataSerializable; import org.apache.geode.internal.cache.CachedDeserializable; import org.apache.geode.internal.cache.CachedDeserializableFactory; import org.apache.geode.internal.cache.Conflatable; @@ -62,7 +63,8 @@ import org.apache.geode.internal.size.Sizeable; * */ public class GatewaySenderEventImpl - implements AsyncEvent, DataSerializableFixedID, Conflatable, Sizeable, Releasable { + implements AsyncEvent, DataSerializableFixedID, Conflatable, Sizeable, Releasable, + VersionedDataSerializable { private static final long serialVersionUID = -5690172020872255422L; protected static final Object TOKEN_NULL = new 
Object(); @@ -171,6 +173,8 @@ public class GatewaySenderEventImpl protected boolean isInitialized; + private transient boolean isConcurrencyConflict = false; + /** * Is this thread in the process of serializing this event? */ @@ -310,6 +314,7 @@ public class GatewaySenderEventImpl if (initialize) { initialize(); } + this.isConcurrencyConflict = event.isConcurrencyConflict(); } /** @@ -671,7 +676,13 @@ public class GatewaySenderEventImpl return GATEWAY_SENDER_EVENT_IMPL; } + @Override public void toData(DataOutput out) throws IOException { + toDataPre_GEODE_1_8_0_0(out); + DataSerializer.writeBoolean(this.isConcurrencyConflict, out); + } + + public void toDataPre_GEODE_1_8_0_0(DataOutput out) throws IOException { // Make sure we are initialized before we serialize. initialize(); out.writeShort(VERSION); @@ -695,7 +706,13 @@ public class GatewaySenderEventImpl DataSerializer.writeObject(this.key, out); } + @Override public void fromData(DataInput in) throws IOException, ClassNotFoundException { + fromDataPre_GEODE_1_8_0_0(in); + this.isConcurrencyConflict = DataSerializer.readBoolean(in); + } + + public void fromDataPre_GEODE_1_8_0_0(DataInput in) throws IOException, ClassNotFoundException { short version = in.readShort(); if (version != VERSION) { // warning?` @@ -742,7 +759,8 @@ public class GatewaySenderEventImpl .append(";creationTime=").append(this.creationTime).append(";shadowKey=") .append(this.shadowKey).append(";timeStamp=").append(this.versionTimeStamp) .append(";acked=").append(this.isAcked).append(";dispatched=").append(this.isDispatched) - .append(";bucketId=").append(this.bucketId).append("]"); + .append(";bucketId=").append(this.bucketId).append(";isConcurrencyConflict=") + .append(this.isConcurrencyConflict).append("]"); return buffer.toString(); } @@ -1125,6 +1143,14 @@ public class GatewaySenderEventImpl return bucketId; } + public boolean isConcurrencyConflict() { + return isConcurrencyConflict; + } + + public boolean setConcurrencyConflict(boolean 
isConcurrencyConflict) { + return this.isConcurrencyConflict = isConcurrencyConflict; + } + /** * @param tailKey the tailKey to set */ @@ -1141,7 +1167,7 @@ public class GatewaySenderEventImpl @Override public Version[] getSerializationVersions() { - return null; + return new Version[] {Version.GEODE_180}; } public int getSerializedValueSize() {
