This is an automated email from the ASF dual-hosted git repository. bschuchardt pushed a commit to branch feature/GEODE-5528 in repository https://gitbox.apache.org/repos/asf/geode.git
commit 41c16e35ff3691709bc47b824e6c9f4f6caf05df Author: Bruce Schuchardt <[email protected]> AuthorDate: Fri Aug 3 15:47:00 2018 -0700 GEODE-5528: client event listener invoked multiple times for the same transactional operation. TXCommitMessage.combine() was checking to see if a RegionCommit was already in its collection by using Collection.contains(), but RegionCommit doesn't implement equals(), so this check would return false even if the collection had a RegionCommit for the same Region. The fix is to check for the region's path instead. --- .../cache/ClientServerTransactionDUnitTest.java | 131 ++++++++++++++++++--- .../geode/internal/cache/TXCommitMessage.java | 23 ++-- 2 files changed, 130 insertions(+), 24 deletions(-) diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/ClientServerTransactionDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/ClientServerTransactionDUnitTest.java index c073f5e..8eccc54 100644 --- a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/ClientServerTransactionDUnitTest.java +++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/ClientServerTransactionDUnitTest.java @@ -38,6 +38,7 @@ import java.util.Properties; import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import javax.naming.Context; import javax.transaction.RollbackException; @@ -111,6 +112,7 @@ import org.apache.geode.internal.jta.TransactionImpl; import org.apache.geode.internal.jta.TransactionManagerImpl; import org.apache.geode.internal.jta.UserTransactionImpl; import org.apache.geode.test.dunit.Assert; +import org.apache.geode.test.dunit.DistributedTestUtils; import org.apache.geode.test.dunit.Host; import org.apache.geode.test.dunit.IgnoredException; import org.apache.geode.test.dunit.SerializableCallable; @@ -2851,6 +2853,21 @@ public class 
ClientServerTransactionDUnitTest extends RemoteTransactionDUnitTest }); } + class CreateReplicateRegion extends SerializableCallable { + String regionName; + + public CreateReplicateRegion(String replicateRegionName) { + this.regionName = replicateRegionName; + } + + public Object call() throws Exception { + RegionFactory rf = getCache().createRegionFactory(RegionShortcut.REPLICATE); + rf.setConcurrencyChecksEnabled(getConcurrencyChecksEnabled()); + rf.create(regionName); + return null; + } + } + /** * start 3 servers, accessor has r1 and r2; ds1 has r1, ds2 has r2 stop server after distributing * commit but b4 replying to client @@ -2870,21 +2887,6 @@ public class ClientServerTransactionDUnitTest extends RemoteTransactionDUnitTest } }); - class CreateReplicateRegion extends SerializableCallable { - String regionName; - - public CreateReplicateRegion(String replicateRegionName) { - this.regionName = replicateRegionName; - } - - public Object call() throws Exception { - RegionFactory rf = getCache().createRegionFactory(RegionShortcut.REPLICATE); - rf.setConcurrencyChecksEnabled(getConcurrencyChecksEnabled()); - rf.create(regionName); - return null; - } - } - accessor.invoke(new CreateReplicateRegion("r1")); accessor.invoke(new CreateReplicateRegion("r2")); @@ -2973,6 +2975,105 @@ public class ClientServerTransactionDUnitTest extends RemoteTransactionDUnitTest } /** + * start 3 servers with region r1. stop client's server after distributing + * commit but before replying to client. 
ensure that a listener in the + * client is only invoked once + */ + @Test + public void testFailoverAfterCommitDistributionInvokesListenerInClientOnlyOnce() { + Host host = Host.getHost(0); + VM server = host.getVM(0); + VM datastore1 = host.getVM(1); + VM datastore2 = host.getVM(2); + VM client = host.getVM(3); + + final int port1 = createRegionsAndStartServer(server, false); + final int port2 = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET); + + server.invoke(new CreateReplicateRegion("r1")); + + + client.invoke("create client cache", () -> { + disconnectFromDS(); + System.setProperty(DistributionConfig.GEMFIRE_PREFIX + "bridge.disableShufflingOfEndpoints", + "true"); + ClientCacheFactory ccf = new ClientCacheFactory(); + ccf.addPoolServer("localhost"/* getServerHostName(Host.getHost(0)) */, port1); + ccf.addPoolServer("localhost", port2); + ccf.setPoolMinConnections(5); + ccf.setPoolLoadConditioningInterval(-1); + ccf.setPoolSubscriptionEnabled(false); + ccf.set(LOG_LEVEL, getDUnitLogLevel()); + ClientCache cCache = getClientCache(ccf); + Region r1 = + cCache.createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY).create("r1"); + Region r2 = + cCache.createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY).create("r2"); + return null; + }); + + datastore1.invoke("create backup server", () -> { + CacheServer s = getCache().addCacheServer(); + getCache().getLogger().info("SWAP:ds1"); + s.setPort(port2); + s.start(); + return null; + }); + datastore1.invoke(new CreateReplicateRegion("r1")); + datastore2.invoke(new CreateReplicateRegion("r1")); + + final TransactionId txId = client.invoke("start transaction in client", () -> { + ClientCache cCache = (ClientCache) getCache(); + Region r1 = cCache.getRegion("r1"); + r1.put("key", "value"); + cCache.getCacheTransactionManager().begin(); + cCache.getLogger().info("beganTX"); + r1.destroy("key"); + return cCache.getCacheTransactionManager().getTransactionId(); + }); + + server.invoke("close cache 
after sending tx message to other servers", () -> { + final TXManagerImpl mgr = (TXManagerImpl) getCache().getCacheTransactionManager(); + assertTrue(mgr.isHostedTxInProgress((TXId) txId)); + TXStateProxyImpl txProxy = (TXStateProxyImpl) mgr.getHostedTXState((TXId) txId); + final TXState txState = (TXState) txProxy.getRealDeal(null, null); + txState.setAfterSend(new Runnable() { + public void run() { + getCache().getLogger().info("server is now closing its cache"); + System.setProperty(DistributionConfig.GEMFIRE_PREFIX + "no-flush-on-close", "true"); + try { + mgr.removeHostedTXState((TXId) txState.getTransactionId()); + DistributedTestUtils.crashDistributedSystem(getCache().getDistributedSystem()); + } finally { + System.getProperties() + .remove(DistributionConfig.GEMFIRE_PREFIX + "no-flush-on-close"); + } + } + }); + return null; + }); + + client.invoke("committing transaction in client", () -> { + Region r1 = getCache().getRegion("r1"); + final AtomicInteger afterDestroyInvocations = new AtomicInteger(); + CacheListener listener = new CacheListenerAdapter() { + @Override + public void afterDestroy(EntryEvent event) { + System.err.println("afterDestroy invoked"); + Thread.dumpStack(); + afterDestroyInvocations.incrementAndGet(); + } + }; + r1.getAttributesMutator().addCacheListener(listener); + + getCache().getCacheTransactionManager().commit(); + assertFalse(r1.containsKey("key")); + assertEquals(1, afterDestroyInvocations.intValue()); + return null; + }); + } + + /** * start a tx in a thread, obtain local locks and wait. 
start another tx and commit, make sure 2nd * thread gets CCE */ diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TXCommitMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/TXCommitMessage.java index 13f15ea..9a3a1e4 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/TXCommitMessage.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TXCommitMessage.java @@ -87,7 +87,7 @@ public class TXCommitMessage extends PooledDistributionMessage // Keep a 60 second history @ an estimated 1092 transactions/second ~= 16^4 protected static final TXFarSideCMTracker txTracker = new TXFarSideCMTracker((60 * 1092)); - private ArrayList regions; // list of RegionCommit instances + private ArrayList<RegionCommit> regions; // list of RegionCommit instances protected TXId txIdent; protected int processorId; // 0 unless needsAck is true protected TXLockIdImpl lockId; @@ -991,14 +991,15 @@ public class TXCommitMessage extends PooledDistributionMessage public void combine(TXCommitMessage other) { assert other != null; Iterator it = other.regions.iterator(); - while (it.hasNext()) { - RegionCommit rc = (RegionCommit) it.next(); - if (!this.regions.contains(rc)) { - if (logger.isDebugEnabled()) { - logger.debug("TX: adding region commit: {} to: {}", rc, this); - } - rc.msg = this; - this.regions.add(rc); + Map<String, RegionCommit> regionCommits = new HashMap<>(); + for (RegionCommit commit : regions) { + regionCommits.put(commit.getRegionPath(), commit); + } + for (RegionCommit commit : other.regions) { + if (!regionCommits.containsKey(commit.getRegionPath())) { + commit.msg = this; + this.regions.add(commit); + regionCommits.put(commit.getRegionPath(), commit); } } } @@ -1131,6 +1132,10 @@ public class TXCommitMessage extends PooledDistributionMessage this.msg = msg; } + public String getRegionPath() { + return regionPath; + } + public void incRefCount() { this.refCount++; }
