Repository: ignite Updated Branches: refs/heads/ignite-5075 c5ef936c5 -> dc8e10259
ignite-5075 Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/dc8e1025 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/dc8e1025 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/dc8e1025 Branch: refs/heads/ignite-5075 Commit: dc8e102599f966a39f7bdd7bc22274db52512daf Parents: c5ef936 Author: sboikov <[email protected]> Authored: Wed May 17 15:14:53 2017 +0300 Committer: sboikov <[email protected]> Committed: Wed May 17 15:14:53 2017 +0300 ---------------------------------------------------------------------- .../cache/IgniteCacheOffheapManagerImpl.java | 64 +- .../cache/database/tree/BPlusTree.java | 39 +- .../apache/ignite/internal/util/IgniteTree.java | 11 + .../processors/cache/IgniteCacheGroupsTest.java | 696 ++++++++++++++++++- 4 files changed, 762 insertions(+), 48 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/dc8e1025/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java index 9553491..b922a13 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java @@ -209,10 +209,13 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager if (pendingEntries != null) { PendingRow row = new PendingRow(cacheId); - boolean removex = pendingEntries.removex(row); + GridCursor<PendingRow> cursor = pendingEntries.find(row, row, PendingEntriesTree.WITHOUT_KEY); - while (removex) - removex = pendingEntries.removex(row); + while (cursor.next()) { + boolean res = pendingEntries.removex(cursor.get()); + + assert res; + } } } } @@ -848,7 +851,7 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager assert row.key != null && row.link != 0 && row.expireTime != 0 : row; - if (pendingEntries.remove(row) != null) { + if (pendingEntries.removex(row)) { if (obsoleteVer == null) obsoleteVer = ctx.versions().next(); @@ -1352,17 +1355,23 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager Exception ex = null; - SearchRow row = new SearchRow(cacheId); + SearchRow bound = new SearchRow(cacheId); + + GridCursor<? extends CacheDataRow> cursor = dataTree.find(bound, bound, CacheDataRowAdapter.RowData.KEY_ONLY); + + while (cursor.next()) { + CacheDataRow row = cursor.get(); - CacheDataRow removed = dataTree.remove(row); + assert row.link() != 0 : row; - while (removed != null) { try { - rowStore.removeRow(removed.link()); + boolean res = dataTree.removex(row); - decrementSize(cacheId); + assert res : row; + + rowStore.removeRow(row.link()); - removed = dataTree.remove(row); + decrementSize(cacheId); } catch (IgniteCheckedException e) { U.error(log, "Fail remove row [link=" + row.link() + "]"); @@ -2180,22 +2189,16 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager /** * @param grp Cache group. - * @param cacheId Cache ID. - * @param expireTime Expire time. - * @param link Link. * @return Row. * @throws IgniteCheckedException If failed. */ - static PendingRow createRowWithKey(CacheGroupInfrastructure grp, int cacheId, long expireTime, long link) - throws IgniteCheckedException { - PendingRow row = new PendingRow(cacheId, expireTime, link); - + PendingRow initKey(CacheGroupInfrastructure grp) throws IgniteCheckedException { CacheDataRowAdapter rowData = new CacheDataRowAdapter(link); rowData.initFromLink(grp, CacheDataRowAdapter.RowData.KEY_ONLY); - row.key = rowData.key(); + key = rowData.key(); - return row; + return this; } /** {@inheritDoc} */ @@ -2209,6 +2212,9 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager */ protected static class PendingEntriesTree extends BPlusTree<PendingRow, PendingRow> { /** */ + private final static Object WITHOUT_KEY = new Object(); + + /** */ private final CacheGroupInfrastructure grp; /** @@ -2284,9 +2290,11 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager } /** {@inheritDoc} */ - @Override protected PendingRow getRow(BPlusIO<PendingRow> io, long pageAddr, int idx, Object ignore) + @Override protected PendingRow getRow(BPlusIO<PendingRow> io, long pageAddr, int idx, Object flag) throws IgniteCheckedException { - return io.getLookupRow(this, pageAddr, idx); + PendingRow row = io.getLookupRow(this, pageAddr, idx); + + return flag == WITHOUT_KEY ? row : row.initKey(grp); } } @@ -2369,10 +2377,7 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager /** {@inheritDoc} */ @Override public PendingRow getLookupRow(BPlusTree<PendingRow, ?> tree, long pageAddr, int idx) throws IgniteCheckedException { - return PendingRow.createRowWithKey(((PendingEntriesTree)tree).grp, - getCacheId(pageAddr, idx), - getExpireTime(pageAddr, idx), - getLink(pageAddr, idx)); + return new PendingRow(getCacheId(pageAddr, idx), getExpireTime(pageAddr, idx), getLink(pageAddr, idx)); } /** {@inheritDoc} */ @@ -2443,10 +2448,7 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager /** {@inheritDoc} */ @Override public PendingRow getLookupRow(BPlusTree<PendingRow, ?> tree, long pageAddr, int idx) throws IgniteCheckedException { - return PendingRow.createRowWithKey(((PendingEntriesTree)tree).grp, - getCacheId(pageAddr, idx), - getExpireTime(pageAddr, idx), - getLink(pageAddr, idx)); + return new PendingRow(getCacheId(pageAddr, idx), getExpireTime(pageAddr, idx), getLink(pageAddr, idx)); } /** {@inheritDoc} */ @@ -2532,7 +2534,7 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager * @param ver Page format version. */ CacheIdAwarePendingEntryInnerIO(int ver) { - super(T_PENDING_REF_INNER, ver, true, 20); + super(T_CACHE_ID_AWARE_PENDING_REF_INNER, ver, true, 20); } /** {@inheritDoc} */ @@ -2559,7 +2561,7 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager * @param ver Page format version. */ CacheIdAwarePendingEntryLeafIO(int ver) { - super(T_PENDING_REF_LEAF, ver, 20); + super(T_CACHE_ID_AWARE_PENDING_REF_LEAF, ver, 20); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/dc8e1025/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/BPlusTree.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/BPlusTree.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/BPlusTree.java index a4c09d5..d7a4f7f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/BPlusTree.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/BPlusTree.java @@ -883,11 +883,12 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements /** * @param upper Upper bound. + * @param x Implementation specific argument, {@code null} always means that we need to return full detached data row. * @return Cursor. * @throws IgniteCheckedException If failed. */ - private GridCursor<T> findLowerUnbounded(L upper) throws IgniteCheckedException { - ForwardCursor cursor = new ForwardCursor(null, upper); + private GridCursor<T> findLowerUnbounded(L upper, Object x) throws IgniteCheckedException { + ForwardCursor cursor = new ForwardCursor(null, upper, x); long firstPageId; @@ -932,14 +933,25 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements * @return Cursor. * @throws IgniteCheckedException If failed. */ - @Override public final GridCursor<T> find(L lower, L upper) throws IgniteCheckedException { + @Override public GridCursor<T> find(L lower, L upper) throws IgniteCheckedException { + return find(lower, upper, null); + } + + /** + * @param lower Lower bound inclusive or {@code null} if unbounded. + * @param upper Upper bound inclusive or {@code null} if unbounded. + * @param x Implementation specific argument, {@code null} always means that we need to return full detached data row. + * @return Cursor. + * @throws IgniteCheckedException If failed. + */ + @Override public final GridCursor<T> find(L lower, L upper, Object x) throws IgniteCheckedException { checkDestroyed(); try { if (lower == null) - return findLowerUnbounded(upper); + return findLowerUnbounded(upper, x); - ForwardCursor cursor = new ForwardCursor(lower, upper); + ForwardCursor cursor = new ForwardCursor(lower, upper, x); cursor.find(); @@ -4376,6 +4388,9 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements /** */ private final L upperBound; + /** */ + private final Object x; + /** * @param lowerBound Lower bound. * @param upperBound Upper bound. @@ -4383,6 +4398,18 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements ForwardCursor(L lowerBound, L upperBound) { this.lowerBound = lowerBound; this.upperBound = upperBound; + this.x = null; + } + + /** + * @param lowerBound Lower bound. + * @param upperBound Upper bound. + * @param x Implementation specific argument, {@code null} always means that we need to return full detached data row. + */ + ForwardCursor(L lowerBound, L upperBound, Object x) { + this.lowerBound = lowerBound; + this.upperBound = upperBound; + this.x = x; } /** @@ -4495,7 +4522,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements rows = (T[])new Object[cnt]; for (int i = 0; i < cnt; i++) { - T r = getRow(io, pageAddr, startIdx + i); + T r = getRow(io, pageAddr, startIdx + i, x); rows = GridArrays.set(rows, i, r); } http://git-wip-us.apache.org/repos/asf/ignite/blob/dc8e1025/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteTree.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteTree.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteTree.java index 396b8a4..3198191 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteTree.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteTree.java @@ -64,6 +64,17 @@ public interface IgniteTree<L, T> { public GridCursor<T> find(L lower, L upper) throws IgniteCheckedException; /** + * Returns a cursor from lower to upper bounds inclusive. + * + * @param lower Lower bound or {@code null} if unbounded. + * @param upper Upper bound or {@code null} if unbounded. + * @param x Implementation specific argument, {@code null} always means that we need to return full detached data row. + * @return Cursor. + * @throws IgniteCheckedException If failed. + */ + public GridCursor<T> find(L lower, L upper, Object x) throws IgniteCheckedException; + + /** * Returns a value mapped to the lowest key, or {@code null} if tree is empty * @return Value. * @throws IgniteCheckedException If failed. http://git-wip-us.apache.org/repos/asf/ignite/blob/dc8e1025/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java index aa1fcff..a2eecf2 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java @@ -18,17 +18,20 @@ package org.apache.ignite.internal.processors.cache; import java.io.Serializable; +import java.util.Map; +import java.util.Random; +import java.util.Set; +import java.util.TreeSet; import java.util.ArrayList; import java.util.Arrays; import java.util.LinkedHashMap; import java.util.LinkedHashSet; import java.util.List; -import java.util.Map; -import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import javax.cache.Cache; import java.util.concurrent.locks.Lock; import javax.cache.CacheException; import org.apache.ignite.Ignite; @@ -36,13 +39,20 @@ import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.Ignition; import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheExistsException; import org.apache.ignite.cache.CacheMode; import org.apache.ignite.cache.CachePeekMode; +import org.apache.ignite.cache.affinity.Affinity; +import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; +import org.apache.ignite.cache.query.ScanQuery; +import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteKernal; +import org.apache.ignite.internal.processors.platform.cache.expiry.PlatformExpiryPolicyFactory; import org.apache.ignite.internal.util.lang.GridAbsPredicate; +import org.apache.ignite.internal.util.lang.GridPlainCallable; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; @@ -54,6 +64,7 @@ import org.apache.ignite.transactions.Transaction; import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; +import static org.apache.ignite.cache.CacheMode.LOCAL; import static org.apache.ignite.cache.CacheMode.PARTITIONED; import static org.apache.ignite.cache.CacheMode.REPLICATED; import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; @@ -77,6 +88,12 @@ public class IgniteCacheGroupsTest extends GridCommonAbstractTest { private static final String GROUP2 = "grp2"; /** */ + private static final String CACHE1 = "cache1"; + + /** */ + private static final String CACHE2 = "cache2"; + + /** */ private boolean client; /** */ @@ -159,6 +176,663 @@ public class IgniteCacheGroupsTest extends GridCommonAbstractTest { } /** + * @throws Exception If failed. + */ + public void testCreateCacheWithSameNameInAnotherGroup() throws Exception { + startGridsMultiThreaded(2); + + final Ignite ignite = ignite(0); + + ignite.createCache(cacheConfiguration(GROUP1, CACHE1, PARTITIONED, ATOMIC, 2, false)); + + GridTestUtils.assertThrows(null, new GridPlainCallable<Void>() { + @Override public Void call() throws Exception { + ignite(1).createCache(cacheConfiguration(GROUP2, CACHE1, PARTITIONED, ATOMIC, 2, false)); + return null; + } + }, CacheExistsException.class, "a cache with the same name is already started"); + } + + /** + * @throws Exception If failed. + */ + public void testCreateDestroyCachesAtomicPartitioned() throws Exception { + createDestroyCaches(PARTITIONED, ATOMIC); + } + + /** + * @throws Exception If failed. + */ + public void testCreateDestroyCachesTxPartitioned() throws Exception { + createDestroyCaches(PARTITIONED, TRANSACTIONAL); + } + + /** + * @throws Exception If failed. + */ + public void testCreateDestroyCachesAtomicReplicated() throws Exception { + createDestroyCaches(REPLICATED, ATOMIC); + } + + /** + * @throws Exception If failed. + */ + public void testCreateDestroyCachesTxReplicated() throws Exception { + createDestroyCaches(REPLICATED, TRANSACTIONAL); + } + + /** + * @throws Exception If failed. + */ + public void testScanQueryAtomicPartitioned() throws Exception { + scanQuery(PARTITIONED, ATOMIC); + } + + /** + * @throws Exception If failed. + */ + public void testScanQueryTxPartitioned() throws Exception { + scanQuery(PARTITIONED, TRANSACTIONAL); + } + + /** + * @throws Exception If failed. + */ + public void testScanQueryAtomicReplicated() throws Exception { + scanQuery(REPLICATED, ATOMIC); + } + + /** + * @throws Exception If failed. + */ + public void testScanQueryTxReplicated() throws Exception { + scanQuery(REPLICATED, TRANSACTIONAL); + } + + /** + * @throws Exception If failed. + */ + public void testScanQueryAtomicLocal() throws Exception { + scanQuery(LOCAL, ATOMIC); + } + + /** + * @throws Exception If failed. + */ + public void testScanQueryTxLocal() throws Exception { + scanQuery(LOCAL, TRANSACTIONAL); + } + + /** + * @throws Exception If failed. + */ + public void testEntriesTtlAtomicPartitioned() throws Exception { + entriesTtl(PARTITIONED, ATOMIC); + } + + /** + * @throws Exception If failed. + */ + public void testEntriesTtlTxPartitioned() throws Exception { + entriesTtl(PARTITIONED, TRANSACTIONAL); + } + + /** + * @throws Exception If failed. + */ + public void testEntriesTtlAtomicReplicated() throws Exception { + entriesTtl(REPLICATED, ATOMIC); + } + + /** + * @throws Exception If failed. + */ + public void testEntriesTtlTxReplicated() throws Exception { + entriesTtl(REPLICATED, TRANSACTIONAL); + } + + /** + * @throws Exception If failed. + */ + public void testEntriesTtlAtomicLocal() throws Exception { + entriesTtl(LOCAL, ATOMIC); + } + + /** + * @throws Exception If failed. + */ + public void testEntriesTtlTxLocal() throws Exception { + entriesTtl(LOCAL, TRANSACTIONAL); + } + + /** + * @throws Exception If failed. + */ + public void testCacheIteratorAtomicPartitioned() throws Exception { + cacheIterator(PARTITIONED, ATOMIC); + } + + /** + * @throws Exception If failed. + */ + public void testCacheIteratorTxPartitioned() throws Exception { + cacheIterator(PARTITIONED, TRANSACTIONAL); + } + + /** + * @throws Exception If failed. + */ + public void testCacheIteratorAtomicReplicated() throws Exception { + cacheIterator(REPLICATED, ATOMIC); + } + + /** + * @throws Exception If failed. + */ + public void testCacheIteratorTxReplicated() throws Exception { + cacheIterator(REPLICATED, TRANSACTIONAL); + } + + /** + * @throws Exception If failed. + */ + public void testCacheIteratorAtomicLocal() throws Exception { + cacheIterator(LOCAL, ATOMIC); + } + + /** + * @throws Exception If failed. + */ + public void testCacheIteratorTxLocal() throws Exception { + cacheIterator(LOCAL, TRANSACTIONAL); + } + + /** + * @throws Exception If failed. + */ + public void testScanQueryMultiplePartitionsAtomicPartitioned() throws Exception { + scanQueryMultiplePartitions(PARTITIONED, ATOMIC); + } + + /** + * @throws Exception If failed. + */ + public void testScanQueryMultiplePartitionsTxPartitioned() throws Exception { + scanQueryMultiplePartitions(PARTITIONED, TRANSACTIONAL); + } + + /** + * @throws Exception If failed. + */ + public void testScanQueryMultiplePartitionsAtomicReplicated() throws Exception { + scanQueryMultiplePartitions(REPLICATED, ATOMIC); + } + + /** + * @throws Exception If failed. + */ + public void testScanQueryMultiplePartitionsTxReplicated() throws Exception { + scanQueryMultiplePartitions(REPLICATED, TRANSACTIONAL); + } + + /** + * @throws Exception If failed. + */ + private void scanQuery(CacheMode cacheMode, CacheAtomicityMode atomicityMode) throws Exception { + int keys = 10000; + + Integer[] data1 = generateData(keys); + Integer[] data2 = generateData(keys); + + boolean local = cacheMode == LOCAL; + + if (local) + startGrid(0); + else + startGridsMultiThreaded(4); + + Ignite srv0 = ignite(0); + + srv0.createCache(cacheConfiguration(GROUP1, CACHE1, cacheMode, atomicityMode, 2, false)); + srv0.createCache(cacheConfiguration(GROUP1, CACHE2, cacheMode, atomicityMode, 2, false)); + + if(!local) + awaitPartitionMapExchange(); + + IgniteCache<Integer, Integer> cache1; + IgniteCache<Integer, Integer> cache2; + + if (atomicityMode == TRANSACTIONAL) { + Ignite ignite = ignite(local ? 0 : 1); + + try (Transaction tx = ignite.transactions().txStart()) { + cache1 = ignite.cache(CACHE1); + cache2 = ignite.cache(CACHE2); + + for (int i = 0; i < keys ; i++) { + cache1.put(i, data1[i]); + cache2.put(i, data2[i]); + } + + tx.commit(); + } + } + else { + cache1 = ignite(local ? 0 : 1).cache(CACHE1); + cache2 = ignite(local ? 0 : 2).cache(CACHE2); + + for (int i = 0; i < keys ; i++) { + cache1.put(i, data1[i]); + cache2.put(i, data2[i]); + } + } + + ScanQuery<Integer, Integer> qry = new ScanQuery<>(); + + Set<Integer> keysSet = sequence(keys); + + for (Cache.Entry<Integer, Integer> entry : ignite(local ? 0 : 3).cache(CACHE1).query(qry)) { + assertTrue(keysSet.remove(entry.getKey())); + assertEquals(data1[entry.getKey()], entry.getValue()); + } + + assertTrue(keysSet.isEmpty()); + + srv0.destroyCache(CACHE1); + + keysSet = sequence(keys); + + for (Cache.Entry<Integer, Integer> entry : ignite(local ? 0 : 3).cache(CACHE2).query(qry)) { + assertTrue(keysSet.remove(entry.getKey())); + assertEquals(data2[entry.getKey()], entry.getValue()); + } + + assertTrue(keysSet.isEmpty()); + } + + /** + * @throws Exception If failed. + */ + private void scanQueryMultiplePartitions(CacheMode cacheMode, CacheAtomicityMode atomicityMode) throws Exception { + int keys = 10000; + + Integer[] data1 = generateData(keys); + Integer[] data2 = generateData(keys); + + startGridsMultiThreaded(4); + + Ignite srv0 = ignite(0); + + srv0.createCache( + cacheConfiguration(GROUP1, CACHE1, cacheMode, atomicityMode, 2, false) + .setAffinity(new RendezvousAffinityFunction().setPartitions(32))); + srv0.createCache( + cacheConfiguration(GROUP1, CACHE2, cacheMode, atomicityMode, 2, false) + .setAffinity(new RendezvousAffinityFunction().setPartitions(32))); + + awaitPartitionMapExchange(); + + IgniteCache<Integer, Integer> cache1; + IgniteCache<Integer, Integer> cache2; + + if (atomicityMode == TRANSACTIONAL) { + Ignite ignite = ignite(1); + + try (Transaction tx = ignite.transactions().txStart()) { + cache1 = ignite.cache(CACHE1); + cache2 = ignite.cache(CACHE2); + + for (int i = 0; i < keys ; i++) { + cache1.put(i, data1[i]); + cache2.put(i, data2[i]); + } + + tx.commit(); + } + } + else { + cache1 = ignite(1).cache(CACHE1); + cache2 = ignite(2).cache(CACHE2); + + for (int i = 0; i < keys ; i++) { + cache1.put(i, data1[i]); + cache2.put(i, data2[i]); + } + } + + + int p = ThreadLocalRandom.current().nextInt(32); + + ScanQuery<Integer, Integer> qry = new ScanQuery().setPartition(p); + + Set<Integer> keysSet = new TreeSet<>(); + + cache1 = ignite(3).cache(CACHE1); + + Affinity<Integer> aff = affinity(cache1); + + for(int i = 0; i < keys; i++) { + if (aff.partition(i) == p) { + keysSet.add(i); + } + } + + for (Cache.Entry<Integer, Integer> entry : cache1.query(qry)) { + assertTrue(keysSet.remove(entry.getKey())); + assertEquals(data1[entry.getKey()], entry.getValue()); + } + + assertTrue(keysSet.isEmpty()); + + srv0.destroyCache(CACHE1); + + keysSet = new TreeSet<>(); + + cache2 = ignite(3).cache(CACHE2); + + aff = affinity(cache2); + + for(int i = 0; i < keys; i++) { + if (aff.partition(i) == p) { + keysSet.add(i); + } + } + + for (Cache.Entry<Integer, Integer> entry : cache2.query(qry)) { + assertTrue(keysSet.remove(entry.getKey())); + assertEquals(data2[entry.getKey()], entry.getValue()); + } + + assertTrue(keysSet.isEmpty()); + } + + /** + * @throws Exception If failed. + */ + private void cacheIterator(CacheMode cacheMode, CacheAtomicityMode atomicityMode) throws Exception { + int keys = 10000; + + Integer[] data1 = generateData(keys); + Integer[] data2 = generateData(keys); + + boolean local = cacheMode == LOCAL; + + if (local) + startGrid(0); + else + startGridsMultiThreaded(4); + + Ignite srv0 = ignite(0); + + srv0.createCache(cacheConfiguration(GROUP1, CACHE1, cacheMode, atomicityMode, 2, false)); + srv0.createCache(cacheConfiguration(GROUP1, CACHE2, cacheMode, atomicityMode, 2, false)); + + if(!local) + awaitPartitionMapExchange(); + + if (atomicityMode == TRANSACTIONAL) { + Ignite ignite = ignite(local ? 0 : 1); + + try (Transaction tx = ignite.transactions().txStart()) { + IgniteCache cache1 = ignite.cache(CACHE1); + IgniteCache cache2 = ignite.cache(CACHE2); + + for (int i = 0; i < keys ; i++) { + cache1.put(i, data1[i]); + cache2.put(i, data2[i]); + } + + tx.commit(); + } + } + else { + IgniteCache cache1 = ignite(local ? 0 : 1).cache(CACHE1); + IgniteCache cache2 = ignite(local ? 0 : 2).cache(CACHE2); + + for (int i = 0; i < keys ; i++) { + cache1.put(i, data1[i]); + cache2.put(i, data2[i]); + } + } + + + Set<Integer> keysSet = sequence(keys); + + for (Cache.Entry<Integer, Integer> entry : ignite(local ? 0 : 3).<Integer, Integer>cache(CACHE1)) { + assertTrue(keysSet.remove(entry.getKey())); + assertEquals(data1[entry.getKey()], entry.getValue()); + } + + assertTrue(keysSet.isEmpty()); + + srv0.destroyCache(CACHE1); + + keysSet = sequence(keys); + + for (Cache.Entry<Integer, Integer> entry : ignite(local ? 0 : 3).<Integer, Integer>cache(CACHE2)) { + assertTrue(keysSet.remove(entry.getKey())); + assertEquals(data2[entry.getKey()], entry.getValue()); + } + + assertTrue(keysSet.isEmpty()); + } + + /** + * @throws Exception If failed. + */ + private void entriesTtl(CacheMode cacheMode, CacheAtomicityMode atomicityMode) throws Exception { + int keys = 10000; + + final int ttl = 10000; + + Integer[] data1 = generateData(keys); + Integer[] data2 = generateData(keys); + + boolean local = cacheMode == LOCAL; + + if (local) + startGrid(0); + else + startGridsMultiThreaded(4); + + + Ignite srv0 = ignite(0); + + srv0.createCache( + cacheConfiguration(GROUP1, CACHE1, cacheMode, atomicityMode, 2, false) + // -1 = ETERNAL just created entries are not expiring + // -2 = NOT_CHANGED not to change ttl on entry update + .setExpiryPolicyFactory(new PlatformExpiryPolicyFactory(-1, -2, ttl)).setEagerTtl(true) + ); + + srv0.createCache(cacheConfiguration(GROUP1, CACHE2, cacheMode, atomicityMode, 2, false)); + + if (!local) + awaitPartitionMapExchange(); + + if (atomicityMode == TRANSACTIONAL) { + Ignite ignite = ignite(local ? 0 : 1); + + try (Transaction tx = ignite.transactions().txStart()) { + IgniteCache cache1 = ignite.cache(CACHE1); + IgniteCache cache2 = ignite.cache(CACHE2); + + for (int i = 0; i < keys ; i++) { + cache1.put(i, data1[i]); + cache2.put(i, data2[i]); + } + + tx.commit(); + } + } + else { + IgniteCache cache1 = ignite(local ? 0 : 1).cache(CACHE1); + IgniteCache cache2 = ignite(local ? 0 : 2).cache(CACHE2); + + for (int i = 0; i < keys ; i++) { + cache1.put(i, data1[i]); + cache2.put(i, data2[i]); + } + } + + checkData(local ? 0 : 3, CACHE1, data1); + checkData(local ? 0 : 3, CACHE2, data2); + + srv0.destroyCache(CACHE2); + + checkData(local ? 0 : 3, CACHE1, data1); + + // Wait for expiration + + Thread.sleep((long)(ttl * 1.2)); + + assertEquals(0, ignite(local ? 0 : 3).cache(CACHE1).size()); + } + + /** + * @throws Exception If failed. + */ + private void createDestroyCaches(CacheMode cacheMode, CacheAtomicityMode atomicityMode) throws Exception { + int keys = 10000; + + Integer[] data1 = generateData(keys); + Integer[] data2 = generateData(keys); + + startGridsMultiThreaded(4); + + Ignite srv0 = ignite(0); + + srv0.createCache(cacheConfiguration(GROUP1, CACHE1, cacheMode, atomicityMode, 2, false)); + srv0.createCache(cacheConfiguration(GROUP1, CACHE2, cacheMode, atomicityMode, 2, false)); + + awaitPartitionMapExchange(); + + if (atomicityMode == TRANSACTIONAL) { + Ignite ignite = ignite(1); + + try (Transaction tx = ignite.transactions().txStart()) { + IgniteCache cache1 = ignite.cache(CACHE1); + IgniteCache cache2 = ignite.cache(CACHE2); + + for (int i = 0; i < keys ; i++) { + cache1.put(i, data1[i]); + cache2.put(i, data2[i]); + } + + tx.commit(); + } + } + else { + IgniteCache cache1 = ignite(1).cache(CACHE1); + IgniteCache cache2 = ignite(2).cache(CACHE2); + + for (int i = 0; i < keys ; i++) { + cache1.put(i, data1[i]); + cache2.put(i, data2[i]); + } + } + + checkLocalData(3, CACHE1, data1); + checkLocalData(0, CACHE2, data2); + + checkData(0, CACHE1, data1); + checkData(3, CACHE2, data2); + + ignite(1).destroyCache(CACHE2); + + startGrid(5); + + awaitPartitionMapExchange(); + + checkData(5, CACHE1, data1); + checkLocalData(5, CACHE1, data1); + + ignite(1).destroyCache(CACHE1); + + checkCacheGroup(5, GROUP1, false); + } + + /** + * Creates an array of random integers. + * + * @param cnt Array length. + * @return Array of random integers. + */ + private Integer[] generateData(int cnt) { + Random rnd = ThreadLocalRandom.current(); + + Integer[] data = new Integer[cnt]; + + for (int i = 0; i < data.length; i++) + data[i] = rnd.nextInt(); + + return data; + } + + /** + * @param cnt Sequence length. + * @return Sequence of integers. + */ + private Set<Integer> sequence(int cnt) { + Set<Integer> res = new TreeSet<>(); + + for (int i = 0; i < cnt; i++) + res.add(i); + + return res; + } + + /** + * @param idx Node index. + * @param cacheName Cache name. + * @param data Expected data. + * @throws Exception If failed. + */ + private void checkData(int idx, String cacheName, Integer[] data) throws Exception { + Set<Integer> keys = sequence(data.length); + + Set<Map.Entry<Integer, Integer>> entries = + ignite(idx).<Integer, Integer>cache(cacheName).getAll(keys).entrySet(); + + for (Map.Entry<Integer, Integer> entry : entries) { + assertTrue(keys.remove(entry.getKey())); + assertEquals(data[entry.getKey()], entry.getValue()); + } + + assertTrue(keys.isEmpty()); + } + + /** + * @param idx Node index. + * @param cacheName Cache name. + * @param data Expected data. + * @throws Exception If failed. + */ + private void checkLocalData(int idx, String cacheName, Integer[] data) throws Exception { + Ignite ignite = ignite(idx); + ClusterNode node = ignite.cluster().localNode(); + IgniteCache cache = ignite.<Integer, Integer>cache(cacheName); + + Affinity aff = affinity(cache); + + Set<Integer> localKeys = new TreeSet<>(); + + for (int key = 0; key < data.length; key++) { + if(aff.isPrimaryOrBackup(node, key)) + localKeys.add(key); + } + + Iterable<Cache.Entry<Integer, Integer>> localEntries = cache.localEntries(CachePeekMode.OFFHEAP); + + for (Cache.Entry<Integer, Integer> entry : localEntries) { + assertTrue(localKeys.remove(entry.getKey())); + assertEquals(data[entry.getKey()], entry.getValue()); + } + + assertTrue(localKeys.isEmpty()); + } + + /** * @param srvs Number of server nodes. * @throws Exception If failed. */ @@ -175,37 +849,37 @@ public class IgniteCacheGroupsTest extends GridCommonAbstractTest { for (int iter = 0; iter < 3; iter++) { log.info("Iteration: " + iter); - srv0.createCache(cacheConfiguration(GROUP1, "cache1", PARTITIONED, ATOMIC, 2, false)); + srv0.createCache(cacheConfiguration(GROUP1, CACHE1, PARTITIONED, ATOMIC, 2, false)); checkCacheDiscoveryDataConsistent(); for (int i = 0; i < srvs; i++) { checkCacheGroup(i, GROUP1, true); - checkCache(i, "cache1"); + checkCache(i, CACHE1); } - srv0.createCache(cacheConfiguration(GROUP1, "cache2", PARTITIONED, ATOMIC, 2, false)); + srv0.createCache(cacheConfiguration(GROUP1, CACHE2, PARTITIONED, ATOMIC, 2, false)); checkCacheDiscoveryDataConsistent(); for (int i = 0; i < srvs; i++) { checkCacheGroup(i, GROUP1, true); - checkCache(i, "cache2"); + checkCache(i, CACHE2); } - srv0.destroyCache("cache1"); + srv0.destroyCache(CACHE1); checkCacheDiscoveryDataConsistent(); for (int i = 0; i < srvs; i++) { checkCacheGroup(i, GROUP1, true); - checkCache(i, "cache2"); + checkCache(i, CACHE2); } - srv0.destroyCache("cache2"); + srv0.destroyCache(CACHE2); checkCacheDiscoveryDataConsistent(); @@ -328,9 +1002,9 @@ public class IgniteCacheGroupsTest extends GridCommonAbstractTest { Ignite srv0 = startGrid(0); IgniteCache<Object, Object> srv0Cache1 = - srv0.createCache(cacheConfiguration(GROUP1, "cache1", PARTITIONED, ATOMIC, 2, false)); + srv0.createCache(cacheConfiguration(GROUP1, CACHE1, PARTITIONED, ATOMIC, 2, false)); IgniteCache<Object, Object> srv0Cache2 = - srv0.createCache(cacheConfiguration(GROUP1, "cache2", PARTITIONED, ATOMIC, 2, false)); + srv0.createCache(cacheConfiguration(GROUP1, CACHE2, PARTITIONED, ATOMIC, 2, false)); for (int i = 0; i < 10; i++) srv0Cache1.put(new Key1(i), i);
