http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheClientModesTcpClientDiscoveryAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheClientModesTcpClientDiscoveryAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheClientModesTcpClientDiscoveryAbstractTest.java new file mode 100644 index 0000000..bbc9144 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheClientModesTcpClientDiscoveryAbstractTest.java @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed; + +import org.apache.ignite.cache.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.spi.discovery.tcp.*; + +import static org.apache.ignite.cache.CacheMode.*; + +/** + * Tests {@link org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi} in client mode. + */ +@SuppressWarnings("RedundantMethodOverride") +public abstract class GridCacheClientModesTcpClientDiscoveryAbstractTest extends GridCacheClientModesAbstractSelfTest { + /** {@inheritDoc} */ + @Override protected boolean isClientStartedLast() { + return true; + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setForceServerMode(false); + + return cfg; + } + + /** */ + public static class CaseNearReplicatedAtomic extends GridCacheClientModesTcpClientDiscoveryAbstractTest { + /** {@inheritDoc} */ + @Override protected CacheMode cacheMode() { + return REPLICATED; + } + + /** {@inheritDoc} */ + @Override protected CacheAtomicityMode atomicityMode() { + return CacheAtomicityMode.ATOMIC; + } + } + + /** */ + public static class CaseNearReplicatedTransactional extends GridCacheClientModesTcpClientDiscoveryAbstractTest { + /** {@inheritDoc} */ + @Override protected CacheMode cacheMode() { + return REPLICATED; + } + + /** {@inheritDoc} */ + @Override protected CacheAtomicityMode atomicityMode() { + return CacheAtomicityMode.TRANSACTIONAL; + } + } + + /** */ + public static class CaseNearPartitionedAtomic extends GridCacheClientModesTcpClientDiscoveryAbstractTest { + /** {@inheritDoc} */ + @Override protected CacheMode cacheMode() { + return PARTITIONED; + } + + /** {@inheritDoc} */ + @Override protected CacheAtomicityMode atomicityMode() { + return CacheAtomicityMode.ATOMIC; + } + } + + /** */ + public static class CaseNearPartitionedTransactional extends GridCacheClientModesTcpClientDiscoveryAbstractTest { + /** {@inheritDoc} */ + @Override protected CacheMode cacheMode() { + return PARTITIONED; + } + + /** {@inheritDoc} */ + @Override protected CacheAtomicityMode atomicityMode() { + return CacheAtomicityMode.TRANSACTIONAL; + } + } + + /** */ + public static class CaseClientReplicatedAtomic extends GridCacheClientModesTcpClientDiscoveryAbstractTest { + /** {@inheritDoc} */ + @Override protected NearCacheConfiguration nearConfiguration() { + return null; + } + + /** {@inheritDoc} */ + @Override protected CacheMode cacheMode() { + return REPLICATED; + } + + /** {@inheritDoc} */ + @Override protected CacheAtomicityMode atomicityMode() { + return CacheAtomicityMode.ATOMIC; + } + } + + /** */ + public static class CaseClientReplicatedTransactional extends GridCacheClientModesTcpClientDiscoveryAbstractTest { + /** {@inheritDoc} */ + @Override protected NearCacheConfiguration nearConfiguration() { + return null; + } + + /** {@inheritDoc} */ + @Override protected CacheMode cacheMode() { + return REPLICATED; + } + + /** {@inheritDoc} */ + @Override protected CacheAtomicityMode atomicityMode() { + return CacheAtomicityMode.TRANSACTIONAL; + } + } + + /** */ + public static class CaseClientPartitionedAtomic extends GridCacheClientModesTcpClientDiscoveryAbstractTest { + /** {@inheritDoc} */ + @Override protected NearCacheConfiguration nearConfiguration() { + return null; + } + + /** {@inheritDoc} */ + @Override protected CacheMode cacheMode() { + return PARTITIONED; + } + + /** {@inheritDoc} */ + @Override protected CacheAtomicityMode atomicityMode() { + return CacheAtomicityMode.ATOMIC; + } + } + + /** */ + public static class CaseClientPartitionedTransactional extends GridCacheClientModesTcpClientDiscoveryAbstractTest { + /** {@inheritDoc} */ + @Override protected NearCacheConfiguration nearConfiguration() { + return null; + } + + /** {@inheritDoc} */ + @Override protected CacheMode cacheMode() { + return PARTITIONED; + } + + /** {@inheritDoc} */ + @Override protected CacheAtomicityMode atomicityMode() { + return CacheAtomicityMode.TRANSACTIONAL; + } + } +}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheMixedModeSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheMixedModeSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheMixedModeSelfTest.java index e19442f..a3c977f 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheMixedModeSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheMixedModeSelfTest.java @@ -21,6 +21,7 @@ import org.apache.ignite.*; import org.apache.ignite.cache.*; import org.apache.ignite.configuration.*; import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.spi.discovery.tcp.*; import org.apache.ignite.testframework.junits.common.*; /** @@ -31,6 +32,8 @@ public class GridCacheMixedModeSelfTest extends GridCommonAbstractTest { @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { IgniteConfiguration cfg = super.getConfiguration(gridName); + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setForceServerMode(true); + cfg.setCacheConfiguration(cacheConfiguration(gridName)); if (F.eq(gridName, getTestGridName(0))) http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java new file mode 100644 index 0000000..6782ff4 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java @@ -0,0 +1,1803 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.cache.affinity.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.managers.communication.*; +import org.apache.ignite.internal.processors.affinity.*; +import org.apache.ignite.internal.processors.cache.*; +import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.*; +import org.apache.ignite.internal.processors.cache.distributed.near.*; +import org.apache.ignite.internal.processors.cache.version.*; +import org.apache.ignite.internal.util.lang.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.plugin.extensions.communication.*; +import org.apache.ignite.resources.*; +import org.apache.ignite.spi.*; +import org.apache.ignite.spi.communication.tcp.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.testframework.*; +import org.apache.ignite.testframework.junits.common.*; +import org.apache.ignite.transactions.*; +import org.eclipse.jetty.util.*; +import org.jetbrains.annotations.*; + +import javax.cache.*; +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; +import java.util.concurrent.locks.*; + +import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.*; +import static org.apache.ignite.cache.CacheAtomicityMode.*; +import static org.apache.ignite.cache.CacheMode.*; +import static org.apache.ignite.cache.CacheRebalanceMode.*; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*; +import static org.apache.ignite.transactions.TransactionConcurrency.*; +import static org.apache.ignite.transactions.TransactionIsolation.*; + +/** + * + */ +public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstractTest { + /** */ + protected static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** */ + private CacheConfiguration ccfg; + + /** */ + private boolean client; + + /** */ + private volatile CyclicBarrier updateBarrier; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder).setForceServerMode(true); + + cfg.setClientMode(client); + + cfg.setCommunicationSpi(new TestCommunicationSpi()); + + cfg.setCacheConfiguration(ccfg); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + super.afterTest(); + + stopAllGrids(); + } + + /** + * @throws Exception If failed. + */ + public void testAtomicPutAllClockMode() throws Exception { + atomicPut(CLOCK, true, null); + } + + /** + * @throws Exception If failed. + */ + public void testAtomicPutAllPrimaryMode() throws Exception { + atomicPut(PRIMARY, true, null); + } + + /** + * @throws Exception If failed. + */ + public void testAtomicPutAllNearEnabledClockMode() throws Exception { + atomicPut(CLOCK, true, new NearCacheConfiguration()); + } + + /** + * @throws Exception If failed. + */ + public void testAtomicPutAllNearEnabledPrimaryMode() throws Exception { + atomicPut(PRIMARY, true, new NearCacheConfiguration()); + } + + /** + * @throws Exception If failed. + */ + public void testAtomicPutClockMode() throws Exception { + atomicPut(CLOCK, false, null); + } + + /** + * @throws Exception If failed. + */ + public void testAtomicPutPrimaryMode() throws Exception { + atomicPut(PRIMARY, false, null); + } + + /** + * @param writeOrder Write order. + * @param putAll If {@code true} executes putAll. + * @param nearCfg Near cache configuration. + * @throws Exception If failed. + */ + private void atomicPut(CacheAtomicWriteOrderMode writeOrder, + final boolean putAll, + @Nullable NearCacheConfiguration nearCfg) throws Exception { + ccfg = new CacheConfiguration(); + + ccfg.setCacheMode(PARTITIONED); + ccfg.setBackups(1); + ccfg.setAtomicityMode(ATOMIC); + ccfg.setWriteSynchronizationMode(FULL_SYNC); + ccfg.setAtomicWriteOrderMode(writeOrder); + ccfg.setRebalanceMode(SYNC); + + IgniteEx ignite0 = startGrid(0); + IgniteEx ignite1 = startGrid(1); + + ccfg.setNearConfiguration(nearCfg); + + client = true; + + ccfg.setNearConfiguration(null); + + Ignite ignite2 = startGrid(2); + + assertTrue(ignite2.configuration().isClientMode()); + + final Map<Integer, Integer> map = new HashMap<>(); + + final int KEYS = putAll ? 100 : 1; + + for (int i = 0; i < KEYS; i++) + map.put(i, i); + + TestCommunicationSpi spi = (TestCommunicationSpi)ignite2.configuration().getCommunicationSpi(); + + // Block messages requests for both nodes. + spi.blockMessages(GridNearAtomicUpdateRequest.class, ignite0.localNode().id()); + spi.blockMessages(GridNearAtomicUpdateRequest.class, ignite1.localNode().id()); + + final IgniteCache<Integer, Integer> cache = ignite2.cache(null); + + assertEquals(writeOrder, cache.getConfiguration(CacheConfiguration.class).getAtomicWriteOrderMode()); + + IgniteInternalFuture<?> putFut = GridTestUtils.runAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + Thread.currentThread().setName("put-thread"); + + if (putAll) + cache.putAll(map); + else + cache.put(0, 0); + + return null; + } + }); + + assertFalse(putFut.isDone()); + + client = false; + + IgniteEx ignite3 = startGrid(3); + + log.info("Stop block1."); + + spi.stopBlock(); + + putFut.get(); + + checkData(map, null, cache, 4); + + ignite3.close(); + + map.clear(); + + for (int i = 0; i < KEYS; i++) + map.put(i, i + 1); + + // Block messages requests for single node. + spi.blockMessages(GridNearAtomicUpdateRequest.class, ignite0.localNode().id()); + + putFut = GridTestUtils.runAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + Thread.currentThread().setName("put-thread"); + + if (putAll) + cache.putAll(map); + else + cache.put(0, 1); + + return null; + } + }); + + assertFalse(putFut.isDone()); + + client = false; + + startGrid(3); + + log.info("Stop block2."); + + spi.stopBlock(); + + putFut.get(); + + checkData(map, null, cache, 4); + + for (int i = 0; i < KEYS; i++) + map.put(i, i + 2); + + if (putAll) + cache.putAll(map); + else + cache.put(0, 2); + + checkData(map, null, cache, 4); + } + + /** + * @throws Exception If failed. + */ + public void testAtomicNoRemapClockMode() throws Exception { + atomicNoRemap(CLOCK); + } + + /** + * @throws Exception If failed. + */ + public void testAtomicNoRemapPrimaryMode() throws Exception { + atomicNoRemap(PRIMARY); + } + + /** + * @param writeOrder Write order. + * @throws Exception If failed. + */ + private void atomicNoRemap(CacheAtomicWriteOrderMode writeOrder) throws Exception { + ccfg = new CacheConfiguration(); + + ccfg.setCacheMode(PARTITIONED); + ccfg.setBackups(1); + ccfg.setAtomicityMode(ATOMIC); + ccfg.setWriteSynchronizationMode(FULL_SYNC); + ccfg.setAtomicWriteOrderMode(writeOrder); + ccfg.setRebalanceMode(SYNC); + + IgniteEx ignite0 = startGrid(0); + IgniteEx ignite1 = startGrid(1); + IgniteEx ignite2 = startGrid(2); + + client = true; + + Ignite ignite3 = startGrid(3); + + assertTrue(ignite3.configuration().isClientMode()); + + final Map<Integer, Integer> map = new HashMap<>(); + + map.put(primaryKey(ignite0.cache(null)), 0); + map.put(primaryKey(ignite1.cache(null)), 1); + map.put(primaryKey(ignite2.cache(null)), 2); + + TestCommunicationSpi spi = (TestCommunicationSpi)ignite3.configuration().getCommunicationSpi(); + + // Block messages requests for both nodes. + spi.blockMessages(GridNearAtomicUpdateRequest.class, ignite0.localNode().id()); + spi.blockMessages(GridNearAtomicUpdateRequest.class, ignite1.localNode().id()); + spi.blockMessages(GridNearAtomicUpdateRequest.class, ignite2.localNode().id()); + + spi.record(GridNearAtomicUpdateRequest.class); + + final IgniteCache<Integer, Integer> cache = ignite3.cache(null); + + assertEquals(writeOrder, cache.getConfiguration(CacheConfiguration.class).getAtomicWriteOrderMode()); + + IgniteInternalFuture<?> putFut = GridTestUtils.runAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + Thread.currentThread().setName("put-thread"); + + cache.putAll(map); + + return null; + } + }); + + IgniteEx ignite4 = startGrid(4); + + assertTrue(ignite4.configuration().isClientMode()); + + assertFalse(putFut.isDone()); + + log.info("Stop block."); + + spi.stopBlock(); + + putFut.get(); + + spi.record(null); + + checkData(map, null, cache, 5); + + List<Object> msgs = spi.recordedMessages(); + + assertEquals(3, msgs.size()); + + for (Object msg : msgs) + assertTrue(((GridNearAtomicUpdateRequest)msg).clientRequest()); + + map.put(primaryKey(ignite0.cache(null)), 3); + map.put(primaryKey(ignite1.cache(null)), 4); + map.put(primaryKey(ignite2.cache(null)), 5); + + cache.putAll(map); + + checkData(map, null, cache, 5); + } + + /** + * @throws Exception If failed. + */ + public void testAtomicGetAndPutClockMode() throws Exception { + atomicGetAndPut(CLOCK); + } + + /** + * @throws Exception If failed. + */ + public void testAtomicGetAndPutPrimaryMode() throws Exception { + atomicGetAndPut(PRIMARY); + } + + /** + * @param writeOrder Write order. + * @throws Exception If failed. + */ + private void atomicGetAndPut(CacheAtomicWriteOrderMode writeOrder) throws Exception { + ccfg = new CacheConfiguration(); + + ccfg.setCacheMode(PARTITIONED); + ccfg.setBackups(1); + ccfg.setAtomicityMode(ATOMIC); + ccfg.setWriteSynchronizationMode(FULL_SYNC); + ccfg.setAtomicWriteOrderMode(writeOrder); + ccfg.setRebalanceMode(SYNC); + + IgniteEx ignite0 = startGrid(0); + IgniteEx ignite1 = startGrid(1); + + client = true; + + ignite0.cache(null).put(0, 0); + + Ignite ignite2 = startGrid(2); + + assertTrue(ignite2.configuration().isClientMode()); + + final Map<Integer, Integer> map = new HashMap<>(); + + map.put(0, 1); + + TestCommunicationSpi spi = (TestCommunicationSpi)ignite2.configuration().getCommunicationSpi(); + + // Block messages requests for both nodes. + spi.blockMessages(GridNearAtomicUpdateRequest.class, ignite0.localNode().id()); + spi.blockMessages(GridNearAtomicUpdateRequest.class, ignite1.localNode().id()); + + final IgniteCache<Integer, Integer> cache = ignite2.cache(null); + + assertEquals(writeOrder, cache.getConfiguration(CacheConfiguration.class).getAtomicWriteOrderMode()); + + IgniteInternalFuture<Integer> putFut = GridTestUtils.runAsync(new Callable<Integer>() { + @Override public Integer call() throws Exception { + Thread.currentThread().setName("put-thread"); + + return cache.getAndPut(0, 1); + } + }); + + assertFalse(putFut.isDone()); + + client = false; + + startGrid(3); + + log.info("Stop block."); + + spi.stopBlock(); + + Integer old = putFut.get(); + + checkData(map, null, cache, 4); + + assertEquals((Object)0, old); + } + + /** + * @throws Exception If failed. + */ + public void testTxPutAll() throws Exception { + ccfg = new CacheConfiguration(); + + ccfg.setCacheMode(PARTITIONED); + ccfg.setBackups(1); + ccfg.setAtomicityMode(TRANSACTIONAL); + ccfg.setWriteSynchronizationMode(FULL_SYNC); + ccfg.setRebalanceMode(SYNC); + + IgniteEx ignite0 = startGrid(0); + IgniteEx ignite1 = startGrid(1); + + client = true; + + Ignite ignite2 = startGrid(2); + + assertTrue(ignite2.configuration().isClientMode()); + + final Map<Integer, Integer> map = new HashMap<>(); + + for (int i = 0; i < 100; i++) + map.put(i, i); + + TestCommunicationSpi spi = (TestCommunicationSpi)ignite2.configuration().getCommunicationSpi(); + + spi.blockMessages(GridNearTxPrepareRequest.class, ignite0.localNode().id()); + spi.blockMessages(GridNearTxPrepareRequest.class, ignite1.localNode().id()); + + final IgniteCache<Integer, Integer> cache = ignite2.cache(null); + + IgniteInternalFuture<?> putFut = GridTestUtils.runAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + Thread.currentThread().setName("put-thread"); + + cache.putAll(map); + + return null; + } + }); + + assertFalse(putFut.isDone()); + + client = false; + + IgniteEx ignite3 = startGrid(3); + + log.info("Stop block."); + + spi.stopBlock(); + + putFut.get(); + + checkData(map, null, cache, 4); + + map.clear(); + + for (int i = 0; i < 100; i++) + map.put(i, i + 1); + + cache.putAll(map); + + checkData(map, null, cache, 4); + } + /** + * @throws Exception If failed. + */ + public void testPessimisticTx() throws Exception { + pessimisticTx(null); + } + + /** + * @throws Exception If failed. + */ + public void testPessimisticTxNearEnabled() throws Exception { + pessimisticTx(new NearCacheConfiguration()); + } + + /** + * @param nearCfg Near cache configuration. + * @throws Exception If failed. + */ + private void pessimisticTx(NearCacheConfiguration nearCfg) throws Exception { + ccfg = new CacheConfiguration(); + + ccfg.setCacheMode(PARTITIONED); + ccfg.setBackups(1); + ccfg.setAtomicityMode(TRANSACTIONAL); + ccfg.setWriteSynchronizationMode(FULL_SYNC); + ccfg.setRebalanceMode(SYNC); + ccfg.setNearConfiguration(nearCfg); + + IgniteEx ignite0 = startGrid(0); + IgniteEx ignite1 = startGrid(1); + + client = true; + + final Ignite ignite2 = startGrid(2); + + assertTrue(ignite2.configuration().isClientMode()); + + final Map<Integer, Integer> map = new HashMap<>(); + + for (int i = 0; i < 100; i++) + map.put(i, i); + + TestCommunicationSpi spi = (TestCommunicationSpi)ignite2.configuration().getCommunicationSpi(); + + spi.blockMessages(GridNearLockRequest.class, ignite0.localNode().id()); + spi.blockMessages(GridNearLockRequest.class, ignite1.localNode().id()); + + spi.record(GridNearLockRequest.class); + + final IgniteCache<Integer, Integer> cache = ignite2.cache(null); + + IgniteInternalFuture<?> putFut = GridTestUtils.runAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + Thread.currentThread().setName("put-thread"); + + try (Transaction tx = ignite2.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { + cache.putAll(map); + + tx.commit(); + } + + return null; + } + }); + + assertFalse(putFut.isDone()); + + client = false; + + IgniteEx ignite3 = startGrid(3); + + log.info("Stop block1."); + + spi.stopBlock(); + + putFut.get(); + + spi.record(null); + + checkData(map, null, cache, 4); + + List<Object> msgs = spi.recordedMessages(); + + assertTrue(((GridNearLockRequest)msgs.get(0)).firstClientRequest()); + assertTrue(((GridNearLockRequest)msgs.get(1)).firstClientRequest()); + + for (int i = 2; i < msgs.size(); i++) + assertFalse(((GridNearLockRequest)msgs.get(i)).firstClientRequest()); + + ignite3.close(); + + for (int i = 0; i < 100; i++) + map.put(i, i + 1); + + spi.blockMessages(GridNearLockRequest.class, ignite0.localNode().id()); + spi.blockMessages(GridNearLockRequest.class, ignite1.localNode().id()); + + putFut = GridTestUtils.runAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + Thread.currentThread().setName("put-thread"); + + try (Transaction tx = ignite2.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { + for (Map.Entry<Integer, Integer> e : map.entrySet()) + cache.put(e.getKey(), e.getValue()); + + tx.commit(); + } + + return null; + } + }); + + ignite3 = startGrid(3); + + log.info("Stop block2."); + + spi.stopBlock(); + + putFut.get(); + + checkData(map, null, cache, 4); + + for (int i = 0; i < 100; i++) + map.put(i, i + 2); + + try (Transaction tx = ignite2.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { + cache.putAll(map); + + tx.commit(); + } + + checkData(map, null, cache, 4); + } + + /** + * Tests specific scenario when mapping for first locked keys does not change, but changes for second one. + * + * @throws Exception If failed. + */ + public void testPessimisticTx2() throws Exception { + ccfg = new CacheConfiguration(); + + ccfg.setCacheMode(PARTITIONED); + ccfg.setBackups(1); + ccfg.setAtomicityMode(TRANSACTIONAL); + ccfg.setWriteSynchronizationMode(FULL_SYNC); + ccfg.setRebalanceMode(SYNC); + + IgniteEx ignite0 = startGrid(0); + IgniteEx ignite1 = startGrid(1); + IgniteEx ignite2 = startGrid(2); + + client = true; + + final Ignite ignite3 = startGrid(3); + + assertTrue(ignite3.configuration().isClientMode()); + + AffinityTopologyVersion topVer1 = new AffinityTopologyVersion(4, 0); + + assertEquals(topVer1, ignite0.context().cache().internalCache(null).context().topology().topologyVersion()); + + TestCommunicationSpi spi = (TestCommunicationSpi)ignite3.configuration().getCommunicationSpi(); + + final Integer key1 = 0; + final Integer key2 = 7; + + spi.blockMessages(GridNearLockRequest.class, ignite0.localNode().id()); + spi.blockMessages(GridNearLockRequest.class, ignite1.localNode().id()); + spi.blockMessages(GridNearLockRequest.class, ignite2.localNode().id()); + + final IgniteCache<Integer, Integer> cache = ignite3.cache(null); + + IgniteInternalFuture<?> putFut = GridTestUtils.runAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + Thread.currentThread().setName("put-thread"); + + try (Transaction tx = ignite3.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { + cache.put(key1, 1); + cache.put(key2, 2); + + tx.commit(); + } + + return null; + } + }); + + client = false; + + IgniteEx ignite4 = startGrid(4); + + AffinityTopologyVersion topVer2 = new AffinityTopologyVersion(5, 0); + + assertEquals(topVer2, ignite0.context().cache().internalCache(null).context().topology().topologyVersion()); + + GridCacheAffinityManager aff = ignite0.context().cache().internalCache(null).context().affinity(); + + List<ClusterNode> nodes1 = aff.nodes(key1, topVer1); + List<ClusterNode> nodes2 = aff.nodes(key1, topVer2); + + assertEquals(nodes1, nodes2); + + nodes1 = aff.nodes(key2, topVer1); + nodes2 = aff.nodes(key2, topVer2); + + assertFalse(nodes1.get(0).equals(nodes2.get(0))); + + assertFalse(putFut.isDone()); + + log.info("Stop block."); + + spi.stopBlock(); + + putFut.get(); + + checkData(F.asMap(key1, 1, key2, 2), null, cache, 5); + } + + /** + * @throws Exception If failed. + */ + public void testPessimisticTxNearEnabledNoRemap() throws Exception { + pessimisticTxNoRemap(new NearCacheConfiguration()); + } + + /** + * @throws Exception If failed. + */ + public void testPessimisticTxNoRemap() throws Exception { + pessimisticTxNoRemap(null); + } + + /** + * @param nearCfg Near cache configuration. + * @throws Exception If failed. + */ + private void pessimisticTxNoRemap(@Nullable NearCacheConfiguration nearCfg) throws Exception { + ccfg = new CacheConfiguration(); + + ccfg.setCacheMode(PARTITIONED); + ccfg.setBackups(1); + ccfg.setAtomicityMode(TRANSACTIONAL); + ccfg.setWriteSynchronizationMode(FULL_SYNC); + ccfg.setRebalanceMode(SYNC); + ccfg.setNearConfiguration(nearCfg); + + IgniteEx ignite0 = startGrid(0); + IgniteEx ignite1 = startGrid(1); + IgniteEx ignite2 = startGrid(2); + + client = true; + + final Ignite ignite3 = startGrid(3); + + assertTrue(ignite3.configuration().isClientMode()); + + TestCommunicationSpi spi = (TestCommunicationSpi)ignite3.configuration().getCommunicationSpi(); + + final Map<Integer, Integer> map = new HashMap<>(); + + for (int i = 0; i < 100; i++) + map.put(i, i); + + spi.blockMessages(GridNearLockRequest.class, ignite0.localNode().id()); + spi.blockMessages(GridNearLockRequest.class, ignite1.localNode().id()); + spi.blockMessages(GridNearLockRequest.class, ignite2.localNode().id()); + + spi.record(GridNearLockRequest.class); + + final IgniteCache<Integer, Integer> cache = ignite3.cache(null); + + IgniteInternalFuture<?> putFut = GridTestUtils.runAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + Thread.currentThread().setName("put-thread"); + + try (Transaction tx = ignite3.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { + for (Map.Entry<Integer, Integer> e : map.entrySet()) + cache.put(e.getKey(), e.getValue()); + + tx.commit(); + } + + return null; + } + }); + + IgniteEx ignite4 = startGrid(4); + + assertTrue(ignite4.configuration().isClientMode()); + + assertFalse(putFut.isDone()); + + log.info("Stop block."); + + spi.stopBlock(); + + putFut.get(); + + spi.record(null); + + checkData(map, null, cache, 5); + + List<Object> msgs = spi.recordedMessages(); + + checkClientLockMessages(msgs, map.size()); + + for (int i = 0; i < 100; i++) + map.put(i, i + 1); + + try (Transaction tx = ignite3.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { + cache.putAll(map); + + tx.commit(); + } + + checkData(map, null, cache, 5); + } + + /** + * @throws Exception If failed. + */ + public void testLock() throws Exception { + lock(null); + } + + /** + * @throws Exception If failed. + */ + public void testLockNearEnabled() throws Exception { + lock(new NearCacheConfiguration()); + } + + /** + * @param nearCfg Near cache configuration. + * @throws Exception If failed. + */ + private void lock(NearCacheConfiguration nearCfg) throws Exception { + ccfg = new CacheConfiguration(); + + ccfg.setCacheMode(PARTITIONED); + ccfg.setBackups(1); + ccfg.setAtomicityMode(TRANSACTIONAL); + ccfg.setWriteSynchronizationMode(FULL_SYNC); + ccfg.setRebalanceMode(SYNC); + ccfg.setNearConfiguration(nearCfg); + + final IgniteEx ignite0 = startGrid(0); + final IgniteEx ignite1 = startGrid(1); + + client = true; + + final Ignite ignite2 = startGrid(2); + + assertTrue(ignite2.configuration().isClientMode()); + + final List<Integer> keys = new ArrayList<>(); + + for (int i = 0; i < 100; i++) + keys.add(i); + + TestCommunicationSpi spi = (TestCommunicationSpi)ignite2.configuration().getCommunicationSpi(); + + spi.blockMessages(GridNearLockRequest.class, ignite0.localNode().id()); + spi.blockMessages(GridNearLockRequest.class, ignite1.localNode().id()); + + final IgniteCache<Integer, Integer> cache = ignite2.cache(null); + + final CountDownLatch lockedLatch = new CountDownLatch(1); + + final CountDownLatch unlockLatch = new CountDownLatch(1); + + IgniteInternalFuture<Lock> lockFut = GridTestUtils.runAsync(new Callable<Lock>() { + @Override public Lock call() throws Exception { + Thread.currentThread().setName("put-thread"); + + Lock lock = cache.lockAll(keys); + + lock.lock(); + + log.info("Locked"); + + lockedLatch.countDown(); + + unlockLatch.await(); + + lock.unlock(); + + return lock; + } + }); + + client = false; + + IgniteEx ignite3 = startGrid(3); + + log.info("Stop block."); + + assertEquals(1, lockedLatch.getCount()); + + spi.stopBlock(); + + assertTrue(lockedLatch.await(3000, TimeUnit.MILLISECONDS)); + + IgniteCache<Integer, Integer> cache0 = ignite0.cache(null); + + for (Integer key : keys) { + Lock lock = cache0.lock(key); + + assertFalse(lock.tryLock()); + } + + unlockLatch.countDown(); + + lockFut.get(); + + boolean wait = GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + return unlocked(ignite0) && unlocked(ignite1); + } + + private boolean unlocked(Ignite ignite) { + IgniteCache<Integer, Integer> cache = ignite.cache(null); + + for (Integer key : keys) { + if (cache.isLocalLocked(key, false)) { + log.info("Key is locked [key=" + key + ", node=" + ignite.name() + ']'); + + return false; + } + } + + return true; + } + }, 10_000); + + assertTrue(wait); + + for (Integer key : keys) { + Lock lock = cache0.lock(key); + + assertTrue("Failed to lock: " + key, lock.tryLock()); + + lock.unlock(); + } + } + + /** + * @throws Exception If failed. + */ + public void testPessimisticTxMessageClientFirstFlag() throws Exception { + ccfg = new CacheConfiguration(); + + ccfg.setCacheMode(PARTITIONED); + ccfg.setBackups(1); + ccfg.setAtomicityMode(TRANSACTIONAL); + ccfg.setWriteSynchronizationMode(FULL_SYNC); + ccfg.setRebalanceMode(SYNC); + + IgniteEx ignite0 = startGrid(0); + IgniteEx ignite1 = startGrid(1); + IgniteEx ignite2 = startGrid(2); + + client = true; + + Ignite ignite3 = startGrid(3); + + assertTrue(ignite3.configuration().isClientMode()); + + TestCommunicationSpi spi = (TestCommunicationSpi)ignite3.configuration().getCommunicationSpi(); + + spi.record(GridNearLockRequest.class); + + IgniteCache<Integer, Integer> cache = ignite3.cache(null); + + try (Transaction tx = ignite3.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { + cache.put(1, 1); + cache.put(2, 2); + cache.put(3, 3); + + tx.commit(); + } + + checkClientLockMessages(spi.recordedMessages(), 3); + + Map<Integer, Integer> map = new HashMap<>(); + + map.put(4, 4); + map.put(5, 5); + map.put(6, 6); + map.put(7, 7); + + try (Transaction tx = ignite3.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { + cache.putAll(map); + + tx.commit(); + } + + checkClientLockMessages(spi.recordedMessages(), 4); + + spi.record(null); + + TestCommunicationSpi spi0 = (TestCommunicationSpi)ignite0.configuration().getCommunicationSpi(); + + spi0.record(GridNearLockRequest.class); + + List<Integer> keys = primaryKeys(ignite1.cache(null), 3, 0); + + IgniteCache<Integer, Integer> cache0 = ignite0.cache(null); + + try (Transaction tx = ignite0.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { + cache0.put(keys.get(0), 0); + cache0.put(keys.get(1), 1); + cache0.put(keys.get(2), 2); + + tx.commit(); + } + + List<Object> msgs = spi0.recordedMessages(); + + assertEquals(3, msgs.size()); + + for (Object msg : msgs) + assertFalse(((GridNearLockRequest)msg).firstClientRequest()); + } + + /** + * @param msgs Messages. + * @param expCnt Expected number of messages. + */ + private void checkClientLockMessages(List<Object> msgs, int expCnt) { + assertEquals(expCnt, msgs.size()); + + assertTrue(((GridNearLockRequest)msgs.get(0)).firstClientRequest()); + + for (int i = 1; i < msgs.size(); i++) + assertFalse(((GridNearLockRequest)msgs.get(i)).firstClientRequest()); + } + + /** + * @throws Exception If failed. + */ + public void testOptimisticTxMessageClientFirstFlag() throws Exception { + ccfg = new CacheConfiguration(); + + ccfg.setCacheMode(PARTITIONED); + ccfg.setBackups(1); + ccfg.setAtomicityMode(TRANSACTIONAL); + ccfg.setWriteSynchronizationMode(FULL_SYNC); + ccfg.setRebalanceMode(SYNC); + + IgniteEx ignite0 = startGrid(0); + IgniteEx ignite1 = startGrid(1); + IgniteEx ignite2 = startGrid(2); + + client = true; + + Ignite ignite3 = startGrid(3); + + assertTrue(ignite3.configuration().isClientMode()); + + TestCommunicationSpi spi = (TestCommunicationSpi)ignite3.configuration().getCommunicationSpi(); + + IgniteCache<Integer, Integer> cache = ignite3.cache(null); + + List<Integer> keys0 = primaryKeys(ignite0.cache(null), 2, 0); + List<Integer> keys1 = primaryKeys(ignite1.cache(null), 2, 0); + List<Integer> keys2 = primaryKeys(ignite2.cache(null), 2, 0); + + LinkedHashMap<Integer, Integer> map = new LinkedHashMap<>(); + + map.put(keys0.get(0), 1); + map.put(keys1.get(0), 2); + map.put(keys2.get(0), 3); + map.put(keys0.get(1), 4); + map.put(keys1.get(1), 5); + map.put(keys2.get(1), 6); + + spi.record(GridNearTxPrepareRequest.class); + + try (Transaction tx = ignite3.transactions().txStart(OPTIMISTIC, REPEATABLE_READ)) { + for (Map.Entry<Integer, Integer> e : map.entrySet()) + cache.put(e.getKey(), e.getValue()); + + tx.commit(); + } + + checkClientPrepareMessages(spi.recordedMessages(), 6); + + checkData(map, null, cache, 4); + + cache.putAll(map); + + checkClientPrepareMessages(spi.recordedMessages(), 6); + + spi.record(null); + + checkData(map, null, cache, 4); + + IgniteCache<Integer, Integer> cache0 = ignite0.cache(null); + + TestCommunicationSpi spi0 = (TestCommunicationSpi)ignite0.configuration().getCommunicationSpi(); + + spi0.record(GridNearTxPrepareRequest.class); + + cache0.putAll(map); + + spi0.record(null); + + List<Object> msgs = spi0.recordedMessages(); + + assertEquals(4, msgs.size()); + + for (Object msg : msgs) + assertFalse(((GridNearTxPrepareRequest)msg).firstClientRequest()); + + checkData(map, null, cache, 4); + } + + /** + * @param msgs Messages. + * @param expCnt Expected number of messages. + */ + private void checkClientPrepareMessages(List<Object> msgs, int expCnt) { + assertEquals(expCnt, msgs.size()); + + assertTrue(((GridNearTxPrepareRequest)msgs.get(0)).firstClientRequest()); + + for (int i = 1; i < msgs.size(); i++) + assertFalse(((GridNearTxPrepareRequest) msgs.get(i)).firstClientRequest()); + } + + /** + * @throws Exception If failed. + */ + public void testLockRemoveAfterClientFailed() throws Exception { + ccfg = new CacheConfiguration(); + + ccfg.setCacheMode(PARTITIONED); + ccfg.setBackups(1); + ccfg.setAtomicityMode(TRANSACTIONAL); + ccfg.setWriteSynchronizationMode(FULL_SYNC); + ccfg.setRebalanceMode(SYNC); + + IgniteEx ignite0 = startGrid(0); + IgniteEx ignite1 = startGrid(1); + + client = true; + + Ignite ignite2 = startGrid(2); + + assertTrue(ignite2.configuration().isClientMode()); + + IgniteCache<Integer, Integer> cache2 = ignite2.cache(null); + + final Integer key = 0; + + Lock lock2 = cache2.lock(key); + + lock2.lock(); + + ignite2.close(); + + IgniteCache<Integer, Integer> cache0 = ignite0.cache(null); + + assertFalse(cache0.isLocalLocked(key, false)); + + IgniteCache<Integer, Integer> cache1 = ignite1.cache(null); + + assertFalse(cache1.isLocalLocked(key, false)); + + Lock lock1 = cache1.lock(0); + + assertTrue(lock1.tryLock(5000, TimeUnit.MILLISECONDS)); + + lock1.unlock(); + + ignite2 = startGrid(2); + + assertTrue(ignite2.configuration().isClientMode()); + + cache2 = ignite2.cache(null); + + lock2 = cache2.lock(0); + + assertTrue(lock2.tryLock(5000, TimeUnit.MILLISECONDS)); + + lock2.unlock(); + } + + /** + * @throws Exception If failed. + */ + public void testLockFromClientBlocksExchange() throws Exception { + ccfg = new CacheConfiguration(); + + ccfg.setCacheMode(PARTITIONED); + ccfg.setBackups(1); + ccfg.setAtomicityMode(TRANSACTIONAL); + ccfg.setWriteSynchronizationMode(FULL_SYNC); + ccfg.setRebalanceMode(SYNC); + + startGrid(0); + startGrid(1); + + client = true; + + Ignite ignite2 = startGrid(2); + + IgniteCache<Integer, Integer> cache = ignite2.cache(null); + + Lock lock = cache.lock(0); + + lock.lock(); + + IgniteInternalFuture<?> startFut = GridTestUtils.runAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + client = false; + + startGrid(3); + + return null; + } + }); + + U.sleep(2000); + + assertFalse(startFut.isDone()); + + AffinityTopologyVersion ver = new AffinityTopologyVersion(4); + + List<IgniteInternalFuture<?>> futs = new ArrayList<>(); + + U.sleep(2000); + + for (int i = 0; i < 3; i++) { + Ignite ignite = ignite(i); + + IgniteInternalFuture<?> fut = + ((IgniteKernal)ignite).context().cache().context().exchange().affinityReadyFuture(ver); + + assertNotNull(fut); + + assertFalse(fut.isDone()); + + futs.add(fut); + } + + lock.unlock(); + + for (IgniteInternalFuture<?> fut : futs) + fut.get(10_000); + + startFut.get(10_000); + } + + /** + * @param map Expected data. + * @param keys Expected keys (if expected data is not specified). + * @param clientCache Client cache. + * @param expNodes Expected nodes number. + * @throws Exception If failed. + */ + private void checkData(final Map<Integer, Integer> map, + final Set<Integer> keys, + IgniteCache<?, ?> clientCache, + final int expNodes) + throws Exception + { + final List<Ignite> nodes = G.allGrids(); + + final Affinity<Integer> aff = nodes.get(0).affinity(null); + + assertEquals(expNodes, nodes.size()); + + boolean hasNearCache = clientCache.getConfiguration(CacheConfiguration.class).getNearConfiguration() != null; + + final Ignite nearCacheNode = hasNearCache ? clientCache.unwrap(Ignite.class) : null; + + boolean wait = GridTestUtils.waitForCondition(new PA() { + @Override public boolean apply() { + try { + Set<Integer> keys0 = map != null ? map.keySet() : keys; + + assertNotNull(keys0); + + for (Integer key : keys0) { + GridCacheVersion ver = null; + Object val = null; + + for (Ignite node : nodes) { + IgniteCache<Integer, Integer> cache = node.cache(null); + + boolean affNode = aff.isPrimaryOrBackup(node.cluster().localNode(), key); + + Object val0 = cache.localPeek(key); + + if (affNode || node == nearCacheNode) { + if (map != null) + assertEquals("Unexpected value for " + node.name(), map.get(key), val0); + else + assertNotNull("Unexpected value for " + node.name(), val0); + + GridCacheAdapter cache0 = ((IgniteKernal)node).internalCache(null); + + if (affNode && cache0.isNear()) + cache0 = ((GridNearCacheAdapter)cache0).dht(); + + GridCacheEntryEx entry = cache0.peekEx(key); + + assertNotNull("No entry [node=" + node.name() + ", key=" + key + ']', entry); + + GridCacheVersion ver0 = entry instanceof GridNearCacheEntry ? + ((GridNearCacheEntry)entry).dhtVersion() : entry.version(); + + assertNotNull("Null version [node=" + node.name() + ", key=" + key + ']', ver0); + + if (ver == null) { + ver = ver0; + val = val0; + } + else { + assertEquals("Version check failed [node=" + node.name() + + ", key=" + key + + ", affNode=" + affNode + + ", primary=" + aff.isPrimary(node.cluster().localNode(), key) + ']', + ver0, + ver); + + assertEquals("Value check failed [node=" + node.name() + + ", key=" + key + + ", affNode=" + affNode + + ", primary=" + aff.isPrimary(node.cluster().localNode(), key) + ']', + val0, + val); + } + } + else + assertNull("Unexpected non-null value for " + node.name(), val0); + } + } + } + catch (AssertionError e) { + log.info("Check failed, will retry: " + e); + + return false; + } + catch (Exception e) { + fail("Unexpected exception: " + e); + } + + return true; + } + }, 10_000); + + assertTrue("Data check failed.", wait); + } + + /** + * @throws Exception If failed. + */ + public void testAtomicPrimaryPutAllMultinode() throws Exception { + multinode(PRIMARY, TestType.PUT_ALL); + } + + /** + * @throws Exception If failed. + */ + public void testAtomicClockPutAllMultinode() throws Exception { + multinode(CLOCK, TestType.PUT_ALL); + } + + /** + * @throws Exception If failed. + */ + public void testOptimisticTxPutAllMultinode() throws Exception { + multinode(null, TestType.OPTIMISTIC_TX); + } + + /** + * @throws Exception If failed. + */ + public void testPessimisticTxPutAllMultinode() throws Exception { + multinode(null, TestType.PESSIMISTIC_TX); + } + + /** + * @throws Exception If failed. + */ + public void testLockAllMultinode() throws Exception { + multinode(null, TestType.LOCK); + } + + /** + * @param atomicWriteOrder Write order if test atomic cache. + * @param testType Test type. + * @throws Exception If failed. + */ + private void multinode(final CacheAtomicWriteOrderMode atomicWriteOrder, final TestType testType) + throws Exception { + ccfg = new CacheConfiguration(); + + ccfg.setCacheMode(PARTITIONED); + ccfg.setBackups(1); + ccfg.setAtomicityMode(atomicWriteOrder != null ? ATOMIC : TRANSACTIONAL); + ccfg.setAtomicWriteOrderMode(atomicWriteOrder); + ccfg.setWriteSynchronizationMode(FULL_SYNC); + ccfg.setRebalanceMode(SYNC); + + final int SRV_CNT = 4; + + for (int i = 0; i < SRV_CNT; i++) + startGrid(i); + + final int CLIENT_CNT = 4; + + final List<Ignite> clients = new ArrayList<>(); + + client = true; + + for (int i = 0; i < CLIENT_CNT; i++) { + Ignite ignite = startGrid(SRV_CNT + i); + + assertTrue(ignite.configuration().isClientMode()); + + clients.add(ignite); + } + + final AtomicBoolean stop = new AtomicBoolean(); + + final AtomicInteger threadIdx = new AtomicInteger(0); + + final int THREADS = CLIENT_CNT * 3; + + final ConcurrentHashSet<Integer> putKeys = new ConcurrentHashSet<>(); + + IgniteInternalFuture<?> fut; + + try { + fut = GridTestUtils.runMultiThreadedAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + int clientIdx = threadIdx.getAndIncrement() % CLIENT_CNT; + + Ignite ignite = clients.get(clientIdx); + + assertTrue(ignite.configuration().isClientMode()); + + Thread.currentThread().setName("update-thread-" + ignite.name()); + + IgniteCache<Integer, Integer> cache = ignite.cache(null); + + boolean useTx = testType == TestType.OPTIMISTIC_TX || testType == TestType.PESSIMISTIC_TX; + + if (useTx || testType == TestType.LOCK) { + assertEquals(TRANSACTIONAL, + cache.getConfiguration(CacheConfiguration.class).getAtomicityMode()); + } + + ThreadLocalRandom rnd = ThreadLocalRandom.current(); + + int cntr = 0; + + while (!stop.get()) { + TreeMap<Integer, Integer> map = new TreeMap<>(); + + for (int i = 0; i < 100; i++) { + Integer key = rnd.nextInt(0, 1000); + + map.put(key, rnd.nextInt()); + } + + try { + if (testType == TestType.LOCK) { + Lock lock = cache.lockAll(map.keySet()); + + lock.lock(); + + lock.unlock(); + } + else { + if (useTx) { + IgniteTransactions txs = ignite.transactions(); + + TransactionConcurrency concurrency = + testType == TestType.PESSIMISTIC_TX ? PESSIMISTIC : OPTIMISTIC; + + try (Transaction tx = txs.txStart(concurrency, REPEATABLE_READ)) { + cache.putAll(map); + + tx.commit(); + } + } + else + cache.putAll(map); + + putKeys.addAll(map.keySet()); + } + } + catch (CacheException | IgniteException e) { + log.info("Operation failed, ignore: " + e); + } + + if (++cntr % 100 == 0) + log.info("Iteration: " + cntr); + + if (updateBarrier != null) + updateBarrier.await(); + } + + return null; + } + }, THREADS, "update-thread"); + + long stopTime = System.currentTimeMillis() + 60_000; + + while (System.currentTimeMillis() < stopTime) { + boolean restartClient = ThreadLocalRandom.current().nextBoolean(); + + Integer idx = null; + + if (restartClient) { + log.info("Start client node."); + + client = true; + + IgniteEx ignite = startGrid(SRV_CNT + CLIENT_CNT); + + IgniteCache<Integer, Integer> cache = ignite.cache(null); + + assertNotNull(cache); + } + else { + idx = ThreadLocalRandom.current().nextInt(0, SRV_CNT); + + log.info("Stop server node: " + idx); + + stopGrid(idx); + } + + updateBarrier = new CyclicBarrier(THREADS + 1, new Runnable() { + @Override public void run() { + updateBarrier = null; + } + }); + + try { + updateBarrier.await(30_000, TimeUnit.MILLISECONDS); + } + catch (TimeoutException e) { + log.error("Failed to wait for update."); + + for (Ignite ignite : G.allGrids()) + dumpCacheDebugInfo(ignite); + + U.dumpThreads(log); + + CyclicBarrier barrier0 = updateBarrier; + + if (barrier0 != null) + barrier0.reset(); + + fail("Failed to wait for update."); + } + + U.sleep(500); + + if (restartClient) { + log.info("Stop client node."); + + stopGrid(SRV_CNT + CLIENT_CNT); + } + else { + log.info("Start server node: " + idx); + + client = false; + + startGrid(idx); + } + + updateBarrier = new CyclicBarrier(THREADS + 1, new Runnable() { + @Override public void run() { + updateBarrier = null; + } + }); + + try { + updateBarrier.await(30_000, TimeUnit.MILLISECONDS); + } + catch (TimeoutException e) { + log.error("Failed to wait for update."); + + for (Ignite ignite : G.allGrids()) + dumpCacheDebugInfo(ignite); + + U.dumpThreads(log); + + CyclicBarrier barrier0 = updateBarrier; + + if (barrier0 != null) + barrier0.reset(); + + fail("Failed to wait for update."); + } + + U.sleep(500); + } + } + finally { + stop.set(true); + } + + fut.get(30_000); + + if (testType != TestType.LOCK) + checkData(null, putKeys, grid(SRV_CNT).cache(null), SRV_CNT + CLIENT_CNT); + } + + /** + * @throws Exception If failed. + */ + public void testServersLeaveOnStart() throws Exception { + ccfg = new CacheConfiguration(); + + ccfg.setCacheMode(PARTITIONED); + ccfg.setBackups(1); + ccfg.setAtomicityMode(ATOMIC); + ccfg.setWriteSynchronizationMode(FULL_SYNC); + ccfg.setRebalanceMode(SYNC); + + Ignite ignite0 = startGrid(0); + + client = true; + + final AtomicInteger nodeIdx = new AtomicInteger(2); + + final int CLIENTS = 10; + + IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + int idx = nodeIdx.getAndIncrement(); + + startGrid(idx); + + return null; + } + }, CLIENTS, "start-client"); + + ignite0.close(); + + fut.get(); + + for (int i = 0; i < CLIENTS; i++) { + Ignite ignite = grid(i + 2); + + assertEquals(CLIENTS, ignite.cluster().nodes().size()); + } + + client = false; + + startGrid(0); + startGrid(1); + + awaitPartitionMapExchange(); + + for (int i = 0; i < CLIENTS; i++) { + Ignite ignite = grid(i + 2); + + IgniteCache<Integer, Integer> cache = ignite.cache(null); + + cache.put(i, i); + + assertEquals((Object)i, cache.get(i)); + } + } + + /** + * + */ + private static class TestCommunicationSpi extends TcpCommunicationSpi { + /** */ + @LoggerResource + private IgniteLogger log; + + /** */ + private List<T2<ClusterNode, GridIoMessage>> blockedMsgs = new ArrayList<>(); + + /** */ + private Map<Class<?>, Set<UUID>> blockCls = new HashMap<>(); + + /** */ + private Class<?> recordCls; + + /** */ + private List<Object> recordedMsgs = new ArrayList<>(); + + /** {@inheritDoc} */ + @Override public void sendMessage(ClusterNode node, Message msg) throws IgniteSpiException { + if (msg instanceof GridIoMessage) { + Object msg0 = ((GridIoMessage)msg).message(); + + synchronized (this) { + if (recordCls != null && msg0.getClass().equals(recordCls)) + recordedMsgs.add(msg0); + + Set<UUID> blockNodes = blockCls.get(msg0.getClass()); + + if (F.contains(blockNodes, node.id())) { + log.info("Block message [node=" + node.attribute(IgniteNodeAttributes.ATTR_GRID_NAME) + + ", msg=" + msg0 + ']'); + + blockedMsgs.add(new T2<>(node, (GridIoMessage)msg)); + + return; + } + } + } + + super.sendMessage(node, msg); + } + + /** + * @param recordCls Message class to record. + */ + void record(@Nullable Class<?> recordCls) { + synchronized (this) { + this.recordCls = recordCls; + } + } + + /** + * @return Recorded messages. + */ + List<Object> recordedMessages() { + synchronized (this) { + List<Object> msgs = recordedMsgs; + + recordedMsgs = new ArrayList<>(); + + return msgs; + } + } + + /** + * @param cls Message class. + * @param nodeId Node ID. + */ + void blockMessages(Class<?> cls, UUID nodeId) { + synchronized (this) { + Set<UUID> set = blockCls.get(cls); + + if (set == null) { + set = new HashSet<>(); + + blockCls.put(cls, set); + } + + set.add(nodeId); + } + } + + /** + * + */ + void stopBlock() { + synchronized (this) { + blockCls.clear(); + + for (T2<ClusterNode, GridIoMessage> msg : blockedMsgs) { + ClusterNode node = msg.get1(); + + log.info("Send blocked message: [node=" + node.attribute(IgniteNodeAttributes.ATTR_GRID_NAME) + + ", msg=" + msg.get2().message() + ']'); + + super.sendMessage(msg.get1(), msg.get2()); + } + } + } + } + + /** + * + */ + enum TestType { + /** */ + PUT_ALL, + + /** */ + OPTIMISTIC_TX, + + /** */ + PESSIMISTIC_TX, + + /** */ + LOCK + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeConcurrentStart.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeConcurrentStart.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeConcurrentStart.java new file mode 100644 index 0000000..bd74ece --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeConcurrentStart.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed; + +import org.apache.ignite.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.testframework.junits.common.*; + +import java.util.*; +import java.util.concurrent.*; + +import static org.apache.ignite.cache.CacheRebalanceMode.*; + +/** + * + */ +public class IgniteCacheClientNodeConcurrentStart extends GridCommonAbstractTest { + /** */ + protected static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** */ + private static final int NODES_CNT = 5; + + /** */ + private Set<Integer> clientNodes; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + assertNotNull(clientNodes); + + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder); + + boolean client = false; + + for (Integer clientIdx : clientNodes) { + if (getTestGridName(clientIdx).equals(gridName)) { + client = true; + + break; + } + } + + cfg.setClientMode(client); + + CacheConfiguration ccfg = new CacheConfiguration(); + + ccfg.setBackups(0); + ccfg.setRebalanceMode(SYNC); + + cfg.setCacheConfiguration(ccfg); + + return cfg; + } + + /** + * @throws Exception If failed. + */ + public void testConcurrentStart() throws Exception { + ThreadLocalRandom rnd = ThreadLocalRandom.current(); + + for (int i = 0; i < 3; i++) { + try { + clientNodes = new HashSet<>(); + + while (clientNodes.size() < 2) + clientNodes.add(rnd.nextInt(0, NODES_CNT)); + + clientNodes.add(NODES_CNT - 1); + + log.info("Test iteration [iter=" + i + ", clients=" + clientNodes + ']'); + + startGridsMultiThreaded(NODES_CNT, true); + + for (int node : clientNodes) { + Ignite ignite = grid(node); + + assertTrue(ignite.configuration().isClientMode()); + } + } + finally { + stopAllGrids(); + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodePartitionsExchangeTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodePartitionsExchangeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodePartitionsExchangeTest.java new file mode 100644 index 0000000..e5d30b6 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodePartitionsExchangeTest.java @@ -0,0 +1,632 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.cache.affinity.*; +import org.apache.ignite.cache.affinity.fair.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.events.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.managers.communication.*; +import org.apache.ignite.internal.managers.discovery.*; +import org.apache.ignite.internal.processors.affinity.*; +import org.apache.ignite.internal.processors.cache.*; +import org.apache.ignite.internal.processors.cache.distributed.dht.*; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.*; +import org.apache.ignite.internal.util.lang.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.plugin.extensions.communication.*; +import org.apache.ignite.resources.*; +import org.apache.ignite.spi.communication.tcp.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.testframework.*; +import org.apache.ignite.testframework.junits.common.*; + +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +/** + * + */ +public class IgniteCacheClientNodePartitionsExchangeTest extends GridCommonAbstractTest { + /** */ + protected static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** */ + private boolean client; + + /** */ + private boolean fairAffinity; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder).setForceServerMode(true); + + cfg.setClientMode(client); + + CacheConfiguration ccfg = new CacheConfiguration(); + + if (fairAffinity) + ccfg.setAffinity(new FairAffinityFunction()); + + cfg.setCacheConfiguration(ccfg); + + cfg.setCommunicationSpi(new TestCommunicationSpi()); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + super.afterTest(); + + stopAllGrids(); + } + + /** + * @throws Exception If failed. + */ + public void testServerNodeLeave() throws Exception { + Ignite ignite0 = startGrid(0); + + client = true; + + final Ignite ignite1 = startGrid(1); + + waitForTopologyUpdate(2, 2); + + final Ignite ignite2 = startGrid(2); + + waitForTopologyUpdate(3, 3); + + ignite0.close(); + + waitForTopologyUpdate(2, 4); + + GridTestUtils.assertThrows(log, new Callable<Void>() { + @Override public Void call() throws Exception { + ignite1.cache(null).get(1); + + return null; + } + }, CacheServerNotFoundException.class, null); + + GridTestUtils.assertThrows(log, new Callable<Void>() { + @Override public Void call() throws Exception { + ignite2.cache(null).get(1); + + return null; + } + }, CacheServerNotFoundException.class, null); + + ignite1.close(); + + waitForTopologyUpdate(1, 5); + + GridTestUtils.assertThrows(log, new Callable<Void>() { + @Override public Void call() throws Exception { + ignite2.cache(null).get(1); + + return null; + } + }, CacheServerNotFoundException.class, null); + } + + /** + * @throws Exception If failed. + */ + public void testSkipPreload() throws Exception { + Ignite ignite0 = startGrid(0); + + final CountDownLatch evtLatch0 = new CountDownLatch(1); + + ignite0.events().localListen(new IgnitePredicate<Event>() { + @Override public boolean apply(Event evt) { + log.info("Rebalance event: " + evt); + + evtLatch0.countDown(); + + return true; + } + }, EventType.EVT_CACHE_REBALANCE_STARTED, EventType.EVT_CACHE_REBALANCE_STOPPED); + + client = true; + + Ignite ignite1 = startGrid(1); + + assertTrue(evtLatch0.await(1000, TimeUnit.MILLISECONDS)); + + ignite1.close(); + + assertTrue(evtLatch0.await(1000, TimeUnit.MILLISECONDS)); + + ignite1 = startGrid(1); + + final CountDownLatch evtLatch1 = new CountDownLatch(1); + + ignite1.events().localListen(new IgnitePredicate<Event>() { + @Override public boolean apply(Event evt) { + log.info("Rebalance event: " + evt); + + evtLatch1.countDown(); + + return true; + } + }, EventType.EVT_CACHE_REBALANCE_STARTED, EventType.EVT_CACHE_REBALANCE_STOPPED); + + assertTrue(evtLatch0.await(1000, TimeUnit.MILLISECONDS)); + + client = false; + + startGrid(2); + + assertTrue(evtLatch0.await(1000, TimeUnit.MILLISECONDS)); + assertFalse(evtLatch1.await(1000, TimeUnit.MILLISECONDS)); + } + + /** + * @throws Exception If failed. + */ + public void testPartitionsExchange() throws Exception { + partitionsExchange(); + } + + /** + * @throws Exception If failed. + */ + public void testPartitionsExchangeFairAffinity() throws Exception { + fairAffinity = true; + + partitionsExchange(); + } + + /** + * @throws Exception If failed. + */ + private void partitionsExchange() throws Exception { + Ignite ignite0 = startGrid(0); + + TestCommunicationSpi spi0 = (TestCommunicationSpi)ignite0.configuration().getCommunicationSpi(); + + Ignite ignite1 = startGrid(1); + + waitForTopologyUpdate(2, 2); + + TestCommunicationSpi spi1 = (TestCommunicationSpi)ignite1.configuration().getCommunicationSpi(); + + assertEquals(0, spi0.partitionsSingleMessages()); + assertEquals(1, spi0.partitionsFullMessages()); + + assertEquals(1, spi1.partitionsSingleMessages()); + assertEquals(0, spi1.partitionsFullMessages()); + + spi0.reset(); + spi1.reset(); + + client = true; + + log.info("Start client node1."); + + Ignite ignite2 = startGrid(2); + + waitForTopologyUpdate(3, 3); + + TestCommunicationSpi spi2 = (TestCommunicationSpi)ignite2.configuration().getCommunicationSpi(); + + assertEquals(0, spi0.partitionsSingleMessages()); + assertEquals(1, spi0.partitionsFullMessages()); + + assertEquals(0, spi1.partitionsSingleMessages()); + assertEquals(0, spi1.partitionsFullMessages()); + + assertEquals(1, spi2.partitionsSingleMessages()); + assertEquals(0, spi2.partitionsFullMessages()); + + spi0.reset(); + spi1.reset(); + spi2.reset(); + + log.info("Start client node2."); + + Ignite ignite3 = startGrid(3); + + waitForTopologyUpdate(4, 4); + + TestCommunicationSpi spi3 = (TestCommunicationSpi)ignite3.configuration().getCommunicationSpi(); + + assertEquals(0, spi0.partitionsSingleMessages()); + assertEquals(1, spi0.partitionsFullMessages()); + + assertEquals(0, spi1.partitionsSingleMessages()); + assertEquals(0, spi1.partitionsFullMessages()); + + assertEquals(0, spi2.partitionsSingleMessages()); + assertEquals(0, spi2.partitionsFullMessages()); + + assertEquals(1, spi3.partitionsSingleMessages()); + assertEquals(0, spi3.partitionsFullMessages()); + + spi0.reset(); + spi1.reset(); + spi2.reset(); + spi3.reset(); + + log.info("Start one more server node."); + + client = false; + + Ignite ignite4 = startGrid(4); + + waitForTopologyUpdate(5, 5); + + TestCommunicationSpi spi4 = (TestCommunicationSpi)ignite4.configuration().getCommunicationSpi(); + + assertEquals(0, spi0.partitionsSingleMessages()); + assertEquals(4, spi0.partitionsFullMessages()); + + assertEquals(1, spi1.partitionsSingleMessages()); + assertEquals(0, spi1.partitionsFullMessages()); + + assertEquals(1, spi2.partitionsSingleMessages()); + assertEquals(0, spi2.partitionsFullMessages()); + + assertEquals(1, spi3.partitionsSingleMessages()); + assertEquals(0, spi3.partitionsFullMessages()); + + assertEquals(1, spi4.partitionsSingleMessages()); + assertEquals(0, spi4.partitionsFullMessages()); + + spi0.reset(); + spi1.reset(); + spi2.reset(); + spi3.reset(); + + log.info("Stop server node."); + + ignite4.close(); + + waitForTopologyUpdate(4, 6); + + assertEquals(0, spi0.partitionsSingleMessages()); + assertEquals(3, spi0.partitionsFullMessages()); + + assertEquals(1, spi1.partitionsSingleMessages()); + assertEquals(0, spi1.partitionsFullMessages()); + + assertEquals(1, spi2.partitionsSingleMessages()); + assertEquals(0, spi2.partitionsFullMessages()); + + assertEquals(1, spi3.partitionsSingleMessages()); + assertEquals(0, spi3.partitionsFullMessages()); + + spi0.reset(); + spi1.reset(); + spi2.reset(); + + log.info("Stop client node2."); + + ignite3.close(); + + waitForTopologyUpdate(3, 7); + + assertEquals(0, spi0.partitionsSingleMessages()); + assertEquals(0, spi0.partitionsFullMessages()); + + assertEquals(0, spi1.partitionsSingleMessages()); + assertEquals(0, spi1.partitionsFullMessages()); + + assertEquals(0, spi2.partitionsSingleMessages()); + assertEquals(0, spi2.partitionsFullMessages()); + + spi0.reset(); + spi1.reset(); + + log.info("Stop client node1."); + + ignite2.close(); + + waitForTopologyUpdate(2, 8); + + assertEquals(0, spi0.partitionsSingleMessages()); + assertEquals(0, spi0.partitionsFullMessages()); + + assertEquals(0, spi1.partitionsSingleMessages()); + assertEquals(0, spi1.partitionsFullMessages()); + + log.info("Stop server node."); + + ignite1.close(); + + waitForTopologyUpdate(1, 9); + + assertEquals(0, spi0.partitionsSingleMessages()); + assertEquals(0, spi0.partitionsFullMessages()); + } + + /** + * @param expNodes Expected number of nodes. + * @param topVer Expected topology version. + * @throws Exception If failed. + */ + private void waitForTopologyUpdate(int expNodes, int topVer) throws Exception { + final AffinityTopologyVersion ver = new AffinityTopologyVersion(topVer, 0); + + waitForTopologyUpdate(expNodes, ver); + } + + /** + * @param expNodes Expected number of nodes. + * @param topVer Expected topology version. + * @throws Exception If failed. + */ + private void waitForTopologyUpdate(int expNodes, final AffinityTopologyVersion topVer) throws Exception { + List<Ignite> nodes = G.allGrids(); + + assertEquals(expNodes, nodes.size()); + + for (Ignite ignite : nodes) { + final IgniteKernal kernal = (IgniteKernal)ignite; + + GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + return topVer.equals(kernal.context().cache().context().exchange().readyAffinityVersion()); + } + }, 10_000); + + assertEquals("Unexpected affinity version for " + ignite.name(), + topVer, + kernal.context().cache().context().exchange().readyAffinityVersion()); + } + + Iterator<Ignite> it = nodes.iterator(); + + Ignite ignite0 = it.next(); + + Affinity<Integer> aff0 = ignite0.affinity(null); + + while (it.hasNext()) { + Ignite ignite = it.next(); + + Affinity<Integer> aff = ignite.affinity(null); + + assertEquals(aff0.partitions(), aff.partitions()); + + for (int part = 0; part < aff.partitions(); part++) + assertEquals(aff0.mapPartitionToPrimaryAndBackups(part), aff.mapPartitionToPrimaryAndBackups(part)); + } + + for (Ignite ignite : nodes) { + final IgniteKernal kernal = (IgniteKernal)ignite; + + for (IgniteInternalCache cache : kernal.context().cache().caches()) { + GridDhtPartitionTopology top = cache.context().topology(); + + assertEquals("Unexpected topology version [node=" + ignite.name() + ", cache=" + cache.name() + ']', + topVer, + top.topologyVersion()); + } + } + + awaitPartitionMapExchange(); + } + + /** + * @throws Exception If failed. + */ + public void testClientOnlyCacheStart() throws Exception { + clientOnlyCacheStart(false, false); + } + + /** + * @throws Exception If failed. + */ + public void testNearOnlyCacheStart() throws Exception { + clientOnlyCacheStart(true, false); + } + + /** + * @throws Exception If failed. + */ + public void testClientOnlyCacheStartFromServerNode() throws Exception { + clientOnlyCacheStart(false, true); + } + + /** + * @throws Exception If failed. + */ + public void testNearOnlyCacheStartFromServerNode() throws Exception { + clientOnlyCacheStart(true, true); + } + + /** + * @param nearCache If {@code true} creates near cache on client. + * @throws Exception If failed. + */ + private void clientOnlyCacheStart(boolean nearCache, boolean srvNode) throws Exception { + Ignite ignite0 = startGrid(0); + Ignite ignite1 = startGrid(1); + + waitForTopologyUpdate(2, 2); + + final String CACHE_NAME1 = "cache1"; + + CacheConfiguration ccfg = new CacheConfiguration(); + + ccfg.setName(CACHE_NAME1); + + if (srvNode) + ccfg.setNodeFilter(new TestFilter(getTestGridName(2))); + + ignite0.createCache(ccfg); + + client = !srvNode; + + Ignite ignite2 = startGrid(2); + + waitForTopologyUpdate(3, 3); + + TestCommunicationSpi spi0 = (TestCommunicationSpi)ignite0.configuration().getCommunicationSpi(); + TestCommunicationSpi spi1 = (TestCommunicationSpi)ignite1.configuration().getCommunicationSpi(); + TestCommunicationSpi spi2 = (TestCommunicationSpi)ignite2.configuration().getCommunicationSpi(); + + spi0.reset(); + spi1.reset(); + spi2.reset(); + + assertNull(((IgniteKernal)ignite2).context().cache().context().cache().internalCache("cache1")); + + if (nearCache) + ignite2.getOrCreateNearCache(CACHE_NAME1, new NearCacheConfiguration<>()); + else + ignite2.cache(CACHE_NAME1); + + waitForTopologyUpdate(3, new AffinityTopologyVersion(3, 1)); + + GridCacheAdapter cache = ((IgniteKernal)ignite2).context().cache().context().cache().internalCache("cache1"); + + assertNotNull(cache); + assertEquals(nearCache, cache.context().isNear()); + + assertEquals(0, spi0.partitionsSingleMessages()); + assertEquals(1, spi0.partitionsFullMessages()); + assertEquals(0, spi1.partitionsSingleMessages()); + assertEquals(0, spi1.partitionsFullMessages()); + assertEquals(1, spi2.partitionsSingleMessages()); + assertEquals(0, spi2.partitionsFullMessages()); + + ClusterNode clientNode = ((IgniteKernal)ignite2).localNode(); + + for (Ignite ignite : Ignition.allGrids()) { + GridDiscoveryManager disco = ((IgniteKernal)ignite).context().discovery(); + + assertTrue(disco.cacheNode(clientNode, CACHE_NAME1)); + assertFalse(disco.cacheAffinityNode(clientNode, CACHE_NAME1)); + assertEquals(nearCache, disco.cacheNearNode(clientNode, CACHE_NAME1)); + } + + spi0.reset(); + spi1.reset(); + spi2.reset(); + + final String CACHE_NAME2 = "cache2"; + + ccfg = new CacheConfiguration(); + + ccfg.setName(CACHE_NAME2); + + ignite2.createCache(ccfg); + + waitForTopologyUpdate(3, new AffinityTopologyVersion(3, 2)); + + assertEquals(0, spi0.partitionsSingleMessages()); + assertEquals(2, spi0.partitionsFullMessages()); + assertEquals(1, spi1.partitionsSingleMessages()); + assertEquals(0, spi1.partitionsFullMessages()); + assertEquals(1, spi2.partitionsSingleMessages()); + assertEquals(0, spi2.partitionsFullMessages()); + } + + /** + * + */ + private static class TestFilter implements IgnitePredicate<ClusterNode> { + /** */ + private String exclNodeName; + + /** + * @param exclNodeName Node name to exclude. + */ + public TestFilter(String exclNodeName) { + this.exclNodeName = exclNodeName; + } + + /** {@inheritDoc} */ + @Override public boolean apply(ClusterNode clusterNode) { + return !exclNodeName.equals(clusterNode.attribute(IgniteNodeAttributes.ATTR_GRID_NAME)); + } + } + + /** + * Test communication SPI. + */ + private static class TestCommunicationSpi extends TcpCommunicationSpi { + /** */ + private AtomicInteger partSingleMsgs = new AtomicInteger(); + + /** */ + private AtomicInteger partFullMsgs = new AtomicInteger(); + + /** */ + @LoggerResource + private IgniteLogger log; + + /** {@inheritDoc} */ + @Override public void sendMessage(ClusterNode node, Message msg) { + super.sendMessage(node, msg); + + Object msg0 = ((GridIoMessage)msg).message(); + + if (msg0 instanceof GridDhtPartitionsSingleMessage) { + if (((GridDhtPartitionsSingleMessage)msg0).exchangeId() != null) { + log.info("Partitions message: " + msg0.getClass().getSimpleName()); + + partSingleMsgs.incrementAndGet(); + } + } + else if (msg0 instanceof GridDhtPartitionsFullMessage) { + if (((GridDhtPartitionsFullMessage)msg0).exchangeId() != null) { + log.info("Partitions message: " + msg0.getClass().getSimpleName()); + + partFullMsgs.incrementAndGet(); + } + } + } + + /** + * + */ + void reset() { + partSingleMsgs.set(0); + partFullMsgs.set(0); + } + + /** + * @return Sent partitions single messages. + */ + int partitionsSingleMessages() { + return partSingleMsgs.get(); + } + + /** + * @return Sent partitions full messages. + */ + int partitionsFullMessages() { + return partFullMsgs.get(); + } + } + +}
