Repository: incubator-geode Updated Branches: refs/heads/develop 7bc011293 -> 4708d4e18
GEODE-278 Enqueue events in one more spot. With the previous commit, I had missed the code block for Adjunct messages. Enqueue tx events in that block too. Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/4708d4e1 Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/4708d4e1 Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/4708d4e1 Branch: refs/heads/develop Commit: 4708d4e182f89c6a391fcad8bac854f929717685 Parents: 7bc0112 Author: Swapnil Bawaskar <sbawas...@pivotal.io> Authored: Thu Sep 17 15:43:52 2015 -0700 Committer: Swapnil Bawaskar <sbawas...@pivotal.io> Committed: Thu Sep 17 15:43:52 2015 -0700 ---------------------------------------------------------------------- .../gemfire/internal/cache/TXCommitMessage.java | 29 ++-- .../cache/ClientServerTransactionDUnitTest.java | 151 ++++++++++++++++--- 2 files changed, 141 insertions(+), 39 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4708d4e1/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/TXCommitMessage.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/TXCommitMessage.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/TXCommitMessage.java index 94aaadc..2a597e9 100644 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/TXCommitMessage.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/TXCommitMessage.java @@ -737,7 +737,7 @@ public class TXCommitMessage extends PooledDistributionMessage implements Member public void basicProcessOps() { { - List<EntryEventImpl> pendingCallbacks = new ArrayList<>(); + List<EntryEventImpl> pendingCallbacks = new ArrayList<>(this.farSideEntryOps.size()); Collections.sort(this.farSideEntryOps); 
Iterator it = this.farSideEntryOps.iterator(); while (it.hasNext()) { @@ -758,14 +758,18 @@ public class TXCommitMessage extends PooledDistributionMessage implements Member Iterator<EntryEventImpl> ci = callbacks.iterator(); while(ci.hasNext()) { EntryEventImpl ee = ci.next(); - if(ee.getOperation().isDestroy()) { - ee.getRegion().invokeTXCallbacks(EnumListenerEvent.AFTER_DESTROY, ee, true); - } else if(ee.getOperation().isInvalidate()) { - ee.getRegion().invokeTXCallbacks(EnumListenerEvent.AFTER_INVALIDATE, ee, true); - } else if(ee.getOperation().isCreate()) { - ee.getRegion().invokeTXCallbacks(EnumListenerEvent.AFTER_CREATE, ee, true); - } else { - ee.getRegion().invokeTXCallbacks(EnumListenerEvent.AFTER_UPDATE, ee, true); + try { + if (ee.getOperation().isDestroy()) { + ee.getRegion().invokeTXCallbacks(EnumListenerEvent.AFTER_DESTROY, ee, true); + } else if (ee.getOperation().isInvalidate()) { + ee.getRegion().invokeTXCallbacks(EnumListenerEvent.AFTER_INVALIDATE, ee, true); + } else if (ee.getOperation().isCreate()) { + ee.getRegion().invokeTXCallbacks(EnumListenerEvent.AFTER_CREATE, ee, true); + } else { + ee.getRegion().invokeTXCallbacks(EnumListenerEvent.AFTER_UPDATE, ee, true); + } + } finally { + ee.release(); } } } @@ -1294,7 +1298,6 @@ public class TXCommitMessage extends PooledDistributionMessage implements Member * This happens when we don't have the bucket and are getting adjunct notification */ EntryEventImpl eei = AbstractRegionMap.createCBEvent(this.r, entryOp.op, entryOp.key, entryOp.value, this.msg.txIdent, txEvent, getEventId(entryOp), entryOp.callbackArg,entryOp.filterRoutingInfo,this.msg.bridgeContext, null, entryOp.versionTag, entryOp.tailKey); - try { if(entryOp.filterRoutingInfo!=null) { eei.setLocalFilterInfo(entryOp.filterRoutingInfo.getFilterInfo(this.r.getCache().getMyId())); } @@ -1309,10 +1312,8 @@ public class TXCommitMessage extends PooledDistributionMessage implements Member // the message was sent and already reflects the change 
caused by this event. // In the latter case we need to invoke listeners final boolean skipListeners = !isDuplicate; - eei.invokeCallbacks(this.r, skipListeners, true); - } finally { - eei.release(); - } + eei.setInvokePRCallbacks(!skipListeners); + pendingCallbacks.add(eei); return; } if (logger.isDebugEnabled()) { http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4708d4e1/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/ClientServerTransactionDUnitTest.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/ClientServerTransactionDUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/ClientServerTransactionDUnitTest.java index 51a8dea..4b65a95 100644 --- a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/ClientServerTransactionDUnitTest.java +++ b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/ClientServerTransactionDUnitTest.java @@ -21,34 +21,11 @@ import java.util.concurrent.TimeUnit; import javax.naming.Context; import javax.transaction.UserTransaction; +import com.gemstone.gemfire.LogWriter; +import com.gemstone.gemfire.cache.*; import org.junit.Ignore; -import com.gemstone.gemfire.cache.AttributesFactory; -import com.gemstone.gemfire.cache.Cache; -import com.gemstone.gemfire.cache.CacheFactory; -import com.gemstone.gemfire.cache.CacheListener; -import com.gemstone.gemfire.cache.CacheLoader; -import com.gemstone.gemfire.cache.CacheLoaderException; -import com.gemstone.gemfire.cache.CacheTransactionManager; -import com.gemstone.gemfire.cache.CacheWriterException; -import com.gemstone.gemfire.cache.CommitConflictException; -import com.gemstone.gemfire.cache.DataPolicy; -import com.gemstone.gemfire.cache.EntryEvent; -import com.gemstone.gemfire.cache.LoaderHelper; -import com.gemstone.gemfire.cache.Region; import com.gemstone.gemfire.cache.Region.Entry; -import 
com.gemstone.gemfire.cache.RegionFactory; -import com.gemstone.gemfire.cache.RegionShortcut; -import com.gemstone.gemfire.cache.Scope; -import com.gemstone.gemfire.cache.TransactionDataNodeHasDepartedException; -import com.gemstone.gemfire.cache.TransactionDataNotColocatedException; -import com.gemstone.gemfire.cache.TransactionEvent; -import com.gemstone.gemfire.cache.TransactionException; -import com.gemstone.gemfire.cache.TransactionId; -import com.gemstone.gemfire.cache.TransactionInDoubtException; -import com.gemstone.gemfire.cache.TransactionWriter; -import com.gemstone.gemfire.cache.TransactionWriterException; -import com.gemstone.gemfire.cache.UnsupportedOperationInTransactionException; import com.gemstone.gemfire.cache.client.ClientCache; import com.gemstone.gemfire.cache.client.ClientCacheFactory; import com.gemstone.gemfire.cache.client.ClientRegionFactory; @@ -3250,4 +3227,128 @@ public void testClientCommitAndDataStoreGetsEvent() throws Exception { }); } + + public void testAdjunctMessage() { + Host host = Host.getHost(0); + VM server1 = host.getVM(0); + VM server2 = host.getVM(1); + VM client1 = host.getVM(2); + VM client2 = host.getVM(3); + final String regionName = "testAdjunctMessage"; + + final int port1 = createRegionsAndStartServer(server1, false); + final int port2 = createRegionsAndStartServer(server2, false); + + SerializableCallable createServerRegionWithInterest = new SerializableCallable() { + @Override + public Object call() throws Exception { + RegionFactory rf = getCache().createRegionFactory(RegionShortcut.PARTITION); + rf.setSubscriptionAttributes(new SubscriptionAttributes(InterestPolicy.CACHE_CONTENT)); + rf.create(regionName); + return null; + } + }; + server1.invoke(createServerRegionWithInterest); + server2.invoke(createServerRegionWithInterest); + + // get two colocated keys on server1 + final List<String> keys = (List<String>) server1.invoke(new SerializableCallable() { + @Override + public Object call() throws Exception { + 
Region r = getCache().getRegion(regionName); + PartitionedRegion pr = (PartitionedRegion) r; + List<String> server1Keys = new ArrayList<String>(); + for (int i=0; i<100; i++) { + String key = "k"+i; + //pr.put(key, "v" + i); + DistributedMember owner = pr.getOwnerForKey(pr.getKeyInfo(key)); + if (owner.equals(pr.getMyId())) { + server1Keys.add(key); + if (server1Keys.size() == 2) { + break; + } + } + } + return server1Keys; + } + }); + + class ClientListener extends CacheListenerAdapter { + Set keys = new HashSet(); + @Override + public void afterCreate(EntryEvent event) { + add(event); + } + @Override + public void afterUpdate(EntryEvent event) { + add(event); + } + private void add(EntryEvent event) { + keys.add(event.getKey()); + } + } + client2.invoke(new SerializableCallable() { + @Override + public Object call() throws Exception { + System.setProperty("gemfire.bridge.disableShufflingOfEndpoints", "true"); + ClientCacheFactory ccf = new ClientCacheFactory(); + ccf.addPoolServer("localhost"/*getServerHostName(Host.getHost(0))*/, port2); + ccf.setPoolMinConnections(0); + ccf.setPoolSubscriptionEnabled(true); + ccf.setPoolSubscriptionRedundancy(0); + ccf.set("log-level", getDUnitLogLevel()); + ClientCache cCache = getClientCache(ccf); + Region r = cCache.createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY).addCacheListener(new ClientListener()).create(regionName); + r.registerInterestRegex(".*"); + //cCache.readyForEvents(); + return null; + } + }); + client1.invoke(new SerializableCallable() { + @Override + public Object call() throws Exception { + System.setProperty("gemfire.bridge.disableShufflingOfEndpoints", "true"); + ClientCacheFactory ccf = new ClientCacheFactory(); + ccf.addPoolServer("localhost"/*getServerHostName(Host.getHost(0))*/, port1); + ccf.setPoolMinConnections(0); + ccf.setPoolSubscriptionEnabled(true); + ccf.set("log-level", getDUnitLogLevel()); + ClientCache cCache = getClientCache(ccf); + Region r = 
cCache.createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY).create(regionName); + getCache().getCacheTransactionManager().begin(); + for (String key : keys) { + r.put(key, "value"); + } + getCache().getCacheTransactionManager().commit(); + return null; + } + }); + client2.invoke(new SerializableCallable() { + @Override + public Object call() throws Exception { + Region r = getCache().getRegion(regionName); + CacheListener[] listeners = r.getAttributes().getCacheListeners(); + boolean foundListener = false; + for (CacheListener listener : listeners) { + if (listener instanceof ClientListener) { + foundListener = true; + final ClientListener clientListener = (ClientListener) listener; + WaitCriterion wc = new WaitCriterion() { + @Override + public boolean done() { + return clientListener.keys.containsAll(keys); + } + @Override + public String description() { + return "expected:"+keys+" found:"+clientListener.keys; + } + }; + DistributedTestCase.waitForCriterion(wc, 30*1000, 500, true); + } + } + assertTrue(foundListener); + return null; + } + }); + } }