http://git-wip-us.apache.org/repos/asf/ignite/blob/4026ddcc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index 2271a85..571cc3d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -35,7 +35,6 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.ReadWriteLock; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; -import org.apache.ignite.cache.PartitionLossPolicy; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.events.CacheEvent; import org.apache.ignite.events.DiscoveryEvent; @@ -57,7 +56,6 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache; import org.apache.ignite.internal.processors.cache.CacheAffinityChangeMessage; import org.apache.ignite.internal.processors.cache.CacheGroupContext; -import org.apache.ignite.internal.processors.cache.CacheInvalidStateException; import org.apache.ignite.internal.processors.cache.CachePartitionExchangeWorkerTask; import org.apache.ignite.internal.processors.cache.ClusterState; import org.apache.ignite.internal.processors.cache.DynamicCacheChangeBatch; @@ -66,11 +64,11 @@ import org.apache.ignite.internal.processors.cache.ExchangeActions; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFutureAdapter; import org.apache.ignite.internal.processors.cache.distributed.dht.GridClientPartitionTopology; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology; -import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.processors.cluster.GridClusterStateProcessor; @@ -95,10 +93,6 @@ import static org.apache.ignite.IgniteSystemProperties.IGNITE_LONG_OPERATIONS_DU import static org.apache.ignite.IgniteSystemProperties.IGNITE_THREAD_DUMP_ON_EXCHANGE_TIMEOUT; import static org.apache.ignite.IgniteSystemProperties.getBoolean; import static org.apache.ignite.IgniteSystemProperties.getLong; -import static org.apache.ignite.cache.PartitionLossPolicy.READ_ONLY_ALL; -import static org.apache.ignite.cache.PartitionLossPolicy.READ_ONLY_SAFE; -import static org.apache.ignite.cache.PartitionLossPolicy.READ_WRITE_ALL; -import static org.apache.ignite.cache.PartitionLossPolicy.READ_WRITE_SAFE; import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; import static org.apache.ignite.events.EventType.EVT_NODE_JOINED; import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; @@ -109,8 +103,8 @@ import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYS * Future for exchanging partition maps. */ @SuppressWarnings({"TypeMayBeWeakened", "unchecked"}) -public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityTopologyVersion> - implements Comparable<GridDhtPartitionsExchangeFuture>, GridDhtTopologyFuture, CachePartitionExchangeWorkerTask, IgniteDiagnosticAware { +public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapter + implements Comparable<GridDhtPartitionsExchangeFuture>, CachePartitionExchangeWorkerTask, IgniteDiagnosticAware { /** Dummy flag. */ private final boolean dummy; @@ -194,9 +188,6 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT /** */ private CacheAffinityChangeMessage affChangeMsg; - /** Cache groups validation results. */ - private volatile Map<Integer, CacheValidation> grpValidRes; - /** Skip preload flag. */ private boolean skipPreload; @@ -648,15 +639,22 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT if (crd) { boolean updateTop = exchId.topologyVersion().equals(grp.localStartVersion()); - if (updateTop && clientTop != null) - top.update(exchId, clientTop.partitionMap(true), clientTop.updateCounters(false)); + if (updateTop && clientTop != null) { + top.update(topologyVersion(), + clientTop.partitionMap(true), + clientTop.updateCounters(false)); + } } - top.updateTopologyVersion(exchId, this, updSeq, cacheGroupStopping(grp.groupId())); + top.updateTopologyVersion( + this, + discoCache(), + updSeq, + cacheGroupStopping(grp.groupId())); } for (GridClientPartitionTopology top : cctx.exchange().clientTopologies()) - top.updateTopologyVersion(exchId, this, -1, cacheGroupStopping(top.groupId())); + top.updateTopologyVersion(this, discoCache(), -1, cacheGroupStopping(top.groupId())); } /** @@ -676,12 +674,11 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT changeGlobalStateExceptions.put(cctx.localNodeId(), changeGlobalStateE); } - boolean clientOnly = cctx.affinity().onCacheChangeRequest(this, crd, exchActions); + assert !exchActions.clientOnlyExchange() : exchActions; - if (clientOnly) - return exchActions.clientCacheStarted(cctx.localNodeId()) ? ExchangeType.CLIENT : ExchangeType.NONE; - else - return cctx.kernalContext().clientNode() ? ExchangeType.CLIENT : ExchangeType.ALL; + cctx.affinity().onCacheChangeRequest(this, crd, exchActions); + + return cctx.kernalContext().clientNode() ? ExchangeType.CLIENT : ExchangeType.ALL; } /** @@ -761,7 +758,9 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT assert fullMap != null; - grp.topology().update(exchId, fullMap, top.updateCounters(false)); + grp.topology().update(topologyVersion(), + fullMap, + top.updateCounters(false)); break; } @@ -1191,24 +1190,15 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT discoEvt.type() == EVT_NODE_JOINED) detectLostPartitions(); - Map<Integer, CacheValidation> m = new HashMap<>(cctx.cache().cacheGroups().size()); - - for (CacheGroupContext grp : cctx.cache().cacheGroups()) { - Collection<Integer> lostParts = grp.isLocal() ? - Collections.<Integer>emptyList() : grp.topology().lostPartitions(); - - boolean valid = true; + Map<Integer, CacheValidation> m = U.newHashMap(cctx.cache().cacheGroups().size()); - if (grp.topologyValidator() != null && !grp.systemCache()) - valid = grp.topologyValidator().validate(discoEvt.topologyNodes()); - - m.put(grp.groupId(), new CacheValidation(valid, lostParts)); - } + for (CacheGroupContext grp : cctx.cache().cacheGroups()) + m.put(grp.groupId(), validateCacheGroup(grp, discoEvt.topologyNodes())); grpValidRes = m; } - cctx.cache().onExchangeDone(exchId.topologyVersion(), exchActions, err, false); + cctx.cache().onExchangeDone(exchId.topologyVersion(), exchActions, err); cctx.exchange().onExchangeDone(this, err); @@ -1243,113 +1233,6 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT return dummy; } - /** {@inheritDoc} */ - @Nullable @Override public Throwable validateCache( - GridCacheContext cctx, - boolean recovery, - boolean read, - @Nullable Object key, - @Nullable Collection<?> keys - ) { - assert isDone() : this; - - Throwable err = error(); - - if (err != null) - return err; - - if (!cctx.shared().kernalContext().state().active()) - return new CacheInvalidStateException( - "Failed to perform cache operation (cluster is not activated): " + cctx.name()); - - CacheGroupContext grp = cctx.group(); - - PartitionLossPolicy partLossPlc = grp.config().getPartitionLossPolicy(); - - if (grp.needsRecovery() && !recovery) { - if (!read && (partLossPlc == READ_ONLY_SAFE || partLossPlc == READ_ONLY_ALL)) - return new IgniteCheckedException("Failed to write to cache (cache is moved to a read-only state): " + - cctx.name()); - } - - if (grp.needsRecovery() || grp.topologyValidator() != null) { - CacheValidation validation = grpValidRes.get(grp.groupId()); - - if (validation == null) - return null; - - if (!validation.valid && !read) - return new IgniteCheckedException("Failed to perform cache operation " + - "(cache topology is not valid): " + cctx.name()); - - if (recovery || !grp.needsRecovery()) - return null; - - if (key != null) { - int p = cctx.affinity().partition(key); - - CacheInvalidStateException ex = validatePartitionOperation(cctx.name(), read, key, p, - validation.lostParts, partLossPlc); - - if (ex != null) - return ex; - } - - if (keys != null) { - for (Object k : keys) { - int p = cctx.affinity().partition(k); - - CacheInvalidStateException ex = validatePartitionOperation(cctx.name(), read, k, p, - validation.lostParts, partLossPlc); - - if (ex != null) - return ex; - } - } - } - - return null; - } - - /** - * @param cacheName Cache name. - * @param read Read flag. - * @param key Key to check. - * @param part Partition this key belongs to. - * @param lostParts Collection of lost partitions. - * @param plc Partition loss policy. - * @return Invalid state exception if this operation is disallowed. - */ - private CacheInvalidStateException validatePartitionOperation( - String cacheName, - boolean read, - Object key, - int part, - Collection<Integer> lostParts, - PartitionLossPolicy plc - ) { - if (lostParts.contains(part)) { - if (!read) { - assert plc == READ_WRITE_ALL || plc == READ_WRITE_SAFE; - - if (plc == READ_WRITE_SAFE) { - return new CacheInvalidStateException("Failed to execute cache operation " + - "(all partition owners have left the grid, partition data has been lost) [" + - "cacheName=" + cacheName + ", part=" + part + ", key=" + key + ']'); - } - } - else { - // Read. - if (plc == READ_ONLY_SAFE || plc == READ_WRITE_SAFE) - return new CacheInvalidStateException("Failed to execute cache operation " + - "(all partition owners have left the grid, partition data has been lost) [" + - "cacheName=" + cacheName + ", part=" + part + ", key=" + key + ']'); - } - } - - return null; - } - /** * Cleans up resources to avoid excessive memory usage. */ @@ -1844,13 +1727,19 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT CacheGroupContext grp = cctx.cache().cacheGroup(grpId); - if (grp != null) - grp.topology().update(exchId, entry.getValue(), cntrMap); + if (grp != null) { + grp.topology().update(topologyVersion(), + entry.getValue(), + cntrMap); + } else { ClusterNode oldest = cctx.discovery().oldestAliveCacheServerNode(AffinityTopologyVersion.NONE); - if (oldest != null && oldest.isLocal()) - cctx.exchange().clientTopology(grpId, this).update(exchId, entry.getValue(), cntrMap); + if (oldest != null && oldest.isLocal()) { + cctx.exchange().clientTopology(grpId, this).update(topologyVersion(), + entry.getValue(), + cntrMap); + } } } } @@ -2135,26 +2024,6 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT NONE } - /** - * Cache validation result. - */ - private static class CacheValidation { - /** Topology validation result. */ - private boolean valid; - - /** Lost partitions on this topology version. */ - private Collection<Integer> lostParts; - - /** - * @param valid Valid flag. - * @param lostParts Lost partitions. - */ - private CacheValidation(boolean valid, Collection<Integer> lostParts) { - this.valid = valid; - this.lostParts = lostParts; - } - } - /** {@inheritDoc} */ @Override public void addDiagnosticRequest(IgniteDiagnosticPrepareContext diagCtx) { if (!isDone()) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/4026ddcc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java index 527688e..71c6b65 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java @@ -327,6 +327,13 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message { } /** + * @param ctx Cache context for this tx entry. + */ + public void context(GridCacheContext<?, ?> ctx) { + this.ctx = ctx; + } + + /** * @return Flag indicating if this entry is affinity mapped to the same node. */ public boolean locallyMapped() { http://git-wip-us.apache.org/repos/asf/ignite/blob/4026ddcc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java index 35ee011..b958a27 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java @@ -949,6 +949,17 @@ public class IgniteTxHandler { if (nearTx != null) res.nearEvicted(nearTx.evicted()); + List<IgniteTxKey> writesCacheMissed = req.nearWritesCacheMissed(); + + if (writesCacheMissed != null) { + Collection<IgniteTxKey> evicted0 = res.nearEvicted(); + + if (evicted0 != null) + writesCacheMissed.addAll(evicted0); + + res.nearEvicted(writesCacheMissed); + } + if (dhtTx != null) req.txState(dhtTx.txState()); else if (nearTx != null) @@ -1595,7 +1606,7 @@ public class IgniteTxHandler { * @return Remote transaction. * @throws IgniteCheckedException If failed. */ - @Nullable public GridNearTxRemote startNearRemoteTx(ClassLoader ldr, UUID nodeId, + @Nullable private GridNearTxRemote startNearRemoteTx(ClassLoader ldr, UUID nodeId, GridDhtTxPrepareRequest req) throws IgniteCheckedException { if (!F.isEmpty(req.nearWrites())) { http://git-wip-us.apache.org/repos/asf/ignite/blob/4026ddcc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java index db0395f..0877305 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java @@ -276,6 +276,33 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { cctx.gridIO().addMessageListener(TOPIC_TX, new DeadlockDetectionListener()); } + /** + * @param cacheId Cache ID. + */ + public void rollbackTransactionsForCache(int cacheId) { + rollbackTransactionsForCache(cacheId, nearIdMap); + + rollbackTransactionsForCache(cacheId, threadMap); + } + + /** + * @param cacheId Cache ID. + * @param txMap Transactions map. + */ + private void rollbackTransactionsForCache(int cacheId, ConcurrentMap<?, IgniteInternalTx> txMap) { + for (Map.Entry<?, IgniteInternalTx> e : txMap.entrySet()) { + IgniteInternalTx tx = e.getValue(); + + for (IgniteTxEntry entry : tx.allEntries()) { + if (entry.cacheId() == cacheId) { + rollbackTx(tx); + + break; + } + } + } + } + /** {@inheritDoc} */ @Override public void onDisconnected(IgniteFuture reconnectFut) { txFinishSync.onDisconnected(reconnectFut); http://git-wip-us.apache.org/repos/asf/ignite/blob/4026ddcc/modules/core/src/test/java/org/apache/ignite/internal/GridProjectionForCachesSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridProjectionForCachesSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridProjectionForCachesSelfTest.java index eea1c92..222e58c 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/GridProjectionForCachesSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/GridProjectionForCachesSelfTest.java @@ -26,15 +26,16 @@ import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.configuration.NearCacheConfiguration; +import org.apache.ignite.internal.util.lang.GridAbsPredicate; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.spi.discovery.DiscoverySpi; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.jetbrains.annotations.NotNull; -import org.jetbrains.annotations.Nullable; import static org.apache.ignite.cache.CacheMode.PARTITIONED; @@ -131,10 +132,18 @@ public class GridProjectionForCachesSelfTest extends GridCommonAbstractTest { * @throws Exception If failed. */ public void testProjectionForDefaultCache() throws Exception { - ClusterGroup prj = ignite.cluster().forCacheNodes(DEFAULT_CACHE_NAME); + final ClusterGroup prj = ignite.cluster().forCacheNodes(DEFAULT_CACHE_NAME); assertNotNull(prj); + + GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + return prj.nodes().size() == 3; + } + }, 5000); + assertEquals(3, prj.nodes().size()); + assertTrue(prj.nodes().contains(grid(0).localNode())); assertFalse(prj.nodes().contains(grid(1).localNode())); assertTrue(prj.nodes().contains(grid(2).localNode())); @@ -146,9 +155,16 @@ public class GridProjectionForCachesSelfTest extends GridCommonAbstractTest { * @throws Exception If failed. */ public void testProjectionForNamedCache() throws Exception { - ClusterGroup prj = ignite.cluster().forCacheNodes(CACHE_NAME); + final ClusterGroup prj = ignite.cluster().forCacheNodes(CACHE_NAME); + + assertNotNull(prj); + + GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + return prj.nodes().size() == 3; + } + }, 5000); - assert prj != null; assertEquals("Invalid projection: " + prj.nodes(), 3, prj.nodes().size()); assert !prj.nodes().contains(grid(0).localNode()); assert prj.nodes().contains(grid(1).localNode()); http://git-wip-us.apache.org/repos/asf/ignite/blob/4026ddcc/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java index 7cafa93..2e1f2f3 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java @@ -1369,28 +1369,43 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac * @param cacheExists Cache exists flag. * @param clientCache {@code True} if client node has client cache. * @param clientNear {@code True} if client node has near-enabled client cache. + * @throws Exception If failed. */ private void checkCacheDiscoveryData(Ignite srv, Ignite client, - String cacheName, + final String cacheName, boolean cacheExists, - boolean clientCache, - boolean clientNear) { - GridDiscoveryManager srvDisco = ((IgniteKernal)srv).context().discovery(); + final boolean clientCache, + boolean clientNear) throws Exception { + final GridDiscoveryManager srvDisco = ((IgniteKernal)srv).context().discovery(); GridDiscoveryManager clientDisco = ((IgniteKernal)client).context().discovery(); ClusterNode srvNode = ((IgniteKernal)srv).localNode(); - ClusterNode clientNode = ((IgniteKernal)client).localNode(); + final ClusterNode clientNode = ((IgniteKernal)client).localNode(); assertFalse(srvDisco.cacheAffinityNode(clientNode, cacheName)); assertFalse(clientDisco.cacheAffinityNode(clientNode, cacheName)); assertEquals(cacheExists, srvDisco.cacheAffinityNode(srvNode, cacheName)); - if (clientNear) + if (clientNear) { + assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + return srvDisco.cacheNearNode(clientNode, cacheName); + } + }, 5000)); + assertTrue(srvDisco.cacheNearNode(clientNode, cacheName)); - else + } + else { + assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + return F.eq(clientCache, srvDisco.cacheClientNode(clientNode, cacheName)); + } + }, 5000)); + assertEquals(clientCache, srvDisco.cacheClientNode(clientNode, cacheName)); + } assertEquals(cacheExists, clientDisco.cacheAffinityNode(srvNode, cacheName)); http://git-wip-us.apache.org/repos/asf/ignite/blob/4026ddcc/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheStopAndDestroySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheStopAndDestroySelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheStopAndDestroySelfTest.java index 1a7e76a..f67e247 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheStopAndDestroySelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheStopAndDestroySelfTest.java @@ -30,6 +30,7 @@ import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.configuration.MemoryConfiguration; import org.apache.ignite.configuration.NearCacheConfiguration; +import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.managers.communication.GridIoMessage; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareRequest; @@ -50,24 +51,25 @@ import static org.apache.ignite.cache.CacheMode.PARTITIONED; /** * Checks stop and destroy methods behavior. */ +@SuppressWarnings("unchecked") public class CacheStopAndDestroySelfTest extends GridCommonAbstractTest { /** */ private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); - /** key-value used at test. */ - protected static String KEY_VAL = "1"; + /** Key-value used at test. */ + private static String KEY_VAL = "1"; - /** cache name 1. */ - protected static String CACHE_NAME_DHT = "cache"; + /** Cache name 1. */ + private static String CACHE_NAME_DHT = "cache"; - /** cache name 2. */ - protected static String CACHE_NAME_CLIENT = "cache_client"; + /** Cache name 2. */ + private static String CACHE_NAME_CLIENT = "cache_client"; - /** near cache name. */ - protected static String CACHE_NAME_NEAR = "cache_near"; + /** Near cache name. */ + private static String CACHE_NAME_NEAR = "cache_near"; - /** local cache name. */ - protected static String CACHE_NAME_LOC = "cache_local"; + /** Local cache name. */ + private static String CACHE_NAME_LOC = "cache_local"; /** Memory configuration to be used on client nodes with local caches. */ private static MemoryConfiguration memCfg; @@ -121,12 +123,12 @@ public class CacheStopAndDestroySelfTest extends GridCommonAbstractTest { public static AtomicInteger cnt = new AtomicInteger(); /** Node filter. */ - public static UUID nodeFilter; + static UUID nodeFilter; /** {@inheritDoc} */ - @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackClosure) + @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackC) throws IgniteSpiException { - super.sendMessage(node, msg, ackClosure); + super.sendMessage(node, msg, ackC); if (nodeFilter != null && node.id().equals(nodeFilter) && @@ -557,8 +559,6 @@ public class CacheStopAndDestroySelfTest extends GridCommonAbstractTest { * @throws Exception If failed. */ public void testNearClose() throws Exception { - fail("https://issues.apache.org/jira/browse/IGNITE-2189"); - startGridsMultiThreaded(gridCount()); IgniteCache<String, String> cache0 = grid(0).getOrCreateCache(getNearConfig()); @@ -596,9 +596,6 @@ public class CacheStopAndDestroySelfTest extends GridCommonAbstractTest { U.sleep(1000); - // Ensure near cache was NOT automatically updated. - assert CountingTxRequestsToClientNodeTcpCommunicationSpi.cnt.get() == 0; - assert cache0.get(KEY_VAL).equals(KEY_VAL + 0);// Not affected. assert cache1.get(KEY_VAL).equals(KEY_VAL + 0);// Not affected. @@ -702,7 +699,10 @@ public class CacheStopAndDestroySelfTest extends GridCommonAbstractTest { AffinityTopologyVersion topVer = grid(1).context().cache().context().exchange().lastTopologyFuture().get(); - grid(0).context().cache().context().exchange().affinityReadyFuture(topVer).get(); + IgniteInternalFuture<?> fut = grid(0).context().cache().context().exchange().affinityReadyFuture(topVer); + + if (fut != null) + fut.get(); grid(0).getOrCreateCache(getLocalConfig()); http://git-wip-us.apache.org/repos/asf/ignite/blob/4026ddcc/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheClearSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheClearSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheClearSelfTest.java index 707e275..9b3777d 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheClearSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheClearSelfTest.java @@ -19,7 +19,6 @@ package org.apache.ignite.internal.processors.cache; import java.util.Collections; import java.util.Set; -import java.util.UUID; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; import org.apache.ignite.Ignition; @@ -28,10 +27,12 @@ import org.apache.ignite.cache.CachePeekMode; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.configuration.NearCacheConfiguration; +import org.apache.ignite.internal.util.lang.GridAbsPredicate; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.jetbrains.annotations.Nullable; @@ -163,13 +164,13 @@ public class GridCacheClearSelfTest extends GridCommonAbstractTest { * @param cacheMode Cache mode. * @param near Near cache flag. * @param keys Keys to clear. + * @throws Exception If failed. */ - private void testClear(CacheMode cacheMode, boolean near, @Nullable Set<Integer> keys) { + private void testClear(CacheMode cacheMode, boolean near, @Nullable Set<Integer> keys) throws Exception { Ignite client1 = client1(); Ignite client2 = client2(); - // TODO GG-11220 (use the same name when fixed). - String cacheName = "cache-" + UUID.randomUUID(); + final String cacheName = DEFAULT_CACHE_NAME; try { CacheConfiguration<Integer, Integer> cfg = new CacheConfiguration<>(cacheName); @@ -184,6 +185,12 @@ public class GridCacheClearSelfTest extends GridCommonAbstractTest { client2.createNearCache(cacheName, new NearCacheConfiguration<Integer, Integer>()) : client2.<Integer, Integer>cache(cacheName); + GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + return ignite(0).cluster().forCacheNodes(cacheName).nodes().size() == 5; + } + }, 5000); + for (int i = 0; i < 10; i++) cache1.put(i, i); http://git-wip-us.apache.org/repos/asf/ignite/blob/4026ddcc/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientCacheInitializationFailTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientCacheInitializationFailTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientCacheInitializationFailTest.java index 1d616f8..19b0ea5 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientCacheInitializationFailTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientCacheInitializationFailTest.java @@ -41,6 +41,7 @@ import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.configuration.NearCacheConfiguration; import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.IgniteKernal; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.processors.query.GridQueryCancel; @@ -161,6 +162,7 @@ public class IgniteClientCacheInitializationFailTest extends GridCommonAbstractT } /** + * @param cacheName Cache name. * @throws Exception If failed. */ private void checkCacheInitialization(final String cacheName) throws Exception { @@ -170,8 +172,7 @@ public class IgniteClientCacheInitializationFailTest extends GridCommonAbstractT checkFineCache(client, CACHE_NAME + 1); - assertNull(client.cache(cacheName)); - assertNull(client.getOrCreateCache(cacheName)); + assertNull(((IgniteKernal)client).context().cache().cache(cacheName)); checkFineCache(client, CACHE_NAME + 2); } @@ -210,7 +211,6 @@ public class IgniteClientCacheInitializationFailTest extends GridCommonAbstractT else cache = client.cache(cacheName); - cache.put(1, "1"); assertEquals("1", cache.get(1)); http://git-wip-us.apache.org/repos/asf/ignite/blob/4026ddcc/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientCacheStartFailoverTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientCacheStartFailoverTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientCacheStartFailoverTest.java new file mode 100644 index 0000000..f32e15f --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientCacheStartFailoverTest.java @@ -0,0 +1,585 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheServerNotFoundException; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.IgniteKernal; +import org.apache.ignite.internal.IgniteNodeAttributes; +import org.apache.ignite.internal.TestRecordingCommunicationSpi; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtAffinityAssignmentResponse; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage; +import org.apache.ignite.internal.util.lang.GridAbsPredicate; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteBiPredicate; +import org.apache.ignite.lang.IgnitePredicate; +import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; +import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; + +/** + * + */ +public class IgniteClientCacheStartFailoverTest extends GridCommonAbstractTest { + /** */ + protected static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** */ + private boolean client; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder); + + cfg.setClientMode(client); + + cfg.setCommunicationSpi(new TestRecordingCommunicationSpi()); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + + super.afterTest(); + } + + /** + * @throws Exception If failed. + */ + public void testClientStartCoordinatorFailsAtomic() throws Exception { + clientStartCoordinatorFails(ATOMIC); + } + + /** + * @throws Exception If failed. + */ + public void testClientStartCoordinatorFailsTx() throws Exception { + clientStartCoordinatorFails(TRANSACTIONAL); + } + + /** + * @param atomicityMode Cache atomicity mode. + * @throws Exception If failed. + */ + private void clientStartCoordinatorFails(CacheAtomicityMode atomicityMode) throws Exception { + Ignite srv0 = startGrids(3); + + final int KEYS = 500; + + IgniteCache<Object, Object> cache = srv0.createCache(cacheConfiguration(DEFAULT_CACHE_NAME, atomicityMode, 1)); + + for (int i = 0; i < KEYS; i++) + cache.put(i, i); + + client = true; + + final Ignite c = startGrid(3); + + TestRecordingCommunicationSpi.spi(srv0).blockMessages(GridDhtAffinityAssignmentResponse.class, c.name()); + + IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + c.cache(DEFAULT_CACHE_NAME); + + return null; + } + }, "start-cache"); + + U.sleep(1000); + + assertFalse(fut.isDone()); + + stopGrid(0); + + fut.get(); + + cache = c.cache(DEFAULT_CACHE_NAME); + + for (int i = 0; i < KEYS; i++) { + assertEquals(i, cache.get(i)); + + cache.put(i, i + 1); + + assertEquals(i + 1, cache.get(i)); + } + } + + /** + * @throws Exception If failed. + */ + public void testClientStartLastServerFailsAtomic() throws Exception { + clientStartLastServerFails(ATOMIC); + } + + /** + * @throws Exception If failed. + */ + public void testClientStartLastServerFailsTx() throws Exception { + clientStartLastServerFails(TRANSACTIONAL); + } + + /** + * @param atomicityMode Cache atomicity mode. + * @throws Exception If failed. + */ + private void clientStartLastServerFails(CacheAtomicityMode atomicityMode) throws Exception { + startGrids(3); + + CacheConfiguration<Object, Object> cfg = cacheConfiguration(DEFAULT_CACHE_NAME, atomicityMode, 1); + + cfg.setNodeFilter(new TestNodeFilter(getTestIgniteInstanceName(1))); + + Ignite srv1 = ignite(1); + + srv1.createCache(cfg); + + client = true; + + final Ignite c = startGrid(3); + + client = false; + + TestRecordingCommunicationSpi.spi(srv1).blockMessages(GridDhtAffinityAssignmentResponse.class, c.name()); + + IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + c.cache(DEFAULT_CACHE_NAME); + + return null; + } + }, "start-cache"); + + U.sleep(1000); + + assertFalse(fut.isDone()); + + stopGrid(1); + + fut.get(); + + final IgniteCache<Object, Object> clientCache = c.cache(DEFAULT_CACHE_NAME); + + for (int i = 0; i < 10; i++) { + final int k = i; + + GridTestUtils.assertThrows(log, new Callable<Void>() { + @Override public Void call() throws Exception { + clientCache.get(k); + + return null; + } + }, CacheServerNotFoundException.class, null); + + GridTestUtils.assertThrows(log, new Callable<Void>() { + @Override public Void call() throws Exception { + clientCache.put(k, k); + + return null; + } + }, CacheServerNotFoundException.class, null); + + GridTestUtils.assertThrows(log, new Callable<Void>() { + @Override public Void call() throws Exception { + clientCache.remove(k); + + return null; + } + }, CacheServerNotFoundException.class, null); + } + + startGrid(1); + + awaitPartitionMapExchange(); + + for (int i = 0; i < 100; i++) { + assertNull(clientCache.get(i)); + + clientCache.put(i, i); + + assertEquals(i, clientCache.get(i)); + } + } + + /** + * @throws Exception If failed. + */ + public void testRebalanceState() throws Exception { + final int SRVS = 3; + + startGrids(SRVS); + + List<String> cacheNames = startCaches(ignite(0), 100); + + client = true; + + Ignite c = startGrid(SRVS); + + assertTrue(c.configuration().isClientMode()); + + awaitPartitionMapExchange(); + + client = false; + + TestRecordingCommunicationSpi.spi(ignite(0)).blockMessages(new IgniteBiPredicate<ClusterNode, Message>() { + @Override public boolean apply(ClusterNode clusterNode, Message msg) { + return msg instanceof GridDhtPartitionsFullMessage && + ((GridDhtPartitionsFullMessage) msg).exchangeId() == null; + } + }); + + startGrid(SRVS + 1); + + for (String cacheName : cacheNames) + c.cache(cacheName); + + U.sleep(1000); + + for (int i = 0; i < SRVS + 1; i++) { + AffinityTopologyVersion topVer = new AffinityTopologyVersion(SRVS + 2); + + IgniteKernal node = (IgniteKernal)ignite(i); + + for (String cacheName : cacheNames) { + GridDhtPartitionTopology top = node.context().cache().internalCache(cacheName).context().topology(); + + assertEquals(topVer, top.topologyVersion()); + + assertFalse(top.rebalanceFinished(topVer)); + } + } + + TestRecordingCommunicationSpi.spi(ignite(0)).stopBlock(); + + for (int i = 0; i < SRVS + 1; i++) { + final AffinityTopologyVersion topVer = new AffinityTopologyVersion(SRVS + 2, 1); + + final IgniteKernal node = (IgniteKernal)ignite(i); + + for (String cacheName : cacheNames) { + final GridDhtPartitionTopology top = node.context().cache().internalCache(cacheName).context().topology(); + + GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + return top.rebalanceFinished(topVer); + } + }, 5000); + + assertTrue(top.rebalanceFinished(topVer)); + } + } + } + + /** + * @throws Exception If failed. + */ + public void testRebalanceStateConcurrentStart() throws Exception { + final int SRVS1 = 3; + final int CLIENTS = 5; + final int SRVS2 = 5; + + startGrids(SRVS1); + + Ignite srv0 = ignite(0); + + final int KEYS = 1000; + + final List<String> cacheNames = startCaches(srv0, KEYS); + + client = true; + + final List<Ignite> clients = new ArrayList<>(); + + for (int i = 0; i < CLIENTS; i++) + clients.add(startGrid(SRVS1 + i)); + + client = false; + + final CyclicBarrier barrier = new CyclicBarrier(clients.size() + SRVS2); + + final AtomicInteger clientIdx = new AtomicInteger(); + + final Set<Integer> keys = new HashSet<>(); + + for (int i = 0; i < KEYS; i++) + keys.add(i); + + IgniteInternalFuture<?> fut1 = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + barrier.await(); + + Ignite client = clients.get(clientIdx.getAndIncrement()); + + for (String cacheName : cacheNames) + client.cache(cacheName); + + ThreadLocalRandom rnd = ThreadLocalRandom.current(); + + for (int i = 0; i < 10; i++) { + for (String cacheName : cacheNames) { + IgniteCache<Object, Object> cache = client.cache(cacheName); + + Map<Object, Object> map0 = cache.getAll(keys); + + assertEquals(KEYS, map0.size()); + + cache.put(rnd.nextInt(KEYS), i); + } + } + + return null; + } + }, clients.size(), "client-cache-start"); + + final AtomicInteger srvIdx = new AtomicInteger(SRVS1 + CLIENTS); + + IgniteInternalFuture<?> fut2 = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + barrier.await(); + + startGrid(srvIdx.incrementAndGet()); + + return null; + } + }, SRVS2, "node-start"); + + fut1.get(); + fut2.get(); + + final AffinityTopologyVersion topVer = new AffinityTopologyVersion(SRVS1 + SRVS2 + CLIENTS, 1); + + for (Ignite client : clients) { + for (String cacheName : cacheNames) { + final GridDhtPartitionTopology top = + ((IgniteKernal)client).context().cache().internalCache(cacheName).context().topology(); + + GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + return top.rebalanceFinished(topVer); + } + }, 5000); + + assertTrue(top.rebalanceFinished(topVer)); + } + } + } + + /** + * @throws Exception If failed. + */ + public void testClientStartCloseServersRestart() throws Exception { + final int SRVS = 4; + final int CLIENTS = 4; + + startGrids(SRVS); + + final List<String> cacheNames = startCaches(ignite(0), 1000); + + client = true; + + final List<Ignite> clients = new ArrayList<>(); + + for (int i = 0; i < CLIENTS; i++) + clients.add(startGrid(SRVS + i)); + + client = false; + + final AtomicBoolean stop = new AtomicBoolean(); + + IgniteInternalFuture<?> restartFut = GridTestUtils.runAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + ThreadLocalRandom rnd = ThreadLocalRandom.current(); + + while (!stop.get()) { + int nodeIdx = rnd.nextInt(SRVS); + + stopGrid(nodeIdx); + + U.sleep(rnd.nextLong(200) + 1); + + startGrid(nodeIdx); + + U.sleep(rnd.nextLong(200) + 1); + } + + return null; + } + }, "restart"); + + final AtomicInteger clientIdx = new AtomicInteger(); + + IgniteInternalFuture<?> clientsFut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + Ignite client = clients.get(clientIdx.getAndIncrement()); + + assertTrue(client.configuration().isClientMode()); + + ThreadLocalRandom rnd = ThreadLocalRandom.current(); + + while (!stop.get()) { + for (String cacheName : cacheNames) + client.cache(cacheName); + + for (String cacheName : cacheNames) { + IgniteCache<Object, Object> cache = client.cache(cacheName); + + cache.put(rnd.nextInt(1000), rnd.nextInt()); + + cache.get(rnd.nextInt(1000)); + } + + for (String cacheName : cacheNames) { + IgniteCache<Object, Object> cache = client.cache(cacheName); + + cache.close(); + } + } + + return null; + } + }, CLIENTS, "client-thread"); + + try { + U.sleep(10_000); + + stop.set(true); + + restartFut.get(); + clientsFut.get(); + } + finally { + stop.set(true); + } + + ThreadLocalRandom rnd = ThreadLocalRandom.current(); + + for (Ignite client : clients) { + for (String cacheName : cacheNames) { + IgniteCache<Object, Object> cache = client.cache(cacheName); + + for (int i = 0; i < 10; i++) { + Integer key = rnd.nextInt(1000); + + cache.put(key, i); + + assertEquals(i, cache.get(key)); + } + } + } + } + + /** + * @param node Node. + * @param keys Number of keys to put in caches. + * @return Cache names. + */ + private List<String> startCaches(Ignite node, int keys) { + List<String> cacheNames = new ArrayList<>(); + + final Map<Integer, Integer> map = new HashMap<>(); + + for (int i = 0; i < keys; i++) + map.put(i, i); + + for (int i = 0; i < 3; i++) { + CacheConfiguration<Object, Object> ccfg = cacheConfiguration("atomic-" + i, ATOMIC, i); + + IgniteCache<Object, Object> cache = node.createCache(ccfg); + + cacheNames.add(ccfg.getName()); + + cache.putAll(map); + } + + for (int i = 0; i < 3; i++) { + CacheConfiguration<Object, Object> ccfg = cacheConfiguration("tx-" + i, TRANSACTIONAL, i); + + IgniteCache<Object, Object> cache = node.createCache(ccfg); + + cacheNames.add(ccfg.getName()); + + cache.putAll(map); + } + + return cacheNames; + } + + /** + * @param name Cache name. + * @param atomicityMode Cache atomicity mode. + * @param backups Number of backups. + * @return Cache configuration. + */ + private CacheConfiguration<Object, Object> cacheConfiguration(String name, CacheAtomicityMode atomicityMode, int backups) { + CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>(name); + + ccfg.setWriteSynchronizationMode(FULL_SYNC); + ccfg.setAtomicityMode(atomicityMode); + ccfg.setBackups(backups); + + return ccfg; + } + /** + * + */ + private static class TestNodeFilter implements IgnitePredicate<ClusterNode> { + /** */ + private final String includeName; + + /** + * @param includeName Node to include. + */ + public TestNodeFilter(String includeName) { + this.includeName = includeName; + } + + /** {@inheritDoc} */ + @Override public boolean apply(ClusterNode node) { + return includeName.equals(node.attribute(IgniteNodeAttributes.ATTR_IGNITE_INSTANCE_NAME)); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/4026ddcc/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicClientCacheStartSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicClientCacheStartSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicClientCacheStartSelfTest.java index 9176cbd..0cb0856 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicClientCacheStartSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicClientCacheStartSelfTest.java @@ -17,17 +17,24 @@ package org.apache.ignite.internal.processors.cache; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.concurrent.Callable; import javax.cache.CacheException; import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; import org.apache.ignite.Ignition; +import org.apache.ignite.cache.CacheAtomicityMode; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.configuration.NearCacheConfiguration; import org.apache.ignite.internal.IgniteKernal; import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager; +import org.apache.ignite.internal.util.lang.GridAbsPredicate; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; @@ -35,8 +42,11 @@ import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.jetbrains.annotations.Nullable; +import static org.apache.ignite.cache.CacheAtomicityMode.*; import static org.apache.ignite.cache.CacheMode.REPLICATED; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_IGNITE_INSTANCE_NAME; /** @@ -267,6 +277,173 @@ public class IgniteDynamicClientCacheStartSelfTest extends GridCommonAbstractTes } /** + * @throws Exception If failed. + */ + public void testStartMultipleClientCaches() throws Exception { + startMultipleClientCaches(null); + } + + /** + * @throws Exception If failed. + */ + public void testStartMultipleClientCachesForGroup() throws Exception { + startMultipleClientCaches("testGrp"); + } + + /** + * @param grp Caches group name. + * @throws Exception If failed. + */ + private void startMultipleClientCaches(@Nullable String grp) throws Exception { + final int SRVS = 1; + + Ignite srv = startGrids(SRVS); + + client = true; + + Ignite client = startGrid(SRVS); + + for (CacheAtomicityMode atomicityMode : values()) { + for (boolean batch : new boolean[]{false, true}) + startCachesForGroup(srv, client, grp, atomicityMode, batch); + } + } + + /** + * @param srv Server node. + * @param client Client node. + * @param grp Cache group. + * @param atomicityMode Cache atomicity mode. + * @param batch {@code True} if use {@link Ignite#getOrCreateCaches(Collection)} for cache creation. + * @throws Exception If failed. + */ + @SuppressWarnings("unchecked") + private void startCachesForGroup(Ignite srv, + Ignite client, + @Nullable String grp, + CacheAtomicityMode atomicityMode, + boolean batch) throws Exception { + log.info("Start caches [grp=" + grp + ", atomicity=" + atomicityMode + ", batch=" + batch + ']'); + + try { + srv.createCaches(cacheConfigurations(grp, atomicityMode)); + + Collection<IgniteCache> caches; + + if (batch) + caches = client.getOrCreateCaches(cacheConfigurations(grp, atomicityMode)); + else { + caches = new ArrayList<>(); + + for (CacheConfiguration ccfg : cacheConfigurations(grp, atomicityMode)) + caches.add(client.getOrCreateCache(ccfg)); + } + + for (IgniteCache cache : caches) + checkCache(client, cache.getName(), false, false); + + Map<Integer, Integer> map1 = new HashMap<>(); + Map<Integer, Integer> map2 = new HashMap<>(); + + for (int i = 0; i < 100; i++) { + map1.put(i, i); + map2.put(i, i + 1); + } + + for (IgniteCache<Integer, Integer> cache : caches) { + for (Map.Entry<Integer, Integer> e : map1.entrySet()) + cache.put(e.getKey(), e.getValue()); + + checkCacheData(map1, cache.getName()); + + cache.putAll(map2); + + checkCacheData(map2, cache.getName()); + } + + for (IgniteCache<Integer, Integer> cache : caches) { + cache.close(); + + checkNoCache(client, cache.getName()); + } + } + finally { + for (CacheConfiguration ccfg : cacheConfigurations(grp, atomicityMode)) + srv.destroyCache(ccfg.getName()); + } + } + + /** + * @throws Exception If failed. + */ + @SuppressWarnings("unchecked") + public void testStartNewAndClientCaches() throws Exception { + final int SRVS = 4; + + Ignite srv = startGrids(SRVS); + + srv.createCaches(cacheConfigurations(null, ATOMIC)); + + ccfg = null; + + client = true; + + Ignite client = startGrid(SRVS); + + List<CacheConfiguration> cfgs = new ArrayList<>(); + + cfgs.addAll(cacheConfigurations(null, ATOMIC)); + cfgs.addAll(cacheConfigurations(null, TRANSACTIONAL)); + + assertEquals(6, cfgs.size()); + + Collection<IgniteCache> caches = client.getOrCreateCaches(cfgs); + + assertEquals(cfgs.size(), caches.size()); + + for (CacheConfiguration cfg : cfgs) + checkCache(client, cfg.getName(), false, false); + + Map<Integer, Integer> map = new HashMap<>(); + + for (int i = 0; i < 100; i++) + map.put(i, i); + + for (IgniteCache<Object, Object> cache : caches) { + cache.putAll(map); + + checkCacheData(map, cache.getName()); + } + + for (IgniteCache cache : caches) { + cache.close(); + + checkNoCache(client, cache.getName()); + } + } + + /** + * @param grp Group name. + * @param atomicityMode Atomicity mode. + * @return Cache configurations. + */ + private List<CacheConfiguration> cacheConfigurations(@Nullable String grp, CacheAtomicityMode atomicityMode) { + List<CacheConfiguration> ccfgs = new ArrayList<>(); + + for (int i = 0; i < 3; i++) { + CacheConfiguration ccfg = new CacheConfiguration(); + + ccfg.setGroupName(grp); + ccfg.setName("cache-" + atomicityMode + "-" + i); + ccfg.setWriteSynchronizationMode(FULL_SYNC); + + ccfgs.add(ccfg); + } + + return ccfgs; + } + + /** * @param createFromCacheClient If {@code true} creates cache from cache client node. * @throws Exception If failed. */ @@ -291,6 +468,8 @@ public class IgniteDynamicClientCacheStartSelfTest extends GridCommonAbstractTes ignite0.cache(DEFAULT_CACHE_NAME).close(); + checkNoCache(ignite0, DEFAULT_CACHE_NAME); + assertNotNull(ignite0.cache(DEFAULT_CACHE_NAME)); startGrid(2); @@ -303,27 +482,37 @@ public class IgniteDynamicClientCacheStartSelfTest extends GridCommonAbstractTes * @param cacheName Cache name * @param srv {@code True} if server cache is expected. * @param near {@code True} if near cache is expected. + * @throws Exception If failed. */ - private void checkCache(Ignite ignite, String cacheName, boolean srv, boolean near) { + private void checkCache(Ignite ignite, final String cacheName, boolean srv, boolean near) throws Exception { GridCacheAdapter<Object, Object> cache = ((IgniteKernal)ignite).context().cache().internalCache(cacheName); assertNotNull("No cache on node " + ignite.name(), cache); assertEquals(near, cache.context().isNear()); - ClusterNode node = ((IgniteKernal)ignite).localNode(); + final ClusterNode node = ((IgniteKernal)ignite).localNode(); for (Ignite ignite0 : Ignition.allGrids()) { - GridDiscoveryManager disco = ((IgniteKernal)ignite0).context().discovery(); + final GridDiscoveryManager disco = ((IgniteKernal)ignite0).context().discovery(); + + if (srv || ignite == ignite0) + assertTrue(disco.cacheNode(node, cacheName)); + else { + assertTrue(ignite0.name(), GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + return disco.cacheNode(node, cacheName); + } + }, 5000)); + } - assertTrue(disco.cacheNode(node, cacheName)); assertEquals(srv, disco.cacheAffinityNode(node, cacheName)); assertEquals(near, disco.cacheNearNode(node, cacheName)); if (srv) - assertTrue(ignite0.affinity(DEFAULT_CACHE_NAME).primaryPartitions(node).length > 0); + assertTrue(ignite0.affinity(cacheName).primaryPartitions(node).length > 0); else - assertEquals(0, ignite0.affinity(DEFAULT_CACHE_NAME).primaryPartitions(node).length); + assertEquals(0, ignite0.affinity(cacheName).primaryPartitions(node).length); } assertNotNull(ignite.cache(cacheName)); @@ -332,24 +521,63 @@ public class IgniteDynamicClientCacheStartSelfTest extends GridCommonAbstractTes /** * @param ignite Node. * @param cacheName Cache name. + * @throws Exception If failed. */ - private void checkNoCache(Ignite ignite, String cacheName) { + private void checkNoCache(Ignite ignite, final String cacheName) throws Exception { GridCacheAdapter<Object, Object> cache = ((IgniteKernal)ignite).context().cache().internalCache(cacheName); assertNull("Unexpected cache on node " + ignite.name(), cache); - ClusterNode node = ((IgniteKernal)ignite).localNode(); + final ClusterNode node = ((IgniteKernal)ignite).localNode(); for (Ignite ignite0 : Ignition.allGrids()) { - GridDiscoveryManager disco = ((IgniteKernal)ignite0).context().discovery(); + final GridDiscoveryManager disco = ((IgniteKernal)ignite0).context().discovery(); + + if (ignite0 == ignite) + assertFalse(ignite0.name(), disco.cacheNode(node, cacheName)); + else { + assertTrue(ignite0.name(), GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + return !disco.cacheNode(node, cacheName); + } + }, 5000)); + } - assertFalse(disco.cacheNode(node, cacheName)); assertFalse(disco.cacheAffinityNode(node, cacheName)); assertFalse(disco.cacheNearNode(node, cacheName)); } } /** + * @throws Exception If failed. + */ + public void testStartClientCachesOnCoordinatorWithGroup() throws Exception { + startGrids(3); + + List<CacheConfiguration> ccfgs = cacheConfigurations("testGrp", ATOMIC); + + for (CacheConfiguration ccfg : ccfgs) + ccfg.setNodeFilter(new CachePredicate(F.asList(getTestIgniteInstanceName(0)))); + + ignite(1).createCaches(ccfgs); + + ccfgs = cacheConfigurations("testGrp", ATOMIC); + + for (CacheConfiguration ccfg : ccfgs) + ccfg.setNodeFilter(new CachePredicate(F.asList(getTestIgniteInstanceName(0)))); + + for (IgniteCache<Object, Object> cache : ignite(0).getOrCreateCaches(ccfgs)) { + cache.put(1, 1); + + assertEquals(1, cache.get(1)); + + cache.close(); + } + + startGrid(4); + } + + /** * */ static class CachePredicate implements IgnitePredicate<ClusterNode> { http://git-wip-us.apache.org/repos/asf/ignite/blob/4026ddcc/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteNearClientCacheCloseTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteNearClientCacheCloseTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteNearClientCacheCloseTest.java new file mode 100644 index 0000000..58fb512 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteNearClientCacheCloseTest.java @@ -0,0 +1,262 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; +import java.util.concurrent.Callable; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.Ignition; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.configuration.NearCacheConfiguration; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; +import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; + +/** + * + */ +public class IgniteNearClientCacheCloseTest extends GridCommonAbstractTest { + /** */ + private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** */ + private boolean client; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder); + + cfg.setClientMode(client); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + + super.afterTest(); + } + + /** + * @throws Exception If failed. + */ + public void testNearCacheCloseAtomic1() throws Exception { + nearCacheClose(1, false, ATOMIC); + + nearCacheClose(1, true, ATOMIC); + } + + /** + * @throws Exception If failed. + */ + public void testNearCacheCloseAtomic2() throws Exception { + nearCacheClose(4, false, ATOMIC); + + nearCacheClose(4, true, ATOMIC); + } + + /** + * @throws Exception If failed. + */ + public void testNearCacheCloseTx1() throws Exception { + nearCacheClose(1, false, TRANSACTIONAL); + + nearCacheClose(1, true, TRANSACTIONAL); + } + + /** + * @throws Exception If failed. + */ + public void testNearCacheCloseTx2() throws Exception { + nearCacheClose(4, false, TRANSACTIONAL); + + nearCacheClose(4, true, TRANSACTIONAL); + } + + /** + * @param srvs Number of server nodes. + * @param srvNearCache {@code True} to enable near cache on server nodes. + * @param atomicityMode Cache atomicity mode. + * @throws Exception If failed. + */ + private void nearCacheClose(int srvs, boolean srvNearCache, CacheAtomicityMode atomicityMode) throws Exception { + Ignite srv; + + if (Ignition.allGrids().isEmpty()) { + srv = startGrids(srvs); + + client = true; + + startGrid(srvs); + } + else + srv = grid(0); + + IgniteCache<Object, Object> srvCache = srv.createCache(cacheConfiguration(atomicityMode, srvNearCache)); + + List<Integer> keys = new ArrayList<>(); + + keys.add(primaryKey(srvCache)); + + if (srvs > 1) { + keys.add(backupKey(srvCache)); + keys.add(nearKey(srvCache)); + } + + for (Integer key : keys) { + IgniteCache<Object, Object> clientCache = + ignite(srvs).createNearCache(DEFAULT_CACHE_NAME, new NearCacheConfiguration<>()); + + clientCache.put(key, 1); + + clientCache.close(); + + srvCache.put(key, 2); + + assertEquals(2, srvCache.get(key)); + + srvCache.put(key, 3); + + assertEquals(3, srvCache.get(key)); + } + + srvCache.destroy(); + } + + /** + * @throws Exception If failed. + */ + public void testConcurrentUpdateAndNearCacheClose() throws Exception { + final int SRVS = 4; + + startGrids(SRVS); + + client = true; + + startGrid(SRVS); + + startGrid(SRVS + 1); + + concurrentUpdateAndNearCacheClose(ATOMIC, SRVS + 1); + + concurrentUpdateAndNearCacheClose(TRANSACTIONAL, SRVS + 1); + } + + /** + * @param atomicityMode Cache atomicity mode. + * @param nearClient Index of client node with near cache. + * @throws Exception If failed. + */ + private void concurrentUpdateAndNearCacheClose(CacheAtomicityMode atomicityMode, + final int nearClient) + throws Exception + { + final String cacheName = ignite(0).createCache(cacheConfiguration(atomicityMode, false)).getName(); + + for (int iter = 0; iter < 5; iter++) { + log.info("Iteration: " + iter); + + IgniteCache<Object, Object> nearCache = ignite(nearClient).createNearCache(cacheName, + new NearCacheConfiguration<>()); + + final int KEYS = 1000; + + for (int i = 0; i < KEYS; i++) + nearCache.put(i, i); + + final AtomicBoolean stop = new AtomicBoolean(); + + IgniteInternalFuture<?> updateFut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + ThreadLocalRandom rnd = ThreadLocalRandom.current(); + + while (!stop.get()) { + int node = rnd.nextInt(nearClient); + + IgniteCache<Object, Object> cache = ignite(node).cache(cacheName); + + if (rnd.nextBoolean()) { + Map<Integer, Integer> map = new TreeMap<>(); + + for (int i = 0; i < 10; i++) + map.put(rnd.nextInt(KEYS), i); + + cache.putAll(map); + } + else + cache.put(rnd.nextInt(KEYS), node); + } + + return null; + } + }, 10, "update"); + + try { + U.sleep(3000); + + nearCache.close(); + + stop.set(true); + + updateFut.get(); + } + finally { + stop.set(true); + } + } + + ignite(0).destroyCache(cacheName); + } + + /** + * @param atomicityMode Cache atomicity mode. + * @param nearCache {@code True} to enable near cache. + * @return Cache configuration. + */ + private CacheConfiguration cacheConfiguration(CacheAtomicityMode atomicityMode, boolean nearCache) { + CacheConfiguration ccfg = new CacheConfiguration(DEFAULT_CACHE_NAME); + + if (nearCache) + ccfg.setNearConfiguration(new NearCacheConfiguration()); + + ccfg.setWriteSynchronizationMode(FULL_SYNC); + ccfg.setAtomicityMode(atomicityMode); + ccfg.setBackups(1); + + return ccfg; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/4026ddcc/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTopologyValidatorAbstractCacheTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTopologyValidatorAbstractCacheTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTopologyValidatorAbstractCacheTest.java index f94babe..9860199 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTopologyValidatorAbstractCacheTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTopologyValidatorAbstractCacheTest.java @@ -19,13 +19,18 @@ package org.apache.ignite.internal.processors.cache; import java.io.Serializable; import java.util.Collection; +import java.util.Collections; +import java.util.List; import javax.cache.CacheException; +import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.configuration.TopologyValidator; +import org.apache.ignite.internal.util.typedef.G; +import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.transactions.Transaction; /** @@ -33,49 +38,71 @@ import org.apache.ignite.transactions.Transaction; */ public abstract class IgniteTopologyValidatorAbstractCacheTest extends IgniteCacheAbstractTest implements Serializable { /** key-value used at test. */ - protected static String KEY_VAL = "1"; + private static String KEY_VAL = "1"; /** cache name 1. */ - protected static String CACHE_NAME_1 = "cache1"; + static String CACHE_NAME_1 = "cache1"; /** cache name 2. */ protected static String CACHE_NAME_2 = "cache2"; + /** */ + private boolean client; + /** {@inheritDoc} */ - @Override protected int gridCount() { + @Override protected final int gridCount() { return 1; } /** {@inheritDoc} */ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { - IgniteConfiguration iCfg = super.getConfiguration(igniteInstanceName); - - CacheConfiguration cCfg0 = cacheConfiguration(igniteInstanceName); - - CacheConfiguration cCfg1 = cacheConfiguration(igniteInstanceName); - cCfg1.setName(CACHE_NAME_1); - - CacheConfiguration cCfg2 = cacheConfiguration(igniteInstanceName); - cCfg2.setName(CACHE_NAME_2); - - iCfg.setCacheConfiguration(cCfg0, cCfg1, cCfg2); - - for (CacheConfiguration cCfg : iCfg.getCacheConfiguration()) { - if (cCfg.getName().equals(CACHE_NAME_1)) - cCfg.setTopologyValidator(new TopologyValidator() { - @Override public boolean validate(Collection<ClusterNode> nodes) { - return nodes.size() == 2; - } - }); - else if (cCfg.getName().equals(CACHE_NAME_2)) - cCfg.setTopologyValidator(new TopologyValidator() { - @Override public boolean validate(Collection<ClusterNode> nodes) { - return nodes.size() >= 2; - } - }); + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + if (!client) { + CacheConfiguration cCfg0 = cacheConfiguration(igniteInstanceName); + + CacheConfiguration cCfg1 = cacheConfiguration(igniteInstanceName); + cCfg1.setName(CACHE_NAME_1); + + CacheConfiguration cCfg2 = cacheConfiguration(igniteInstanceName); + cCfg2.setName(CACHE_NAME_2); + + cfg.setCacheConfiguration(cCfg0, cCfg1, cCfg2); + + for (CacheConfiguration cCfg : cfg.getCacheConfiguration()) { + if (cCfg.getName().equals(CACHE_NAME_1)) + cCfg.setTopologyValidator(new TopologyValidator() { + @Override public boolean validate(Collection<ClusterNode> nodes) { + return servers(nodes) == 2; + } + }); + else if (cCfg.getName().equals(CACHE_NAME_2)) + cCfg.setTopologyValidator(new TopologyValidator() { + @Override public boolean validate(Collection<ClusterNode> nodes) { + return servers(nodes) >= 2; + } + }); + } + } + + cfg.setClientMode(client); + + return cfg; + } + + /** + * @param nodes Nodes. + * @return Number of server nodes. + */ + private static int servers(Collection<ClusterNode> nodes) { + int c = 0; + + for (ClusterNode node : nodes) { + if (!CU.clientNode(node)) + c++; } - return iCfg; + return c; } /** @@ -83,11 +110,16 @@ public abstract class IgniteTopologyValidatorAbstractCacheTest extends IgniteCac * * @param cacheName cache name. */ - protected void putInvalid(String cacheName) { + void putInvalid(String cacheName) { try { - grid(0).cache(cacheName).put(KEY_VAL, KEY_VAL); + List<Ignite> nodes = nodes(); - assert false : "topology validation broken"; + assertFalse(nodes.isEmpty()); + + for (Ignite node : nodes) + node.cache(cacheName).put(KEY_VAL, KEY_VAL); + + fail("Topology validation broken"); } catch (CacheException ex) { assert ex.getCause() instanceof IgniteCheckedException && @@ -100,11 +132,17 @@ public abstract class IgniteTopologyValidatorAbstractCacheTest extends IgniteCac * * @param cacheName cache name. */ - protected void putValid(String cacheName) { + void putValid(String cacheName) { try { - grid(0).cache(cacheName).put(KEY_VAL, KEY_VAL); + List<Ignite> nodes = nodes(); - assert grid(0).cache(cacheName).get(KEY_VAL).equals(KEY_VAL); + assertFalse(nodes.isEmpty()); + + for (Ignite node : nodes) + node.cache(cacheName).put(KEY_VAL, KEY_VAL); + + for (Ignite node : nodes) + assertEquals(KEY_VAL, node.cache(cacheName).get(KEY_VAL)); } catch (CacheException ignored) { assert false : "topology validation broken"; @@ -116,13 +154,13 @@ public abstract class IgniteTopologyValidatorAbstractCacheTest extends IgniteCac * * @param cacheName cache name. */ - protected void getInvalid(String cacheName) { - try { - assert grid(0).cache(cacheName).get(KEY_VAL).equals(KEY_VAL); - } - catch (CacheException ignored) { - assert false : "topology validation broken"; - } + void getInvalid(String cacheName) { + List<Ignite> nodes = nodes(); + + assertFalse(nodes.isEmpty()); + + for (Ignite node : nodes) + assertEquals(KEY_VAL, node.cache(cacheName).get(KEY_VAL)); } /** @@ -130,11 +168,16 @@ public abstract class IgniteTopologyValidatorAbstractCacheTest extends IgniteCac * * @param cacheName cache name. */ - protected void removeInvalid(String cacheName) { + void removeInvalid(String cacheName) { try { - grid(0).cache(cacheName).remove(KEY_VAL); + List<Ignite> nodes = nodes(); - assert false : "topology validation broken"; + assertFalse(nodes.isEmpty()); + + for (Ignite node : nodes) + node.cache(cacheName).remove(KEY_VAL); + + fail("Topology validation broken"); } catch (CacheException ex) { assert ex.getCause() instanceof IgniteCheckedException && @@ -143,13 +186,26 @@ public abstract class IgniteTopologyValidatorAbstractCacheTest extends IgniteCac } /** + * @return Nodes. + */ + private List<Ignite> nodes() { + if (this instanceof IgniteTopologyValidatorAbstractTxCacheTest || + this instanceof IgniteTopologyValidatorAbstractTxCacheGroupsTest) + return Collections.singletonList(ignite(0)); + else + return G.allGrids(); + } + + /** * Commits with error. * * @param tx transaction. */ - protected void commitFailed(Transaction tx) { + void commitFailed(Transaction tx) { try { tx.commit(); + + fail(); } catch (IgniteException ex) { assert ex.getCause() instanceof IgniteCheckedException && @@ -163,7 +219,12 @@ public abstract class IgniteTopologyValidatorAbstractCacheTest extends IgniteCac * @param cacheName cache name. */ public void remove(String cacheName) { - assert grid(0).cache(cacheName).get(KEY_VAL) != null; + List<Ignite> nodes = G.allGrids(); + + assertFalse(nodes.isEmpty()); + + for (Ignite node : nodes) + assertNotNull(node.cache(cacheName).get(KEY_VAL)); grid(0).cache(cacheName).remove(KEY_VAL); } @@ -173,13 +234,14 @@ public abstract class IgniteTopologyValidatorAbstractCacheTest extends IgniteCac * * @param cacheName cache name. */ - public void assertEmpty(String cacheName) { - assert grid(0).cache(cacheName).get(KEY_VAL) == null; + void assertEmpty(String cacheName) { + assertNull(grid(0).cache(cacheName).get(KEY_VAL)); } - /** topology validator test. */ + /** + * @throws Exception If failed. + */ public void testTopologyValidator() throws Exception { - putValid(DEFAULT_CACHE_NAME); remove(DEFAULT_CACHE_NAME); @@ -210,5 +272,19 @@ public abstract class IgniteTopologyValidatorAbstractCacheTest extends IgniteCac putValid(CACHE_NAME_2); remove(CACHE_NAME_2); + + client = true; + + startGrid(3); + + putValid(DEFAULT_CACHE_NAME); + remove(DEFAULT_CACHE_NAME); + + getInvalid(CACHE_NAME_1); + putInvalid(CACHE_NAME_1); + removeInvalid(CACHE_NAME_1); + + putValid(CACHE_NAME_2); + remove(CACHE_NAME_2); } }
