clients start test
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/eaf62e08 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/eaf62e08 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/eaf62e08 Branch: refs/heads/ignite-gg-8.0.3.ea6-clients-test Commit: eaf62e0808db08077b48327e4a723c41c289ae3a Parents: 58b6e05 Author: sboikov <[email protected]> Authored: Wed May 3 10:20:49 2017 +0300 Committer: sboikov <[email protected]> Committed: Wed May 3 10:20:49 2017 +0300 ---------------------------------------------------------------------- .../CacheClientsConcurrentStartTest.java | 248 +++++++++++++++++++ 1 file changed, 248 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/eaf62e08/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheClientsConcurrentStartTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheClientsConcurrentStartTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheClientsConcurrentStartTest.java new file mode 100644 index 0000000..44425f1 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheClientsConcurrentStartTest.java @@ -0,0 +1,248 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed; + +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteAtomicSequence; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cache.CacheWriteSynchronizationMode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.TestRecordingCommunicationSpi; +import org.apache.ignite.internal.managers.communication.GridIoMessage; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgnitePredicate; +import org.apache.ignite.marshaller.optimized.OptimizedMarshaller; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage; +import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCustomEventMessage; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +import java.io.IOException; +import java.io.OutputStream; +import java.net.Socket; +import 
java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.apache.ignite.cache.CacheAtomicityMode.*;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*;

/**
 * Starts {@link #SRV_CNT} server nodes and then {@link #CLIENTS_CNT} client nodes concurrently, while
 * every client immediately gets-or-creates caches, creates atomic sequences and runs cache operations.
 * <p>
 * Servers are configured with the first {@code CACHES / 2} caches; clients pick cache names out of all
 * {@link #CACHES} names, so the remaining caches are requested through {@code getOrCreateCache} only.
 * Sending of {@link GridDhtPartitionsFullMessage} from servers is artificially slowed down.
 */
