Repository: ignite
Updated Branches:
  refs/heads/ignite-3478 d26266456 -> 880ea9821
ignite-6149 Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/880ea982 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/880ea982 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/880ea982 Branch: refs/heads/ignite-3478 Commit: 880ea98217a4c9fa6058ca954c216e0f58f85f61 Parents: d262664 Author: sboikov <[email protected]> Authored: Wed Sep 20 18:21:54 2017 +0300 Committer: sboikov <[email protected]> Committed: Wed Sep 20 18:21:54 2017 +0300 ---------------------------------------------------------------------- .../cache/IgniteCacheOffheapManagerImpl.java | 2 +- .../internal/TestRecordingCommunicationSpi.java | 14 +++ .../cache/mvcc/CacheMvccTransactionsTest.java | 110 ++++++++++++++++++- 3 files changed, 122 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/880ea982/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java index ea74f3c..d7be3eb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java @@ -1400,7 +1400,7 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager activeTx = true; } - // Should not delete oldest version which is less than cleanup version . + // Should not delete oldest version which is less than cleanup version. 
int cmp = compare(oldVal, mvccVer.coordinatorVersion(), mvccVer.cleanupVersion()); if (cmp <= 0) { http://git-wip-us.apache.org/repos/asf/ignite/blob/880ea982/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java b/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java index ab61687..859010e 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java @@ -30,6 +30,7 @@ import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.managers.communication.GridIoMessage; import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteBiInClosure; import org.apache.ignite.lang.IgniteBiPredicate; import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.plugin.extensions.communication.Message; @@ -60,6 +61,9 @@ public class TestRecordingCommunicationSpi extends TcpCommunicationSpi { /** */ private IgniteBiPredicate<ClusterNode, Message> blockP; + /** */ + private volatile IgniteBiInClosure<ClusterNode, Message> c; + /** * @param node Node. * @return Test SPI. @@ -76,6 +80,9 @@ public class TestRecordingCommunicationSpi extends TcpCommunicationSpi { Message msg0 = ioMsg.message(); + if (c != null) + c.apply(node, msg0); + synchronized (this) { boolean record = (recordClasses != null && recordClasses.contains(msg0.getClass())) || (recordP != null && recordP.apply(node, msg0)); @@ -212,6 +219,13 @@ public class TestRecordingCommunicationSpi extends TcpCommunicationSpi { } /** + * @param c Message closure. 
+ */ + public void closure(IgniteBiInClosure<ClusterNode, Message> c) { + this.c = c; + } + + /** * @param blockP Message block predicate. */ public void blockMessages(IgniteBiPredicate<ClusterNode, Message> blockP) { http://git-wip-us.apache.org/repos/asf/ignite/blob/880ea982/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java index cf3bafb..0265519 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java @@ -26,6 +26,7 @@ import java.util.Map; import java.util.Set; import java.util.TreeSet; import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicBoolean; @@ -55,6 +56,7 @@ import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.G; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteBiInClosure; import org.apache.ignite.lang.IgniteBiPredicate; import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.plugin.extensions.communication.Message; @@ -597,12 +599,12 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { /** * @throws Exception If failed. 
*/ - public void testCleanupWaitsForGet() throws Exception { + public void testCleanupWaitsForGet1() throws Exception { boolean vals[] = {true, false}; for (boolean otherPuts : vals) { for (boolean putOnStart : vals) { - cleanupWaitsForGet(otherPuts, putOnStart); + cleanupWaitsForGet1(otherPuts, putOnStart); afterTest(); } @@ -614,7 +616,7 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { * @param putOnStart {@code True} to put data in cache before getAll. * @throws Exception If failed. */ - private void cleanupWaitsForGet(boolean otherPuts, final boolean putOnStart) throws Exception { + private void cleanupWaitsForGet1(boolean otherPuts, final boolean putOnStart) throws Exception { info("cleanupWaitsForGet [otherPuts=" + otherPuts + ", putOnStart=" + putOnStart + "]"); testSpi = true; @@ -700,6 +702,108 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { assertEquals(5, (Object)vals.get(key2)); } + + + /** + * @throws Exception If failed. + */ + public void testCleanupWaitsForGet2() throws Exception { + testSpi = true; + + client = false; + + startGrids(2); + + client = true; + + final Ignite client = startGrid(2); + + awaitPartitionMapExchange(); + + final IgniteCache<Object, Object> cache = client.createCache(cacheConfiguration(PARTITIONED, FULL_SYNC, 0, 16). 
+ setNodeFilter(new TestCacheNodeExcludingFilter(ignite(0).name()))); + + final Integer key1 = 1; + final Integer key2 = 2; + + try (Transaction tx = client.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { + cache.put(key1, 0); + cache.put(key2, 0); + + tx.commit(); + } + + TestRecordingCommunicationSpi crdSpi = TestRecordingCommunicationSpi.spi(grid(0)); + + TestRecordingCommunicationSpi clientSpi = TestRecordingCommunicationSpi.spi(client); + + final CountDownLatch getLatch = new CountDownLatch(1); + + clientSpi.closure(new IgniteBiInClosure<ClusterNode, Message>() { + @Override public void apply(ClusterNode node, Message msg) { + if (msg instanceof CoordinatorTxAckRequest) + doSleep(2000); + } + }); + + crdSpi.closure(new IgniteBiInClosure<ClusterNode, Message>() { + /** */ + private AtomicInteger cntr = new AtomicInteger(); + + @Override public void apply(ClusterNode node, Message msg) { + if (msg instanceof MvccCoordinatorVersionResponse) { + if (cntr.incrementAndGet() == 2) { + getLatch.countDown(); + + doSleep(1000); + } + } + } + }); + + final IgniteInternalFuture<?> putFut1 = GridTestUtils.runAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + try (Transaction tx = client.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { + cache.put(key1, 1); + + tx.commit(); + } + + return null; + } + }, "put1"); + + final IgniteInternalFuture<?> putFut2 = GridTestUtils.runAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + try (Transaction tx = client.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { + cache.put(key1, 2); + + tx.commit(); + } + + return null; + } + }, "put2"); + + IgniteInternalFuture<?> getFut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + U.await(getLatch); + + while (!putFut1.isDone() || !putFut2.isDone()) { + Map<Object, Object> vals = cache.getAll(F.asSet(key1, key2)); + + assertEquals(2, vals.size()); + } + 
+ return null; + } + }, 4, "get-thread"); + + putFut1.get(); + putFut2.get(); + getFut.get(); + } + /** * @throws Exception If failed. */
