Do not re-create node2part map on every singleMap message
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/46cba2a4 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/46cba2a4 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/46cba2a4 Branch: refs/heads/ignite-5398 Commit: 46cba2a46966759e6d658f3ba991f15722a6634f Parents: 250b4e0 Author: Alexey Goncharuk <[email protected]> Authored: Wed May 17 19:04:10 2017 +0300 Committer: Alexey Goncharuk <[email protected]> Committed: Wed May 17 19:04:39 2017 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheIoManager.java | 9 + .../GridCachePartitionExchangeManager.java | 8 +- .../dht/GridDhtPartitionTopologyImpl.java | 16 +- .../colocated/GridDhtColocatedLockFuture.java | 5 + .../GridDhtPartitionsExchangeFuture.java | 3 + .../processors/cluster/ClusterProcessor.java | 2 +- .../ignite/internal/util/nio/GridNioServer.java | 18 +- .../communication/tcp/TcpCommunicationSpi.java | 10 +- .../resources/META-INF/classnames.properties | 30 ++- .../CacheClientsConcurrentStartTest.java | 248 +++++++++++++++++++ 10 files changed, 338 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/46cba2a4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java index 277d176..4557561 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java @@ -319,6 +319,15 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { else U.error(log, msg0.toString()); + try { + cacheMsg.onClassError(new IgniteCheckedException("Failed to find message handler for message: " + cacheMsg)); + + processFailedMessage(nodeId, cacheMsg, c); + } + catch (Exception e) { + U.error(log, "Failed to process failed message: " + e, e); + } + return; } http://git-wip-us.apache.org/repos/asf/ignite/blob/46cba2a4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index 02da4fd..79166f2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -2000,8 +2000,14 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana GridDhtPartitionsExchangeFuture fut) { GridDhtPartitionsExchangeFuture cur = super.addx(fut); - while (size() > EXCHANGE_HISTORY_SIZE) + while (size() > EXCHANGE_HISTORY_SIZE) { + GridDhtPartitionsExchangeFuture last = last(); + + if (last != null && !last.isDone()) + break; + removeLast(); + } // Return the value in the set. return cur == null ? fut : cur; http://git-wip-us.apache.org/repos/asf/ignite/blob/46cba2a4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java index d1283c3..d98358a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java @@ -90,6 +90,9 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh /** Logger. */ private final IgniteLogger log; + /** Time logger. */ + private final IgniteLogger timeLog; + /** */ private final AtomicReferenceArray<GridDhtLocalPartition> locParts; @@ -146,6 +149,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh this.entryFactory = entryFactory; log = cctx.logger(getClass()); + timeLog = cctx.logger(GridDhtPartitionsExchangeFuture.EXCHANGE_LOG); locParts = new AtomicReferenceArray<>(cctx.config().getAffinity().partitions()); } @@ -1262,9 +1266,19 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh return null; } + long now = U.currentTimeMillis(); + lock.writeLock().lock(); try { + long acquired = U.currentTimeMillis(); + + if (acquired - now >= 100) { + if (timeLog.isInfoEnabled()) + timeLog.info("Waited too long to acquire topology write lock " + + "[cache=" + cctx.cacheId() + ", waitTime=" + (acquired - now) + ']'); + } + if (stopping) return null; @@ -1316,7 +1330,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh long updateSeq = this.updateSeq.incrementAndGet(); - node2part = new GridDhtPartitionFullMap(node2part, updateSeq); + node2part.updateSequence(updateSeq); boolean changed = false; http://git-wip-us.apache.org/repos/asf/ignite/blob/46cba2a4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java index 00bcd10..9a0f090 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java @@ -1444,6 +1444,9 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture /** */ private boolean rcvRes; + /** Remap topology version for debug purpose. */ + private AffinityTopologyVersion remapTopVer; + /** * @param node Node. * @param keys Keys. @@ -1515,6 +1518,8 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture return; rcvRes = true; + + remapTopVer = res.clientRemapVersion(); } if (res.error() != null) { http://git-wip-us.apache.org/repos/asf/ignite/blob/46cba2a4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index 4e04156..fff1702 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -113,6 +113,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT public static final int DUMP_PENDING_OBJECTS_THRESHOLD = IgniteSystemProperties.getInteger(IgniteSystemProperties.IGNITE_DUMP_PENDING_OBJECTS_THRESHOLD, 10); + /** */ public static final String EXCHANGE_LOG = "org.apache.ignite.internal.exchange.time"; /** */ @@ -551,6 +552,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT exchLog.info("Start exchange init [topVer=" + topVer + ", crd=" + crdNode + ", evt=" + discoEvt.type() + + ", node=" + discoEvt.node() + + ", evtNode=" + discoEvt.node() + ", customEvt=" + (discoEvt.type() == EVT_DISCOVERY_CUSTOM_EVT ? ((DiscoveryCustomEvent)discoEvt).customMessage() : null) + ']'); http://git-wip-us.apache.org/repos/asf/ignite/blob/46cba2a4/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java index 317b274..28d8cc4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java @@ -187,7 +187,7 @@ public class ClusterProcessor extends GridProcessorAdapter { try { ctx.io().send(node, GridTopic.TOPIC_INTERNAL_DIAGNOSTIC, res, GridIoPolicy.SYSTEM_POOL); } - catch (ClusterTopologyCheckedException e) { + catch (ClusterTopologyCheckedException ignore) { if (diagnosticLog.isDebugEnabled()) { diagnosticLog.debug("Failed to send diagnostic response, node left " + "[node=" + nodeId + ", msg=" + msg + ']'); http://git-wip-us.apache.org/repos/asf/ignite/blob/46cba2a4/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java index cbba5da..de3f363 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java @@ -2152,7 +2152,14 @@ public class GridNioServer<T> { // This exception will be handled in bodyInternal() method. throw e; } - catch (Exception e) { + catch (Exception | Error e) { // TODO IGNITE-2659. + try { + U.sleep(1000); + } + catch (IgniteInterruptedCheckedException ignore) { + // No-op. + } + U.warn(log, "Failed to process selector key (will close): " + ses, e); close(ses, new GridNioException(e)); @@ -2197,7 +2204,14 @@ public class GridNioServer<T> { // This exception will be handled in bodyInternal() method. throw e; } - catch (Exception e) { + catch (Exception | Error e) { // TODO IGNITE-2659. + try { + U.sleep(1000); + } + catch (IgniteInterruptedCheckedException ignore) { + // No-op. + } + if (!closed) U.warn(log, "Failed to process selector key (will close): " + ses, e); http://git-wip-us.apache.org/repos/asf/ignite/blob/46cba2a4/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java index a8a5bd3..9a122d7 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java @@ -3024,7 +3024,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter "in order to prevent parties from waiting forever in case of network issues " + "[nodeId=" + node.id() + ", addrs=" + addrs + ']'); - errs.addSuppressed(new IgniteCheckedException("Failed to connect to address: " + addr, e)); + errs.addSuppressed(new IgniteCheckedException("Failed to connect to address " + + "[addr=" + addr + ", err=" + e.getMessage() + ']', e)); // Reconnect for the second time, if connection is not established. if (!failureDetThrReached && connectAttempts < 2 && @@ -3054,11 +3055,10 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter if (getSpiContext().node(node.id()) != null && (CU.clientNode(node) || !CU.clientNode(getLocalNode())) && X.hasCause(errs, ConnectException.class, SocketTimeoutException.class, HandshakeTimeoutException.class, IgniteSpiOperationTimeoutException.class)) { - LT.warn(log, "TcpCommunicationSpi failed to establish connection to node, node will be dropped from " + + + U.error(log, "TcpCommunicationSpi failed to establish connection to node, node will be dropped from " + "cluster [" + - "rmtNode=" + node + - ", err=" + errs + - ", connectErrs=" + Arrays.toString(errs.getSuppressed()) + ']'); + "rmtNode=" + node + "]", errs); getSpiContext().failNode(node.id(), "TcpCommunicationSpi failed to establish connection to node [" + "rmtNode=" + node + http://git-wip-us.apache.org/repos/asf/ignite/blob/46cba2a4/modules/core/src/main/resources/META-INF/classnames.properties ---------------------------------------------------------------------- diff --git a/modules/core/src/main/resources/META-INF/classnames.properties b/modules/core/src/main/resources/META-INF/classnames.properties index a9923f6..b548098 100644 --- a/modules/core/src/main/resources/META-INF/classnames.properties +++ b/modules/core/src/main/resources/META-INF/classnames.properties @@ -1971,4 +1971,32 @@ org.apache.ignite.transactions.TransactionRollbackException org.apache.ignite.transactions.TransactionState org.apache.ignite.transactions.TransactionTimeoutException org.apache.ignite.util.AttributeNodeFilter -org.apache.ignite.internal.util.GridPartitionStateMap \ No newline at end of file +org.apache.ignite.internal.util.GridPartitionStateMap +org.gridgain.grid.cache.db.GridCacheOffheapManager$RebalanceIteratorAdapter +org.gridgain.grid.cache.db.wal.FileWriteAheadLogManager$FileArchiver$1 +org.gridgain.grid.cache.db.wal.FileWriteAheadLogManager$Mode +org.gridgain.grid.cache.db.wal.FileWriteAheadLogManager$RecordsIterator +org.gridgain.grid.internal.processors.cache.database.CollectDependantSnapshotSetTask +org.gridgain.grid.internal.processors.cache.database.CollectDependantSnapshotSetTask$CollectDependantSnapshotSetJob +org.gridgain.grid.internal.processors.cache.database.CollectSnapshotInfoTask +org.gridgain.grid.internal.processors.cache.database.CollectSnapshotInfoTask$CollectSnapshotInfoJob +org.gridgain.grid.internal.processors.cache.database.CollectSnapshotInfoTaskResult +org.gridgain.grid.internal.processors.cache.database.FullPageIdIterableComparator +org.gridgain.grid.internal.processors.cache.database.GetOngoingOperationTask +org.gridgain.grid.internal.processors.cache.database.GetOngoingOperationTask$GetOngoingOperationJob +org.gridgain.grid.internal.processors.cache.database.GridCacheDatabaseSharedManager$13 +org.gridgain.grid.internal.processors.cache.database.GridCacheDatabaseSharedManager$15 +org.gridgain.grid.internal.processors.cache.database.GridCacheDatabaseSharedManager$18 +org.gridgain.grid.internal.processors.cache.database.GridCacheDatabaseSharedManager$CheckpointEntryType +org.gridgain.grid.internal.processors.cache.database.GridCacheDatabaseSharedManager$Checkpointer$2 +org.gridgain.grid.internal.processors.cache.database.GridCacheDatabaseSharedManager$FullPageIdComparator +org.gridgain.grid.internal.processors.cache.database.SnapshotOperationFuture$1 +org.gridgain.grid.internal.processors.cache.database.SnapshotTaskBase +org.gridgain.grid.internal.processors.cache.database.messages.CheckSnapshotFinishedMessage +org.gridgain.grid.internal.processors.cache.database.messages.CheckSnapshotMetadataMessage +org.gridgain.grid.internal.processors.cache.database.messages.ClusterWideSnapshotOperationFinishedMessage +org.gridgain.grid.internal.processors.cache.database.messages.SnapshotIssueMessage +org.gridgain.grid.internal.processors.cache.database.messages.SnapshotOperationFinishedMessage +org.gridgain.grid.internal.processors.cache.database.messages.SnapshotProgressMessage +org.gridgain.grid.internal.visor.database.VisorCheckpointMetrics +org.gridgain.grid.internal.visor.database.VisorMemoryPoolMetrics http://git-wip-us.apache.org/repos/asf/ignite/blob/46cba2a4/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheClientsConcurrentStartTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheClientsConcurrentStartTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheClientsConcurrentStartTest.java new file mode 100644 index 0000000..44425f1 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheClientsConcurrentStartTest.java @@ -0,0 +1,248 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed; + +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteAtomicSequence; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cache.CacheWriteSynchronizationMode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.TestRecordingCommunicationSpi; +import org.apache.ignite.internal.managers.communication.GridIoMessage; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgnitePredicate; +import org.apache.ignite.marshaller.optimized.OptimizedMarshaller; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage; +import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCustomEventMessage; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +import java.io.IOException; +import java.io.OutputStream; +import java.net.Socket; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Random; +import java.util.UUID; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.apache.ignite.cache.CacheAtomicityMode.*; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*; + +/** + * + */ +public class CacheClientsConcurrentStartTest extends GridCommonAbstractTest { + /** */ + private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** */ + private static final int SRV_CNT = 4; + + /** */ + private static final int CLIENTS_CNT = 16; + + /** */ + private static final int CACHES = 30; + + /** Stopped. */ + private volatile boolean stopped; + + /** Iteration. */ + private static final int ITERATIONS = 2; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + TcpDiscoverySpi testSpi = new TcpDiscoverySpi() { + @Override protected void writeToSocket(Socket sock, OutputStream out, TcpDiscoveryAbstractMessage msg, long timeout) throws IOException, IgniteCheckedException { + if (msg instanceof TcpDiscoveryCustomEventMessage && msg.verified()) { + try { + System.out.println(Thread.currentThread().getName() + " delay custom message"); + + U.sleep(ThreadLocalRandom.current().nextLong(500) + 100); + } + catch (Exception e) { + e.printStackTrace(); + } + } + + super.writeToSocket(sock, out, msg, timeout); + } + }; + + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder); + + cfg.setMarshaller(null); + + cfg.setCommunicationSpi(new TestRecordingCommunicationSpi()); + + if (getTestGridIndex(gridName) >= SRV_CNT) + cfg.setClientMode(true); + else { + CacheConfiguration ccfgs[] = new CacheConfiguration[CACHES / 2]; + + for (int i = 0; i < ccfgs.length; i++) + ccfgs[i] = cacheConfiguration("cache-" + i); + + cfg.setCacheConfiguration(ccfgs); + } + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected long getTestTimeout() { + return 10 * 60 * 1000L; + } + + /** + * @throws Exception If failed. + */ + public void testStartNodes() throws Exception { + for (int i = 0; i < ITERATIONS; i++) { + try { + log.info("Iteration: " + (i + 1) + '/' + ITERATIONS); + + doTest(); + } + finally { + stopAllGrids(true); + } + } + } + + /** + * @throws Exception If failed. + */ + private void doTest() throws Exception { + final AtomicBoolean failed = new AtomicBoolean(); + + startGrids(SRV_CNT); + + for (int i = 0; i < SRV_CNT; i++) { + ((TestRecordingCommunicationSpi)ignite(i).configuration().getCommunicationSpi()).blockMessages(new IgnitePredicate<GridIoMessage>() { + @Override public boolean apply(GridIoMessage msg) { + if (msg.message() instanceof GridDhtPartitionsFullMessage) { + try { + U.sleep(ThreadLocalRandom.current().nextLong(500) + 100); + } + catch (Exception e) { + e.printStackTrace(); + } + } + + return false; + } + }); + } + + List<IgniteInternalFuture<?>> futs = new ArrayList<>(); + + for (int i = 0; i < CLIENTS_CNT; i++) { + final int idx = i; + + IgniteInternalFuture<?> fut = multithreadedAsync(new Runnable() { + @Override public void run() { + Random rnd = new Random(); + + try { + Ignite ignite = startGrid(SRV_CNT + idx); + + assertTrue(ignite.configuration().isClientMode()); + + for (int i = 0; i < CACHES / 2; i++) { + String cacheName = "cache-" + rnd.nextInt(CACHES); + + IgniteCache<Object, Object> cache = getCache(ignite, cacheName); + + cache.put(ignite.cluster().localNode().id(), UUID.randomUUID()); + + IgniteAtomicSequence seq = ignite.atomicSequence("seq-" + rnd.nextInt(20), 0, true); + + seq.getAndIncrement(); + } + + while (!stopped) { + IgniteCache<Object, Object> cache = getCache(ignite, "cache-" + rnd.nextInt(CACHES)); + + int val = Math.abs(rnd.nextInt(100)); + + if (val >= 0 && val < 40) + cache.containsKey(ignite.cluster().localNode().id()); + else if (val >= 40 && val < 80) + cache.get(ignite.cluster().localNode().id()); + else + cache.put(ignite.cluster().localNode().id(), UUID.randomUUID()); + + Thread.sleep(10); + } + } + catch (Exception e) { + log.error("Unexpected error: " + e, e); + + failed.set(true); + } + } + }, 1, "client-thread"); + + futs.add(fut); + } + + Thread.sleep(10_000); + + stopped = true; + + for (IgniteInternalFuture<?> fut : futs) + fut.get(); + + assertFalse(failed.get()); + } + + /** + * @param grid Grid. + * @return Cache. + */ + private IgniteCache getCache(Ignite grid, String cacheName) { + return grid.getOrCreateCache(cacheConfiguration(cacheName)); + } + + private CacheConfiguration cacheConfiguration(String cacheName) { + CacheConfiguration ccfg = defaultCacheConfiguration(); + + ccfg.setName(cacheName); + ccfg.setWriteSynchronizationMode(FULL_SYNC); + ccfg.setCacheMode(CacheMode.PARTITIONED); + ccfg.setBackups(2); + ccfg.setNearConfiguration(null); + ccfg.setAtomicityMode(TRANSACTIONAL); + + return ccfg; + } +} \ No newline at end of file
