Repository: incubator-geode Updated Branches: refs/heads/develop 49d99d4e5 -> 0c13b4d6f
Fix for GEODE-278. While applying changes to the Region, pass in a List for gathering pendingCallbacks rather than null on the remote members, to get the same behavior as on the transaction host. Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/0c13b4d6 Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/0c13b4d6 Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/0c13b4d6 Branch: refs/heads/develop Commit: 0c13b4d6ffbc73a2eb29202c24b27cf2b1f71611 Parents: 49d99d4 Author: Swapnil Bawaskar <sbawas...@pivotal.io> Authored: Tue Aug 25 10:23:21 2015 -0700 Committer: Swapnil Bawaskar <sbawas...@pivotal.io> Committed: Mon Aug 31 14:28:01 2015 -0700 ---------------------------------------------------------------------- .../gemfire/internal/cache/TXCommitMessage.java | 34 +++++++--- .../cache/RemoteTransactionDUnitTest.java | 68 ++++++++++++++++++++ 2 files changed, 94 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/0c13b4d6/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/TXCommitMessage.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/TXCommitMessage.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/TXCommitMessage.java index f012bab..94aaadc 100644 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/TXCommitMessage.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/TXCommitMessage.java @@ -737,21 +737,39 @@ public class TXCommitMessage extends PooledDistributionMessage implements Member public void basicProcessOps() { { + List<EntryEventImpl> pendingCallbacks = new ArrayList<>(); Collections.sort(this.farSideEntryOps); Iterator it = this.farSideEntryOps.iterator(); while
(it.hasNext()) { try { RegionCommit.FarSideEntryOp entryOp = (RegionCommit.FarSideEntryOp)it.next(); - entryOp.process(); + entryOp.process(pendingCallbacks); } catch (CacheRuntimeException problem) { processCacheRuntimeException(problem); } catch (Exception e ) { addProcessingException(e); } } + firePendingCallbacks(pendingCallbacks); } } - + + private void firePendingCallbacks(List<EntryEventImpl> callbacks) { + Iterator<EntryEventImpl> ci = callbacks.iterator(); + while(ci.hasNext()) { + EntryEventImpl ee = ci.next(); + if(ee.getOperation().isDestroy()) { + ee.getRegion().invokeTXCallbacks(EnumListenerEvent.AFTER_DESTROY, ee, true); + } else if(ee.getOperation().isInvalidate()) { + ee.getRegion().invokeTXCallbacks(EnumListenerEvent.AFTER_INVALIDATE, ee, true); + } else if(ee.getOperation().isCreate()) { + ee.getRegion().invokeTXCallbacks(EnumListenerEvent.AFTER_CREATE, ee, true); + } else { + ee.getRegion().invokeTXCallbacks(EnumListenerEvent.AFTER_UPDATE, ee, true); + } + } + } + protected void processCacheRuntimeException(CacheRuntimeException problem) { if (problem instanceof RegionDestroyedException) { // catch RegionDestroyedException addProcessingException(problem); @@ -1262,7 +1280,7 @@ public class TXCommitMessage extends PooledDistributionMessage implements Member * Apply a single tx entry op on the far side */ @SuppressWarnings("synthetic-access") - protected void txApplyEntryOp(FarSideEntryOp entryOp) + protected void txApplyEntryOp(FarSideEntryOp entryOp, List<EntryEventImpl> pendingCallbacks) { if (this.r == null) { return; @@ -1312,7 +1330,7 @@ public class TXCommitMessage extends PooledDistributionMessage implements Member entryOp.op, getEventId(entryOp), entryOp.callbackArg, - null /* fire inline, no pending callbacks */, + pendingCallbacks, entryOp.filterRoutingInfo, this.msg.bridgeContext, false /* origin remote */, @@ -1328,7 +1346,7 @@ public class TXCommitMessage extends PooledDistributionMessage implements Member false /*localOp*/, 
getEventId(entryOp), entryOp.callbackArg, - null /* fire inline, no pending callbacks */, + pendingCallbacks, entryOp.filterRoutingInfo, this.msg.bridgeContext, null/*txEntryState*/, @@ -1343,7 +1361,7 @@ public class TXCommitMessage extends PooledDistributionMessage implements Member this.txEvent, getEventId(entryOp), entryOp.callbackArg, - null /* fire inline, no pending callbacks */, + pendingCallbacks, entryOp.filterRoutingInfo, this.msg.bridgeContext, null/*txEntryState*/, @@ -1658,8 +1676,8 @@ public class TXCommitMessage extends PooledDistributionMessage implements Member /** * Performs this entryOp on the farside of a tx commit. */ - public void process() { - txApplyEntryOp(this); + public void process(List<EntryEventImpl> pendingCallbacks) { + txApplyEntryOp(this, pendingCallbacks); } public void processAdjunctOnly() { http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/0c13b4d6/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/RemoteTransactionDUnitTest.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/RemoteTransactionDUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/RemoteTransactionDUnitTest.java index 5834622..ccff0c6 100644 --- a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/RemoteTransactionDUnitTest.java +++ b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/RemoteTransactionDUnitTest.java @@ -4244,4 +4244,72 @@ protected static class ClientListener extends CacheListenerAdapter { Object value = entry._getValue(); return value; } + + /** + * Install Listeners and verify that they are invoked after all tx events have been applied to the cache + * see GEODE-278 + */ + public void testNonInlineRemoteEvents() { + Host host = Host.getHost(0); + VM vm0 = host.getVM(0); + VM vm1 = host.getVM(1); + final String key1 = "nonInline-1"; + final String key2 = "nonInline-2"; + + 
class NonInlineListener extends CacheListenerAdapter { + boolean assertException = false; + + @Override + public void afterCreate(EntryEvent event) { + if (event.getKey().equals(key1)) { + if (getCache().getRegion(D_REFERENCE).get(key2) == null) { + assertException = true; + } + } + } + } + + SerializableCallable createRegionWithListener = new SerializableCallable() { + @Override + public Object call() throws Exception { + createRegion(false, 0, null); + getCache().getRegion(D_REFERENCE).getAttributesMutator().addCacheListener(new NonInlineListener()); + return null; + } + }; + + vm0.invoke(createRegionWithListener); + vm1.invoke(createRegionWithListener); + + vm0.invoke(new SerializableCallable() { + @Override + public Object call() throws Exception { + Region region = getCache().getRegion(D_REFERENCE); + CacheTransactionManager mgr = getCache().getCacheTransactionManager(); + mgr.begin(); + region.put(key1, "nonInlineValue-1"); + region.put(key2, "nonInlineValue-2"); + mgr.commit(); + return null; + } + }); + + SerializableCallable verifyAssert = new SerializableCallable() { + @Override + public Object call() throws Exception { + CacheListener[] listeners = getCache().getRegion(D_REFERENCE).getAttributes().getCacheListeners(); + for (CacheListener listener : listeners) { + if (listener instanceof NonInlineListener) { + NonInlineListener l = (NonInlineListener) listener; + assertFalse(l.assertException); + } + } + return null; + } + }; + + vm0.invoke(verifyAssert); + vm1.invoke(verifyAssert); + + } }