This is an automated email from the ASF dual-hosted git repository.
zhouxj pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new de7c6d8 GEODE-3967: There're following 9 problems fixed here: 1) When
ConcurrentCacheModificationException happened, GatewaySenderEventImpl should
save the status and notify gatewaysender if it hold primary queue, because
other member might have put the event into the secondary queue. 2) In
AbstractUpdateOperation's doPutOrCreate's 3 tries of basicUpdate, the 3rd try
should allow both create and update. 3) Let event with CME not to dispatch. The
old logic does not allow CME eve [...]
de7c6d8 is described below
commit de7c6d8b4a9b3e2c1c0ebd4ce1835aff0007f9e1
Author: zhouxh <[email protected]>
AuthorDate: Sat Jan 20 17:39:00 2018 -0800
GEODE-3967: The following 9 problems are fixed here:
1) When a ConcurrentCacheModificationException (CME) happens,
GatewaySenderEventImpl
should save the status and notify the gateway sender if it holds the primary
queue, because another member might have put the event into the secondary queue.
2) Of the 3 basicUpdate attempts in AbstractUpdateOperation (AUO)
.doPutOrCreate, the 3rd attempt should allow both create and update.
3) Do not dispatch events with a CME. The old logic did not allow CME
events
to be enqueued. This is wrong, because an event without a CME might have been
added into the secondary queue. So we should enqueue the event, but not
dispatch it.
4) Do not enqueue UPDATE_VERSION_STAMP events if this is not the primary queue,
because these events are not fired in pairs.
5) AbstractGatewaySenderEventProcessor puts the filter loop in the wrong place,
which causes UPDATE_VERSION_STAMP and CME events not to be ignored.
However, this is not fixed for now; it is left to GEODE-4659.
6) shouldSendVersionEvents for the Remote sender should return true, since
we no longer support 7.0.1.
7) change version to 150
8) A CME event should not be retried in AUO.doPutOrCreate, because the retry
will end up with a CME too.
9) CME && !originRemote: only enqueue to primary
This closes #1317
---
.../internal/cache/AbstractUpdateOperation.java | 8 ++++-
.../apache/geode/internal/cache/LocalRegion.java | 6 ++--
.../internal/cache/wan/GatewaySenderEventImpl.java | 34 ++++++++++++++++++----
.../serial/SerialGatewaySenderEventProcessor.java | 16 ++++++++--
.../cache30/DistributedAckRegionCCEDUnitTest.java | 4 ++-
.../codeAnalysis/sanctionedDataSerializables.txt | 8 +++--
6 files changed, 61 insertions(+), 15 deletions(-)
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractUpdateOperation.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractUpdateOperation.java
index 585e131..1eb2761 100644
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractUpdateOperation.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractUpdateOperation.java
@@ -142,6 +142,12 @@ public abstract class AbstractUpdateOperation extends
DistributedCacheOperation
}
doUpdate = false;
}
+ if (ev.isConcurrencyConflict()) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("basicUpdate failed with CME, not to retry:" +
ev);
+ }
+ doUpdate = false;
+ }
}
} finally {
if (isBucket) {
@@ -175,7 +181,7 @@ public abstract class AbstractUpdateOperation extends
DistributedCacheOperation
|| (rgn.getDataPolicy().withReplication() &&
rgn.getConcurrencyChecksEnabled())) {
overwriteDestroyed = true;
ev.makeCreate();
- rgn.basicUpdate(ev, true /* ifNew */, false/* ifOld */,
lastMod,
+ rgn.basicUpdate(ev, false /* ifNew */, false/* ifOld */,
lastMod,
overwriteDestroyed);
rgn.getCachePerfStats().endPut(startPut, ev.isOriginRemote());
updated = true;
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
index e6202ab..c33a13a 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
@@ -5631,6 +5631,8 @@ public class LocalRegion extends AbstractRegion
implements LoaderHelperFactory,
logger.debug("caught concurrent modification attempt when applying
{}", event);
}
notifyBridgeClients(event);
+ notifyGatewaySender(event.getOperation().isUpdate() ?
EnumListenerEvent.AFTER_UPDATE
+ : EnumListenerEvent.AFTER_CREATE, event);
return false;
}
@@ -6114,8 +6116,7 @@ public class LocalRegion extends AbstractRegion
implements LoaderHelperFactory,
}
protected void notifyGatewaySender(EnumListenerEvent operation,
EntryEventImpl event) {
- if (isPdxTypesRegion() || event.isConcurrencyConflict()) {
- // isConcurrencyConflict is usually a concurrent cache modification
problem
+ if (isPdxTypesRegion()) {
return;
}
@@ -6505,6 +6506,7 @@ public class LocalRegion extends AbstractRegion
implements LoaderHelperFactory,
// Notify clients only if its NOT a gateway event.
if (event.getVersionTag() != null &&
!event.getVersionTag().isGatewayTag()) {
notifyBridgeClients(event);
+ notifyGatewaySender(EnumListenerEvent.AFTER_DESTROY, event);
}
return true; // event was elided
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImpl.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImpl.java
index 2748c7d..d314664 100644
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImpl.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImpl.java
@@ -33,6 +33,7 @@ import org.apache.geode.internal.DataSerializableFixedID;
import org.apache.geode.internal.InternalDataSerializer;
import org.apache.geode.internal.Version;
import org.apache.geode.internal.VersionedDataInputStream;
+import org.apache.geode.internal.VersionedDataSerializable;
import org.apache.geode.internal.cache.CachedDeserializable;
import org.apache.geode.internal.cache.CachedDeserializableFactory;
import org.apache.geode.internal.cache.Conflatable;
@@ -61,8 +62,8 @@ import org.apache.geode.internal.size.Sizeable;
* @since GemFire 7.0
*
*/
-public class GatewaySenderEventImpl
- implements AsyncEvent, DataSerializableFixedID, Conflatable, Sizeable,
Releasable {
+public class GatewaySenderEventImpl implements AsyncEvent,
DataSerializableFixedID, Conflatable,
+ Sizeable, Releasable, VersionedDataSerializable {
private static final long serialVersionUID = -5690172020872255422L;
protected static final Object TOKEN_NULL = new Object();
@@ -171,6 +172,8 @@ public class GatewaySenderEventImpl
protected boolean isInitialized;
+ private transient boolean isConcurrencyConflict = false;
+
/**
* Is this thread in the process of serializing this event?
*/
@@ -312,6 +315,7 @@ public class GatewaySenderEventImpl
if (initialize) {
initialize();
}
+ this.isConcurrencyConflict = event.isConcurrencyConflict();
}
/**
@@ -673,7 +677,13 @@ public class GatewaySenderEventImpl
return GATEWAY_SENDER_EVENT_IMPL;
}
+ @Override
public void toData(DataOutput out) throws IOException {
+ toDataPre_GEODE_1_5_0_0(out);
+ DataSerializer.writeBoolean(this.isConcurrencyConflict, out);
+ }
+
+ public void toDataPre_GEODE_1_5_0_0(DataOutput out) throws IOException {
// Make sure we are initialized before we serialize.
initialize();
out.writeShort(VERSION);
@@ -697,7 +707,13 @@ public class GatewaySenderEventImpl
DataSerializer.writeObject(this.key, out);
}
+ @Override
public void fromData(DataInput in) throws IOException,
ClassNotFoundException {
+ fromDataPre_GEODE_1_5_0_0(in);
+ this.isConcurrencyConflict = DataSerializer.readBoolean(in);
+ }
+
+ public void fromDataPre_GEODE_1_5_0_0(DataInput in) throws IOException,
ClassNotFoundException {
short version = in.readShort();
if (version != VERSION) {
// warning?`
@@ -744,7 +760,8 @@ public class GatewaySenderEventImpl
.append(";creationTime=").append(this.creationTime).append(";shadowKey= ")
.append(this.shadowKey).append(";timeStamp=").append(this.versionTimeStamp)
.append(";acked=").append(this.isAcked).append(";dispatched=").append(this.isDispatched)
- .append(";bucketId=").append(this.bucketId).append("]");
+
.append(";bucketId=").append(this.bucketId).append(";isConcurrencyConflict=")
+ .append(this.isConcurrencyConflict).append("]");
return buffer.toString();
}
@@ -1128,6 +1145,14 @@ public class GatewaySenderEventImpl
return bucketId;
}
+ public boolean isConcurrencyConflict() {
+ return isConcurrencyConflict;
+ }
+
+ public boolean setConcurrencyConflict(boolean isConcurrencyConflict) {
+ return this.isConcurrencyConflict = isConcurrencyConflict;
+ }
+
/**
* @param tailKey the tailKey to set
*/
@@ -1144,8 +1169,7 @@ public class GatewaySenderEventImpl
@Override
public Version[] getSerializationVersions() {
- // TODO Auto-generated method stub
- return null;
+ return new Version[] {Version.GEODE_150};
}
public int getSerializedValueSize() {
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessor.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessor.java
index f9eb9c0..3fa4d6a 100644
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessor.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessor.java
@@ -34,6 +34,7 @@ import org.apache.geode.SystemFailure;
import org.apache.geode.cache.CacheException;
import org.apache.geode.cache.CacheListener;
import org.apache.geode.cache.EntryEvent;
+import org.apache.geode.cache.Operation;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.RegionDestroyedException;
import org.apache.geode.cache.wan.GatewayQueueEvent;
@@ -41,6 +42,7 @@ import org.apache.geode.cache.wan.GatewaySender;
import org.apache.geode.distributed.DistributedSystem;
import org.apache.geode.internal.Assert;
import org.apache.geode.internal.cache.DistributedRegion;
+import org.apache.geode.internal.cache.EntryEventImpl;
import org.apache.geode.internal.cache.EnumListenerEvent;
import org.apache.geode.internal.cache.EventID;
import org.apache.geode.internal.cache.wan.AbstractGatewaySender;
@@ -377,6 +379,9 @@ public class SerialGatewaySenderEventProcessor extends
AbstractGatewaySenderEven
if (m != null) {
for (EventWrapper ew : m.values()) {
GatewaySenderEventImpl gatewayEvent = ew.event;
+ if (logger.isDebugEnabled()) {
+ logger.debug("releaseUnprocessedEvents:" + gatewayEvent);
+ }
gatewayEvent.release();
}
this.unprocessedEvents = null;
@@ -421,9 +426,14 @@ public class SerialGatewaySenderEventProcessor extends
AbstractGatewaySenderEven
} else {
// If it is not, create an uninitialized GatewayEventImpl and
// put it into the map of unprocessed events.
- senderEvent = new GatewaySenderEventImpl(operation, event,
substituteValue, false); // OFFHEAP
-
// ok
- handleSecondaryEvent(senderEvent);
+ // 2 Special cases:
+ // 1) UPDATE_VERSION_STAMP: only enqueue to primary
+ // 2) CME && !originRemote: only enqueue to primary
+ if (!(event.getOperation().equals(Operation.UPDATE_VERSION_STAMP)
+ || ((EntryEventImpl) event).isConcurrencyConflict() &&
!event.isOriginRemote())) {
+ senderEvent = new GatewaySenderEventImpl(operation, event,
substituteValue, false); // OFFHEAP
+ handleSecondaryEvent(senderEvent);
+ }
}
}
}
diff --git
a/geode-core/src/test/java/org/apache/geode/cache30/DistributedAckRegionCCEDUnitTest.java
b/geode-core/src/test/java/org/apache/geode/cache30/DistributedAckRegionCCEDUnitTest.java
index 90e880c..ac96819 100644
---
a/geode-core/src/test/java/org/apache/geode/cache30/DistributedAckRegionCCEDUnitTest.java
+++
b/geode-core/src/test/java/org/apache/geode/cache30/DistributedAckRegionCCEDUnitTest.java
@@ -476,7 +476,9 @@ public class DistributedAckRegionCCEDUnitTest extends
DistributedAckRegionDUnitT
CCRegion.basicBridgePut("cckey0", "newvalue", null, true, null, id,
true, holder);
vm0.invoke(new SerializableRunnable("check conflation count") {
public void run() {
- assertEquals("expected one conflated event", 1,
+ // after changed the 3rd try of AUO.doPutOrCreate to be ifOld=false
ifNew=false
+ // ARM.updateEntry will be called one more time, so there will be 2
conflacted events
+ assertEquals("expected two conflated event", 2,
CCRegion.getCachePerfStats().getConflatedEventsCount());
}
});
diff --git
a/geode-core/src/test/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt
b/geode-core/src/test/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt
index 3623071..b50e95c 100644
---
a/geode-core/src/test/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt
+++
b/geode-core/src/test/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt
@@ -1988,9 +1988,11 @@
org/apache/geode/internal/cache/wan/GatewaySenderEventCallbackArgument,2
fromData,63,2a2bb700182a2bb80019b60017b500032abb00075905b7000ab500062bb9001a01003d033e1d1ca200172ab400062bb9001a0100b6001b57840301a7ffeab1
toData,87,2a2bb700112ab40003b800122bb800132ab40006c6003b2b2ab40006b60014b9001502002ab40006b600164d2cb9000c010099001a2cb9000d0100c0000e4e2b2db60017b900150200a7ffe3a7000a2b03b900150200b1
-org/apache/geode/internal/cache/wan/GatewaySenderEventImpl,2
-fromData,183,2bb9007201003d1c10119f00032a04b5002b2a2bb900730100b500282a2bb900730100b500291c1011a200232bc1007499001c2bb80075b20076a60012bb0077592bc00074b20078b700794c2a2bb8007ac0007bb5002a2a2bb8007cb500102a2bb9007d0100b5002e2a2bb6007e2a2bb8007fb500302a2bb8007ac00020b500212a2bb900800100b500132a2bb900810100b500172a2bb900730100b500092a2bb900810100b80004b500052a2bb900810100b5001bb1
-toData,133,2ab600272b1011b9006702002b2ab40028b9006802002b2ab40029b9006802002ab4002a2bb800692ab400102bb8006a2b2ab4002eb9006b02002a2bb6006c2ab6002f2bb8006d2ab400212bb800692b2ab40013b9006e02002b2ab40017b9006f03002b2ab40009b9006802002b2ab40005b60070b9006f03002b2ab60071b9006f0300b1
+org/apache/geode/internal/cache/wan/GatewaySenderEventImpl,4
+fromData,17,2a2bb600772a2bb80078b60079b50006b1
+fromDataPre_GEODE_1_5_0_0,183,2bb9007a01003d1c10119f00032a04b5002d2a2bb9007b0100b5002a2a2bb9007b0100b5002b1c1011a200232bc1007c99001c2bb8007db2007ea60012bb007f592bc0007cb20080b700814c2a2bb80082c00083b5002c2a2bb80084b500112a2bb900850100b500302a2bb600862a2bb80087b500322a2bb80082c00021b500222a2bb900880100b500142a2bb900890100b500182a2bb9007b0100b5000a2a2bb900890100b80004b500052a2bb900890100b5001cb1
+toData,17,2a2bb600692ab40006b8006a2bb8006bb1
+toDataPre_GEODE_1_5_0_0,133,2ab600282b1011b9006c02002b2ab4002ab9006d02002b2ab4002bb9006d02002ab4002c2bb8006e2ab400112bb8006f2b2ab40030b9007002002a2bb600712ab600312bb800722ab400222bb8006e2b2ab40014b9007302002b2ab40018b9007403002b2ab4000ab9006d02002b2ab40005b60075b9007403002b2ab60076b900740300b1
org/apache/geode/internal/cache/wan/GatewaySenderQueueEntrySynchronizationOperation$GatewaySenderQueueEntrySynchronizationEntry,2
fromData,20,2a2bb80006b500022a2bb80006c00007b50001b1
--
To stop receiving notification emails like this one, please contact
[email protected].