ignite-1183 Fixed data structures create/destroy from client node
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/3036c8d8 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/3036c8d8 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/3036c8d8 Branch: refs/heads/ignite-1093-2 Commit: 3036c8d85db223a26f48f273ad08b6ea87e2e2ba Parents: f025714 Author: sboikov <[email protected]> Authored: Thu Oct 8 16:33:53 2015 +0300 Committer: sboikov <[email protected]> Committed: Thu Oct 8 16:33:53 2015 +0300 ---------------------------------------------------------------------- .../dht/atomic/GridNearAtomicUpdateFuture.java | 8 +- .../colocated/GridDhtColocatedLockFuture.java | 11 +- .../distributed/near/GridNearLockFuture.java | 11 +- .../near/GridNearOptimisticTxPrepareFuture.java | 24 +-- .../datastructures/DataStructuresProcessor.java | 48 ++++-- .../CacheGetFutureHangsSelfTest.java | 3 + ...niteCacheClientNodeChangingTopologyTest.java | 6 +- ...gniteAtomicLongChangingTopologySelfTest.java | 155 +++++++++++++++++-- 8 files changed, 216 insertions(+), 50 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/3036c8d8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java index 41df53a..97aa646 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java @@ -585,8 +585,12 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> if (req != null) { res = new GridNearAtomicUpdateResponse(cctx.cacheId(), nodeId, req.futureVersion()); - res.addFailedKeys(req.keys(), new ClusterTopologyCheckedException("Primary node left grid before " + - "response is received: " + nodeId)); + ClusterTopologyCheckedException e = new ClusterTopologyCheckedException("Primary node left grid " + + "before response is received: " + nodeId); + + e.retryReadyFuture(cctx.shared().nextAffinityReadyFuture(req.topologyVersion())); + + res.addFailedKeys(req.keys(), e); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/3036c8d8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java index 33a5cbd..be09f54 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java @@ -598,7 +598,7 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture // Continue mapping on the same topology version as it was before. this.topVer.compareAndSet(null, topVer); - map(keys, false); + map(keys, false, true); markInitialized(); @@ -654,7 +654,7 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture this.topVer.compareAndSet(null, topVer); } - map(keys, remap); + map(keys, remap, false); if (c != null) c.run(); @@ -691,8 +691,9 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture * * @param keys Keys. * @param remap Remap flag. + * @param topLocked {@code True} if thread already acquired lock preventing topology change. */ - private void map(Collection<KeyCacheObject> keys, boolean remap) { + private void map(Collection<KeyCacheObject> keys, boolean remap, boolean topLocked) { try { AffinityTopologyVersion topVer = this.topVer.get(); @@ -819,7 +820,9 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture boolean clientFirst = false; if (first) { - clientFirst = clientNode && (tx == null || !tx.hasRemoteLocks()); + clientFirst = clientNode && + !topLocked && + (tx == null || !tx.hasRemoteLocks()); first = false; } http://git-wip-us.apache.org/repos/asf/ignite/blob/3036c8d8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java index dcc8da6..e6b1e02 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java @@ -718,7 +718,7 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean // Continue mapping on the same topology version as it was before. this.topVer.compareAndSet(null, topVer); - map(keys, false); + map(keys, false, true); markInitialized(); @@ -773,7 +773,7 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean this.topVer.compareAndSet(null, topVer); } - map(keys, remap); + map(keys, remap, false); markInitialized(); } @@ -807,8 +807,9 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean * * @param keys Keys. * @param remap Remap flag. + * @param topLocked {@code True} if thread already acquired lock preventing topology change. */ - private void map(Iterable<KeyCacheObject> keys, boolean remap) { + private void map(Iterable<KeyCacheObject> keys, boolean remap, boolean topLocked) { try { AffinityTopologyVersion topVer = this.topVer.get(); @@ -938,7 +939,9 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean boolean clientFirst = false; if (first) { - clientFirst = clientNode && (tx == null || !tx.hasRemoteLocks()); + clientFirst = clientNode && + !topLocked && + (tx == null || !tx.hasRemoteLocks()); first = false; } http://git-wip-us.apache.org/repos/asf/ignite/blob/3036c8d8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java index 25028c4..1fb33a2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java @@ -271,7 +271,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd cctx.mvcc().addFuture(this); - prepare0(false); + prepare0(false, true); return; } @@ -338,7 +338,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd return; } - prepare0(remap); + prepare0(remap, false); if (c != null) c.run(); @@ -428,8 +428,9 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd * Initializes future. * * @param remap Remap flag. + * @param topLocked {@code True} if thread already acquired lock preventing topology change. */ - private void prepare0(boolean remap) { + private void prepare0(boolean remap, boolean topLocked) { try { boolean txStateCheck = remap ? tx.state() == PREPARING : tx.state(PREPARING); @@ -451,7 +452,8 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd prepare( tx.optimistic() && tx.serializable() ? tx.readEntries() : Collections.<IgniteTxEntry>emptyList(), - tx.writeEntries()); + tx.writeEntries(), + topLocked); markInitialized(); } @@ -466,11 +468,13 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd /** * @param reads Read entries. * @param writes Write entries. + * @param topLocked {@code True} if thread already acquired lock preventing topology change. * @throws IgniteCheckedException If failed. */ private void prepare( Iterable<IgniteTxEntry> reads, - Iterable<IgniteTxEntry> writes + Iterable<IgniteTxEntry> writes, + boolean topLocked ) throws IgniteCheckedException { AffinityTopologyVersion topVer = tx.topologyVersion(); @@ -497,7 +501,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd GridDistributedTxMapping cur = null; for (IgniteTxEntry read : reads) { - GridDistributedTxMapping updated = map(read, topVer, cur, false); + GridDistributedTxMapping updated = map(read, topVer, cur, false, topLocked); if (cur != updated) { mappings.offer(updated); @@ -514,7 +518,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd } for (IgniteTxEntry write : writes) { - GridDistributedTxMapping updated = map(write, topVer, cur, true); + GridDistributedTxMapping updated = map(write, topVer, cur, true, topLocked); if (cur != updated) { mappings.offer(updated); @@ -647,13 +651,15 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd * @param topVer Topology version. * @param cur Current mapping. * @param waitLock Wait lock flag. + * @param topLocked {@code True} if thread already acquired lock preventing topology change. * @return Mapping. */ private GridDistributedTxMapping map( IgniteTxEntry entry, AffinityTopologyVersion topVer, @Nullable GridDistributedTxMapping cur, - boolean waitLock + boolean waitLock, + boolean topLocked ) { GridCacheContext cacheCtx = entry.context(); @@ -685,7 +691,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd } if (cur == null || !cur.node().id().equals(primary.id()) || cur.near() != cacheCtx.isNear()) { - boolean clientFirst = cur == null && cctx.kernalContext().clientNode(); + boolean clientFirst = cur == null && !topLocked && cctx.kernalContext().clientNode(); cur = new GridDistributedTxMapping(primary); http://git-wip-us.apache.org/repos/asf/ignite/blob/3036c8d8/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java index ef2c543..7c5e97c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java @@ -505,14 +505,20 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { return dataStructure; } - catch (ClusterTopologyCheckedException e) { - IgniteInternalFuture<?> fut = e.retryReadyFuture(); - - fut.get(); - } catch (IgniteTxRollbackCheckedException ignore) { // Safe to retry right away. } + catch (IgniteCheckedException e) { + ClusterTopologyCheckedException topErr = e.getCause(ClusterTopologyCheckedException.class); + + if (topErr == null) + throw e; + + IgniteInternalFuture<?> fut = topErr.retryReadyFuture(); + + if (fut != null) + fut.get(); + } } } @@ -593,14 +599,20 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { if (afterRmv != null && rmvInfo != null) afterRmv.applyx(rmvInfo); } - catch (ClusterTopologyCheckedException e) { - IgniteInternalFuture<?> fut = e.retryReadyFuture(); - - fut.get(); - } catch (IgniteTxRollbackCheckedException ignore) { // Safe to retry right away. } + catch (IgniteCheckedException e) { + ClusterTopologyCheckedException topErr = e.getCause(ClusterTopologyCheckedException.class); + + if (topErr == null) + throw e; + + IgniteInternalFuture<?> fut = topErr.retryReadyFuture(); + + if (fut != null) + fut.get(); + } } } @@ -995,14 +1007,20 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { return col; } - catch (ClusterTopologyCheckedException e) { - IgniteInternalFuture<?> fut = e.retryReadyFuture(); - - fut.get(); - } catch (IgniteTxRollbackCheckedException ignore) { // Safe to retry right away. } + catch (IgniteCheckedException e) { + ClusterTopologyCheckedException topErr = e.getCause(ClusterTopologyCheckedException.class); + + if (topErr == null) + throw e; + + IgniteInternalFuture<?> fut = topErr.retryReadyFuture(); + + if (fut != null) + fut.get(); + } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/3036c8d8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheGetFutureHangsSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheGetFutureHangsSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheGetFutureHangsSelfTest.java index e8622aa..51e76f6 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheGetFutureHangsSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheGetFutureHangsSelfTest.java @@ -31,6 +31,7 @@ import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.marshaller.optimized.OptimizedMarshaller; +import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.jetbrains.annotations.Nullable; @@ -54,6 +55,8 @@ public class CacheGetFutureHangsSelfTest extends GridCommonAbstractTest { @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { IgniteConfiguration cfg = super.getConfiguration(gridName); + ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setSharedMemoryPort(-1); + OptimizedMarshaller marsh = new OptimizedMarshaller(); marsh.setRequireSerializable(false); http://git-wip-us.apache.org/repos/asf/ignite/blob/3036c8d8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java index 2d29c49..1b3dc7a 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java @@ -118,7 +118,11 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac cfg.setClientMode(client); - cfg.setCommunicationSpi(new TestCommunicationSpi()); + TestCommunicationSpi commSpi = new TestCommunicationSpi(); + + commSpi.setSharedMemoryPort(-1); + + cfg.setCommunicationSpi(commSpi); cfg.setCacheConfiguration(ccfg); http://git-wip-us.apache.org/repos/asf/ignite/blob/3036c8d8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteAtomicLongChangingTopologySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteAtomicLongChangingTopologySelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteAtomicLongChangingTopologySelfTest.java index 337334e..32a86e4 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteAtomicLongChangingTopologySelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteAtomicLongChangingTopologySelfTest.java @@ -17,21 +17,37 @@ package org.apache.ignite.internal.processors.cache.distributed.dht; -import org.apache.ignite.*; -import org.apache.ignite.cache.*; -import org.apache.ignite.configuration.*; -import org.apache.ignite.internal.*; -import org.apache.ignite.internal.util.typedef.internal.*; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Queue; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReferenceArray; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteAtomicLong; +import org.apache.ignite.IgniteQueue; +import org.apache.ignite.configuration.AtomicConfiguration; +import org.apache.ignite.configuration.CollectionConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; -import org.apache.ignite.spi.discovery.tcp.*; -import org.apache.ignite.spi.discovery.tcp.ipfinder.*; -import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; -import org.apache.ignite.testframework.*; -import org.apache.ignite.testframework.junits.common.*; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; -import java.util.*; -import java.util.concurrent.*; -import java.util.concurrent.atomic.*; +import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; +import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; +import static org.apache.ignite.cache.CacheMode.PARTITIONED; /** * @@ -52,6 +68,9 @@ public class IgniteAtomicLongChangingTopologySelfTest extends GridCommonAbstract /** */ private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + /** */ + private boolean client; + /** {@inheritDoc} */ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { IgniteConfiguration cfg = super.getConfiguration(gridName); @@ -60,16 +79,18 @@ public class IgniteAtomicLongChangingTopologySelfTest extends GridCommonAbstract discoSpi.setIpFinder(IP_FINDER); - cfg.setDiscoverySpi(discoSpi); + cfg.setDiscoverySpi(discoSpi).setNetworkTimeout(30_000); AtomicConfiguration atomicCfg = new AtomicConfiguration(); - atomicCfg.setCacheMode(CacheMode.PARTITIONED); + atomicCfg.setCacheMode(PARTITIONED); atomicCfg.setBackups(1); cfg.setAtomicConfiguration(atomicCfg); ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setSharedMemoryPort(-1); + cfg.setClientMode(client); + return cfg; } @@ -111,6 +132,110 @@ public class IgniteAtomicLongChangingTopologySelfTest extends GridCommonAbstract /** * @throws Exception If failed. */ + public void testClientAtomicLongCreateCloseFailover() throws Exception { + testFailoverWithClient(new IgniteInClosure<Ignite>() { + @Override public void apply(Ignite ignite) { + for (int i = 0; i < 100; i++) { + IgniteAtomicLong l = ignite.atomicLong("long-" + 1, 0, true); + + l.close(); + } + } + }); + } + + /** + * @throws Exception If failed. + */ + public void testClientQueueCreateCloseFailover() throws Exception { + testFailoverWithClient(new IgniteInClosure<Ignite>() { + @Override public void apply(Ignite ignite) { + for (int i = 0; i < 100; i++) { + CollectionConfiguration colCfg = new CollectionConfiguration(); + + colCfg.setBackups(1); + colCfg.setCacheMode(PARTITIONED); + colCfg.setAtomicityMode(i % 2 == 0 ? TRANSACTIONAL : ATOMIC); + + IgniteQueue q = ignite.queue("q-" + i, 0, colCfg); + + q.close(); + } + } + }); + } + + /** + * @param c Test iteration closure. + * @throws Exception If failed. + */ + private void testFailoverWithClient(IgniteInClosure<Ignite> c) throws Exception { + startGridsMultiThreaded(GRID_CNT, false); + + client = true; + + Ignite ignite = startGrid(GRID_CNT); + + assertTrue(ignite.configuration().isClientMode()); + + client = false; + + final AtomicBoolean finished = new AtomicBoolean(); + + IgniteInternalFuture<?> fut = restartThread(finished); + + long stop = System.currentTimeMillis() + 30_000; + + try { + int iter = 0; + + while (System.currentTimeMillis() < stop) { + log.info("Iteration: " + iter++); + + c.apply(ignite); + } + + finished.set(true); + + fut.get(); + } + finally { + finished.set(true); + } + } + + /** + * @param finished Finished flag. + * @return Future. + */ + private IgniteInternalFuture<?> restartThread(final AtomicBoolean finished) { + return GridTestUtils.runAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + while (!finished.get()) { + for (int i = 0; i < GRID_CNT; i++) { + log.info("Stop node: " + i); + + stopGrid(i); + + U.sleep(500); + + log.info("Start node: " + i); + + startGrid(i); + + if (finished.get()) + break; + } + } + + return null; + } + }, "restart-thread"); + } + + /** + * @throws Exception If failed. + */ public void testIncrementConsistency() throws Exception { startGrids(GRID_CNT);