public class CacheClientsConcurrentStartTest extends GridCommonAbstractTest {
    /** Shared IP finder. */
    private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);

    /** Number of server nodes. */
    private static final int SRV_CNT = 4;

    /** Number of client nodes started concurrently. */
    private static final int CLIENTS_CNT = 16;

    /** Number of distinct cache names used by clients. */
    private static final int CACHES = 30;

    /** Number of times the whole scenario is repeated. */
    private static final int ITERATIONS = 2;

    /** Set to {@code true} to make client threads leave their operations loop. */
    private volatile boolean stopped;

    /** {@inheritDoc} */
    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
        IgniteConfiguration cfg = super.getConfiguration(gridName);

        // NOTE(review): the original code also built an anonymous TcpDiscoverySpi that slept 100-600 ms before
        // writing each verified TcpDiscoveryCustomEventMessage, but never installed it into the configuration.
        // The dead object is removed here; installing such an SPI would change the test timing, so it is
        // deliberately not wired in - confirm with the author whether the delay was intended.
        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);

        cfg.setMarshaller(null);

        cfg.setCommunicationSpi(new TestRecordingCommunicationSpi());

        if (getTestGridIndex(gridName) >= SRV_CNT)
            cfg.setClientMode(true);
        else {
            // Only servers get statically configured caches: "cache-0" .. "cache-(CACHES / 2 - 1)".
            CacheConfiguration[] ccfgs = new CacheConfiguration[CACHES / 2];

            for (int i = 0; i < ccfgs.length; i++)
                ccfgs[i] = cacheConfiguration("cache-" + i);

            cfg.setCacheConfiguration(ccfgs);
        }

        return cfg;
    }

    /** {@inheritDoc} */
    @Override protected long getTestTimeout() {
        return 10 * 60 * 1000L;
    }

    /**
     * Runs the scenario {@link #ITERATIONS} times, stopping all nodes after each run.
     *
     * @throws Exception If failed.
     */
    public void testStartNodes() throws Exception {
        for (int i = 0; i < ITERATIONS; i++) {
            try {
                log.info("Iteration: " + (i + 1) + '/' + ITERATIONS);

                doTest();
            }
            finally {
                stopAllGrids(true);
            }
        }
    }

    /**
     * Single iteration: starts servers, slows down partition full messages, starts all clients concurrently,
     * lets them work for 10 seconds and then checks that no client thread reported an error.
     *
     * @throws Exception If failed.
     */
    private void doTest() throws Exception {
        // The flag is left 'true' by the previous iteration; without the reset clients of every
        // iteration but the first would skip their operations loop entirely.
        stopped = false;

        final AtomicBoolean failed = new AtomicBoolean();

        startGrids(SRV_CNT);

        for (int i = 0; i < SRV_CNT; i++) {
            TestRecordingCommunicationSpi commSpi =
                (TestRecordingCommunicationSpi)ignite(i).configuration().getCommunicationSpi();

            commSpi.blockMessages(new IgnitePredicate<GridIoMessage>() {
                @Override public boolean apply(GridIoMessage msg) {
                    // The predicate is used only for its side effect (a random 100-600 ms pause);
                    // it always returns false, so presumably no message is actually held back.
                    if (msg.message() instanceof GridDhtPartitionsFullMessage) {
                        try {
                            U.sleep(ThreadLocalRandom.current().nextLong(500) + 100);
                        }
                        catch (Exception e) {
                            log.error("Failed to delay partitions full message: " + e, e);
                        }
                    }

                    return false;
                }
            });
        }

        List<IgniteInternalFuture<?>> futs = new ArrayList<>(CLIENTS_CNT);

        try {
            for (int i = 0; i < CLIENTS_CNT; i++) {
                final int nodeIdx = SRV_CNT + i;

                futs.add(multithreadedAsync(new Runnable() {
                    @Override public void run() {
                        try {
                            runClient(nodeIdx);
                        }
                        catch (Exception e) {
                            log.error("Unexpected error: " + e, e);

                            failed.set(true);
                        }
                    }
                }, 1, "client-thread"));
            }

            Thread.sleep(10_000);
        }
        finally {
            // Always release client threads, even if starting them or the sleep above failed.
            stopped = true;
        }

        for (IgniteInternalFuture<?> fut : futs)
            fut.get();

        assertFalse(failed.get());
    }

    /**
     * Body of a single client thread: starts the client node, touches {@code CACHES / 2} random caches and
     * atomic sequences, then performs random cache operations until {@link #stopped} is set.
     *
     * @param nodeIdx Index of the node to start; expected to be a client index ({@code >= SRV_CNT}).
     * @throws Exception If failed.
     */
    private void runClient(int nodeIdx) throws Exception {
        Random rnd = new Random();

        Ignite ignite = startGrid(nodeIdx);

        assertTrue(ignite.configuration().isClientMode());

        for (int n = 0; n < CACHES / 2; n++) {
            IgniteCache<Object, Object> cache = getCache(ignite, "cache-" + rnd.nextInt(CACHES));

            cache.put(ignite.cluster().localNode().id(), UUID.randomUUID());

            IgniteAtomicSequence seq = ignite.atomicSequence("seq-" + rnd.nextInt(20), 0, true);

            seq.getAndIncrement();
        }

        while (!stopped) {
            IgniteCache<Object, Object> cache = getCache(ignite, "cache-" + rnd.nextInt(CACHES));

            // Uniform in [0, 100): 40% containsKey, 40% get, 20% put.
            int val = rnd.nextInt(100);

            if (val < 40)
                cache.containsKey(ignite.cluster().localNode().id());
            else if (val < 80)
                cache.get(ignite.cluster().localNode().id());
            else
                cache.put(ignite.cluster().localNode().id(), UUID.randomUUID());

            Thread.sleep(10);
        }
    }

    /**
     * @param grid Grid.
     * @param cacheName Cache name.
     * @return Cache with the given name, created with {@link #cacheConfiguration(String)} if it does not exist yet.
     */
    private IgniteCache<Object, Object> getCache(Ignite grid, String cacheName) {
        return grid.getOrCreateCache(cacheConfiguration(cacheName));
    }

    /**
     * @param cacheName Cache name.
     * @return Configuration of a transactional partitioned cache with two backups, {@code FULL_SYNC}
     *      write synchronization mode and near configuration set to {@code null}.
     */
    @SuppressWarnings("unchecked")
    private CacheConfiguration<Object, Object> cacheConfiguration(String cacheName) {
        CacheConfiguration<Object, Object> ccfg = defaultCacheConfiguration();

        ccfg.setName(cacheName);
        ccfg.setWriteSynchronizationMode(FULL_SYNC);
        ccfg.setCacheMode(CacheMode.PARTITIONED);
        ccfg.setBackups(2);
        ccfg.setNearConfiguration(null);
        ccfg.setAtomicityMode(TRANSACTIONAL);

        return ccfg;
    }
}
