Repository: ignite Updated Branches: refs/heads/ignite-1607 838e0ef87 -> 4da8ac1fa
ignite-1607 WIP Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/11e6449a Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/11e6449a Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/11e6449a Branch: refs/heads/ignite-1607 Commit: 11e6449a546d9c1b9e2ecebb4fa1d5b3e90ab39a Parents: 838e0ef Author: sboikov <[email protected]> Authored: Tue Oct 20 13:11:59 2015 +0300 Committer: sboikov <[email protected]> Committed: Tue Oct 20 13:34:38 2015 +0300 ---------------------------------------------------------------------- .../cache/CacheNearReaderUpdateTest.java | 388 +++++++++++++++++++ 1 file changed, 388 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/11e6449a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheNearReaderUpdateTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheNearReaderUpdateTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheNearReaderUpdateTest.java new file mode 100644 index 0000000..aed2db8 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheNearReaderUpdateTest.java @@ -0,0 +1,388 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicInteger; +import javax.cache.Cache; +import javax.cache.configuration.Factory; +import javax.cache.integration.CacheLoaderException; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteTransactions; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cache.CacheWriteSynchronizationMode; +import org.apache.ignite.cache.store.CacheStore; +import org.apache.ignite.cache.store.CacheStoreAdapter; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.configuration.NearCacheConfiguration; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.util.typedef.G; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.spi.swapspace.inmemory.GridTestSwapSpaceSpi; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.apache.ignite.transactions.Transaction; +import org.apache.ignite.transactions.TransactionConcurrency; +import org.apache.ignite.transactions.TransactionIsolation; +import org.apache.ignite.transactions.TransactionOptimisticException; +import org.jsr166.ConcurrentHashMap8; + +import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; +import static org.apache.ignite.cache.CacheMode.PARTITIONED; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; +import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC; +import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC; +import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ; +import static org.apache.ignite.transactions.TransactionIsolation.SERIALIZABLE; + +/** + * + */ +public class CacheNearReaderUpdateTest extends GridCommonAbstractTest { + /** */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** */ + private boolean client; + + /** */ + private static final int SRVS = 4; + + /** */ + private static final int CLIENTS = 3; + + /** */ + private static Map<Integer, Integer> storeMap = new ConcurrentHashMap8<>(); + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + cfg.setPeerClassLoadingEnabled(false); + + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER); + + cfg.setClientMode(client); + + cfg.setSwapSpaceSpi(new GridTestSwapSpaceSpi()); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + super.beforeTestsStarted(); + + startGridsMultiThreaded(SRVS); + + client = true; + + startGridsMultiThreaded(SRVS, CLIENTS); + + client = false; + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + super.afterTestsStopped(); + + stopAllGrids(); + } + + /** {@inheritDoc} */ + @Override protected long getTestTimeout() { + return 5 * 60_000; + } + + /** + * @throws Exception If failed. + */ + public void testGetUpdateMultithreaded() throws Exception { + List<CacheConfiguration<Integer, Integer>> cfgs = new ArrayList<>(); + + cfgs.add(cacheConfiguration(PARTITIONED, FULL_SYNC, 0, false, false)); + cfgs.add(cacheConfiguration(PARTITIONED, FULL_SYNC, 1, false, false)); + cfgs.add(cacheConfiguration(PARTITIONED, FULL_SYNC, 1, false, true)); + cfgs.add(cacheConfiguration(PARTITIONED, FULL_SYNC, 1, true, false)); + + { + CacheConfiguration<Integer, Integer> ccfg = cacheConfiguration(PARTITIONED, FULL_SYNC, 1, false, false); + + GridTestUtils.setMemoryMode(null, ccfg, GridTestUtils.TestMemoryMode.OFFHEAP_TIERED, 0, 0); + + cfgs.add(ccfg); + } + + final List<Ignite> putNodes = new ArrayList<>(); + + for (int i = 0; i < SRVS + CLIENTS - 1; i++) + putNodes.add(ignite(i)); + + final List<Ignite> getNodes = new ArrayList<>(); + + getNodes.add(ignite(SRVS + CLIENTS - 1)); + getNodes.add(ignite(0)); + + for (CacheConfiguration<Integer, Integer> ccfg : cfgs) { + logCacheInfo(ccfg); + + getUpdateMultithreaded(ccfg, putNodes, getNodes, null, null); + + if (ccfg.getAtomicityMode() == TRANSACTIONAL) { + getUpdateMultithreaded(ccfg, putNodes, getNodes, PESSIMISTIC, REPEATABLE_READ); + + getUpdateMultithreaded(ccfg, putNodes, getNodes, OPTIMISTIC, REPEATABLE_READ); + + getUpdateMultithreaded(ccfg, putNodes, getNodes, OPTIMISTIC, SERIALIZABLE); + } + } + } + + /** + * @param ccfg Cache configuration. + * @param putNodes Nodes executing updates. + * @param getNodes Nodes executing gets. + * @param concurrency Transaction concurrency. + * @param isolation Transaction isolation. + * @throws Exception If failed. + */ + private void getUpdateMultithreaded(CacheConfiguration<Integer, Integer> ccfg, + final List<Ignite> putNodes, + final List<Ignite> getNodes, + final TransactionConcurrency concurrency, + final TransactionIsolation isolation) throws Exception { + log.info("Execute updates [concurrency=" + concurrency + ", isolation=" + isolation + ']'); + + final Ignite ignite0 = ignite(0); + + final String cacheName = ignite0.createCache(ccfg).getName(); + + try { + for (int i = 0; i < 5; i++) { + final Integer key = i; + + final AtomicInteger putThreadIdx = new AtomicInteger(); + final AtomicInteger getThreadIdx = new AtomicInteger(); + + final int PUT_THREADS = 20; + final int GET_THREAD = 20; + + final CyclicBarrier barrier = new CyclicBarrier(PUT_THREADS + GET_THREAD); + + final IgniteInternalFuture<?> updateFut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + int idx = putThreadIdx.getAndIncrement() % putNodes.size(); + + Ignite ignite = putNodes.get(idx); + + IgniteCache<Integer, Integer> cache = ignite.cache(cacheName); + + IgniteTransactions txs = ignite.transactions(); + + Thread.currentThread().setName("update-thread-" + ignite.name()); + + barrier.await(); + + for (int i = 0; i < 100; i++) { + ThreadLocalRandom rnd = ThreadLocalRandom.current(); + + if (concurrency != null) { + try (Transaction tx = txs.txStart(concurrency, isolation)) { + cache.put(key, rnd.nextInt()); + + tx.commit(); + } + catch (TransactionOptimisticException ignore) { + assertEquals(concurrency, OPTIMISTIC); + assertEquals(isolation, SERIALIZABLE); + } + } + else + cache.put(key, rnd.nextInt()); + } + + return null; + } + }, PUT_THREADS, "update-thread"); + + IgniteInternalFuture<?> getFut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + int idx = getThreadIdx.getAndIncrement() % getNodes.size(); + + Ignite ignite = getNodes.get(idx); + + IgniteCache<Integer, Integer> cache; + + if (ignite.configuration().isClientMode()) + cache = ignite.createNearCache(cacheName, new NearCacheConfiguration<Integer, Integer>()); + else + cache = ignite.cache(cacheName); + + Thread.currentThread().setName("get-thread-" + ignite.name()); + + barrier.await(); + + while (!updateFut.isDone()) + cache.get(key); + + return null; + } + }, GET_THREAD, "get-thread"); + + updateFut.get(); + getFut.get(); + + Integer val = (Integer)ignite0.cache(cacheName).get(key); + + log.info("Iteration [iter=" + i + ", val=" + val + ']'); + + for (Ignite getNode : getNodes) { + IgniteCache<Integer, Integer> cache = getNode.cache(cacheName); + + if (getNode.configuration().isClientMode() || + cache.getConfiguration(CacheConfiguration.class).getNearConfiguration() != null) + assertNotNull(getNode.cache(cacheName).localPeek(key)); + } + + checkValue(key, val, cacheName); + + for (int n = 0; n < SRVS + CLIENTS; n++) { + val = n; + + ignite(n).cache(cacheName).put(key, val); + + checkValue(key, val, cacheName); + } + } + } + finally { + destroyCache(ignite0, cacheName); + } + } + + /** + * @param key Key. + * @param expVal Expected value. + * @param cacheName Cache name. + */ + private void checkValue(Object key, Object expVal, String cacheName) { + for (int i = 0; i < SRVS + CLIENTS; i++) { + IgniteCache<Object, Object> cache = ignite(i).cache(cacheName); + + assertEquals(expVal, cache.get(key)); + } + } + + /** + * @param ccfg Cache configuration. + */ + private void logCacheInfo(CacheConfiguration<?, ?> ccfg) { + log.info("Test cache [mode=" + ccfg.getCacheMode() + + ", sync=" + ccfg.getWriteSynchronizationMode() + + ", backups=" + ccfg.getBackups() + + ", memMode=" + ccfg.getMemoryMode() + + ", near=" + (ccfg.getNearConfiguration() != null) + + ", store=" + ccfg.isWriteThrough() + + ", evictPlc=" + (ccfg.getEvictionPolicy() != null) + + ", swap=" + ccfg.isSwapEnabled() + + ", maxOffheap=" + ccfg.getOffHeapMaxMemory() + + ']'); + } + + /** + * @param ignite Node. + * @param cacheName Cache name. + */ + private void destroyCache(Ignite ignite, String cacheName) { + storeMap.clear(); + + ignite.destroyCache(cacheName); + + for (Ignite ignite0 : G.allGrids()) { + GridTestSwapSpaceSpi spi = (GridTestSwapSpaceSpi)ignite0.configuration().getSwapSpaceSpi(); + + spi.clearAll(); + } + } + + /** + * @param cacheMode Cache mode. + * @param syncMode Write synchronization mode. + * @param backups Number of backups. + * @param storeEnabled If {@code true} adds cache store. + * @param nearCache If {@code true} near cache is enabled. + * @return Cache configuration. + */ + private CacheConfiguration<Integer, Integer> cacheConfiguration( + CacheMode cacheMode, + CacheWriteSynchronizationMode syncMode, + int backups, + boolean storeEnabled, + boolean nearCache) { + CacheConfiguration<Integer, Integer> ccfg = new CacheConfiguration<>(); + + ccfg.setCacheMode(cacheMode); + ccfg.setAtomicityMode(TRANSACTIONAL); + ccfg.setWriteSynchronizationMode(syncMode); + + if (cacheMode == PARTITIONED) + ccfg.setBackups(backups); + + if (storeEnabled) { + ccfg.setCacheStoreFactory(new TestStoreFactory()); + ccfg.setWriteThrough(true); + ccfg.setReadThrough(true); + } + + if (nearCache) + ccfg.setNearConfiguration(new NearCacheConfiguration<Integer, Integer>()); + + return ccfg; + } + + /** + * + */ + private static class TestStoreFactory implements Factory<CacheStore<Integer, Integer>> { + /** {@inheritDoc} */ + @Override public CacheStore<Integer, Integer> create() { + return new CacheStoreAdapter<Integer, Integer>() { + @Override public Integer load(Integer key) throws CacheLoaderException { + return storeMap.get(key); + } + + @Override public void write(Cache.Entry<? extends Integer, ? extends Integer> entry) { + storeMap.put(entry.getKey(), entry.getValue()); + } + + @Override public void delete(Object key) { + storeMap.remove(key); + } + }; + } + } +}
