Repository: ignite Updated Branches: refs/heads/ignite-1.6 d2cbcbb80 -> 42e5232c8
IGNITE-2394: Cache loading from storage is called on client nodes. Reviewed and merged by Denis Magda ([email protected]) Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/42e5232c Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/42e5232c Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/42e5232c Branch: refs/heads/ignite-1.6 Commit: 42e5232c8b1e057ef159483b576abd39a9245f18 Parents: d2cbcbb Author: Alper Tekinalp <[email protected]> Authored: Tue May 17 17:19:02 2016 +0300 Committer: Denis Magda <[email protected]> Committed: Tue May 17 17:19:02 2016 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheAdapter.java | 6 +- .../processors/cache/IgniteCacheProxy.java | 41 ++-- .../cache/CacheClientStoreSelfTest.java | 209 ++++++++++++++++--- 3 files changed, 212 insertions(+), 44 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/42e5232c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index a02db2c..9ea688d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -3689,14 +3689,14 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V */ IgniteInternalFuture<?> globalLoadCacheAsync(@Nullable IgniteBiPredicate<K, V> p, @Nullable Object... 
args) throws IgniteCheckedException { - ClusterGroup oldNodes = ctx.kernalContext().grid().cluster().forCacheNodes(ctx.name()) + ClusterGroup oldNodes = ctx.kernalContext().grid().cluster().forDataNodes(ctx.name()) .forPredicate(new IgnitePredicate<ClusterNode>() { @Override public boolean apply(ClusterNode node) { return node.version().compareToIgnoreTimestamp(LOAD_CACHE_JOB_SINCE) < 0; } }); - ClusterGroup newNodes = ctx.kernalContext().grid().cluster().forCacheNodes(ctx.name()) + ClusterGroup newNodes = ctx.kernalContext().grid().cluster().forDataNodes(ctx.name()) .forPredicate(new IgnitePredicate<ClusterNode>() { @Override public boolean apply(ClusterNode node) { return node.version().compareToIgnoreTimestamp(LOAD_CACHE_JOB_SINCE) >= 0 && @@ -3704,7 +3704,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V } }); - ClusterGroup newNodesV2 = ctx.kernalContext().grid().cluster().forCacheNodes(ctx.name()) + ClusterGroup newNodesV2 = ctx.kernalContext().grid().cluster().forDataNodes(ctx.name()) .forPredicate(new IgnitePredicate<ClusterNode>() { @Override public boolean apply(ClusterNode node) { return node.version().compareToIgnoreTimestamp(LOAD_CACHE_JOB_V2_SINCE) >= 0; http://git-wip-us.apache.org/repos/asf/ignite/blob/42e5232c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java index fc046af..76cc77b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java @@ -190,8 +190,8 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V } /** - * Gets cache 
proxy which does not acquire read lock on gateway enter, should be - * used only if grid read lock is externally acquired. + * Gets cache proxy which does not acquire read lock on gateway enter, should be used only if grid read lock is + * externally acquired. * * @return Ignite cache proxy with simple gate. */ @@ -372,10 +372,18 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V CacheOperationContext prev = onEnter(gate, opCtx); try { - if (isAsync()) - setFuture(ctx.cache().globalLoadCacheAsync(p, args)); - else - ctx.cache().globalLoadCache(p, args); + if (isAsync()) { + if (ctx.cache().isLocal()) + setFuture(ctx.cache().localLoadCacheAsync(p, args)); + else + setFuture(ctx.cache().globalLoadCacheAsync(p, args)); + } + else { + if (ctx.cache().isLocal()) + ctx.cache().localLoadCache(p, args); + else + ctx.cache().globalLoadCache(p, args); + } } finally { onLeave(gate, prev); @@ -463,9 +471,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V * @return Cursor. 
*/ @SuppressWarnings("unchecked") - private QueryCursor<Cache.Entry<K,V>> query(final Query filter, @Nullable ClusterGroup grp) + private QueryCursor<Cache.Entry<K, V>> query(final Query filter, @Nullable ClusterGroup grp) throws IgniteCheckedException { - final CacheQuery<Map.Entry<K,V>> qry; + final CacheQuery<Map.Entry<K, V>> qry; boolean isKeepBinary = opCtx != null && opCtx.isKeepBinary(); @@ -478,8 +486,8 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V qry.projection(grp); final GridCloseableIterator<Entry<K, V>> iter = ctx.kernalContext().query().executeQuery(ctx, - new IgniteOutClosureX<GridCloseableIterator<Entry<K,V>>>() { - @Override public GridCloseableIterator<Entry<K,V>> applyx() throws IgniteCheckedException { + new IgniteOutClosureX<GridCloseableIterator<Entry<K, V>>>() { + @Override public GridCloseableIterator<Entry<K, V>> applyx() throws IgniteCheckedException { final GridCloseableIterator<Map.Entry> iter0 = qry.executeScanQuery(); return new GridCloseableIteratorAdapter<Cache.Entry<K, V>>() { @@ -503,7 +511,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V return new QueryCursorImpl<>(iter); } - final CacheQueryFuture<Map.Entry<K,V>> fut; + final CacheQueryFuture<Map.Entry<K, V>> fut; if (filter instanceof TextQuery) { TextQuery p = (TextQuery)filter; @@ -541,15 +549,15 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V throw new CacheException("Unsupported query type: " + filter); } - return new QueryCursorImpl<>(new GridCloseableIteratorAdapter<Entry<K,V>>() { + return new QueryCursorImpl<>(new GridCloseableIteratorAdapter<Entry<K, V>>() { /** */ - private Map.Entry<K,V> cur; + private Map.Entry<K, V> cur; - @Override protected Entry<K,V> onNext() throws IgniteCheckedException { + @Override protected Entry<K, V> onNext() throws IgniteCheckedException { if (!onHasNext()) throw new NoSuchElementException(); - Map.Entry<K,V> e = cur; + 
Map.Entry<K, V> e = cur; cur = null; @@ -848,7 +856,6 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V } } - /** {@inheritDoc} */ @Override public int localSize(CachePeekMode... peekModes) { GridCacheGateway<K, V> gate = this.gate; @@ -959,7 +966,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V } /** {@inheritDoc} */ - @Override public Collection<CacheEntry<K, V>> getEntries(Set<? extends K> keys) { + @Override public Collection<CacheEntry<K, V>> getEntries(Set<? extends K> keys) { try { GridCacheGateway<K, V> gate = this.gate; http://git-wip-us.apache.org/repos/asf/ignite/blob/42e5232c/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheClientStoreSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheClientStoreSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheClientStoreSelfTest.java index 213a8de..b1bc066 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheClientStoreSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheClientStoreSelfTest.java @@ -17,17 +17,26 @@ package org.apache.ignite.internal.processors.cache; +import javax.cache.Cache; import javax.cache.configuration.Factory; +import javax.cache.integration.CacheLoaderException; +import javax.cache.integration.CacheWriterException; import javax.cache.processor.MutableEntry; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; import org.apache.ignite.cache.CacheAtomicityMode; import org.apache.ignite.cache.CacheEntryProcessor; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cache.CachePeekMode; +import org.apache.ignite.cache.CacheWriteSynchronizationMode; import org.apache.ignite.cache.store.CacheStore; +import 
org.apache.ignite.cache.store.CacheStoreAdapter; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.configuration.NearCacheConfiguration; import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.lang.IgniteBiInClosure; +import org.apache.ignite.resources.IgniteInstanceResource; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; @@ -36,7 +45,7 @@ import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import static org.apache.ignite.IgniteSystemProperties.IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK; /** - * Tests for cache client without store. + * Tests for cache client with and without store. */ public class CacheClientStoreSelfTest extends GridCommonAbstractTest { /** */ @@ -46,26 +55,38 @@ public class CacheClientStoreSelfTest extends GridCommonAbstractTest { private static final String CACHE_NAME = "test-cache"; /** */ - private boolean client; + private volatile boolean nearEnabled; /** */ - private boolean nearEnabled; + private volatile Factory<CacheStore> factory; /** */ - private Factory<CacheStore> factory; + private volatile CacheMode cacheMode; + + /** */ + private static volatile boolean loadedFromClient; /** {@inheritDoc} */ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { IgniteConfiguration cfg = super.getConfiguration(gridName); + boolean client = gridName != null && gridName.startsWith("client"); + cfg.setClientMode(client); CacheConfiguration cc = new CacheConfiguration(); cc.setName(CACHE_NAME); cc.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL); + cc.setCacheMode(cacheMode); + cc.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); + cc.setBackups(1); + cc.setCacheStoreFactory(factory); + if 
(factory instanceof Factory3) + cc.setReadThrough(true); + if (client && nearEnabled) cc.setNearConfiguration(new NearCacheConfiguration()); @@ -81,32 +102,23 @@ public class CacheClientStoreSelfTest extends GridCommonAbstractTest { } /** {@inheritDoc} */ - @Override protected void beforeTestsStarted() throws Exception { - client = false; - factory = new Factory1(); - - startGrids(2); - } - - /** {@inheritDoc} */ - @Override protected void afterTestsStopped() throws Exception { + @Override protected void afterTest() throws Exception { stopAllGrids(); - } - /** {@inheritDoc} */ - @Override protected void afterTest() throws Exception { - stopGrid(); + loadedFromClient = false; } /** * @throws Exception If failed. */ public void testCorrectStore() throws Exception { - client = true; nearEnabled = false; + cacheMode = CacheMode.PARTITIONED; factory = new Factory1(); - Ignite ignite = startGrid(); + startGrids(2); + + Ignite ignite = startGrid("client-1"); IgniteCache cache = ignite.cache(CACHE_NAME); @@ -131,19 +143,27 @@ public class CacheClientStoreSelfTest extends GridCommonAbstractTest { * @throws Exception If failed. */ public void testInvalidStore() throws Exception { - client = true; nearEnabled = false; + cacheMode = CacheMode.PARTITIONED; + factory = new Factory1(); + + startGrids(2); + factory = new Factory2(); - startGrid(); + startGrid("client-1"); } /** * @throws Exception If failed. 
*/ public void testDisabledConsistencyCheck() throws Exception { - client = false; nearEnabled = false; + cacheMode = CacheMode.PARTITIONED; + factory = new Factory1(); + + startGrids(2); + factory = new Factory2(); System.setProperty(IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK, "true"); @@ -162,6 +182,10 @@ public class CacheClientStoreSelfTest extends GridCommonAbstractTest { */ public void testNoStoreNearDisabled() throws Exception { nearEnabled = false; + cacheMode = CacheMode.PARTITIONED; + factory = new Factory1(); + + startGrids(2); doTestNoStore(); } @@ -171,6 +195,10 @@ public class CacheClientStoreSelfTest extends GridCommonAbstractTest { */ public void testNoStoreNearEnabled() throws Exception { nearEnabled = true; + cacheMode = CacheMode.PARTITIONED; + factory = new Factory1(); + + startGrids(2); doTestNoStore(); } @@ -179,10 +207,9 @@ public class CacheClientStoreSelfTest extends GridCommonAbstractTest { * @throws Exception If failed. */ private void doTestNoStore() throws Exception { - client = true; factory = null; - Ignite ignite = startGrid(); + Ignite ignite = startGrid("client-1"); IgniteCache cache = ignite.cache(CACHE_NAME); @@ -204,6 +231,101 @@ public class CacheClientStoreSelfTest extends GridCommonAbstractTest { } /** + * Load cache created on client as LOCAL and see if it only loaded on client + * + * @throws Exception + */ + public void testLocalLoadClient() throws Exception { + cacheMode = CacheMode.LOCAL; + factory = new Factory3(); + + startGrids(2); + + Ignite client = startGrid("client-1"); + + IgniteCache cache = client.cache(CACHE_NAME); + + cache.loadCache(null); + + assertEquals(10, cache.localSize(CachePeekMode.ALL)); + + assertEquals(0, grid(0).cache(CACHE_NAME).localSize(CachePeekMode.ALL)); + assertEquals(0, grid(1).cache(CACHE_NAME).localSize(CachePeekMode.ALL)); + + assert loadedFromClient; + } + + /** + * Load cache from server that created on client as LOCAL and see if it only loaded on server + * + * @throws Exception + 
*/ + public void testLocalLoadServer() throws Exception { + cacheMode = CacheMode.LOCAL; + factory = new Factory3(); + + startGrids(2); + + Ignite client = startGrid("client-1"); + + IgniteCache cache = grid(0).cache(CACHE_NAME); + + cache.loadCache(null); + + assertEquals(10, cache.localSize(CachePeekMode.ALL)); + assertEquals(0, grid(1).cache(CACHE_NAME).localSize(CachePeekMode.ALL)); + assertEquals(0, client.cache(CACHE_NAME).localSize(CachePeekMode.ALL)); + + assert !loadedFromClient : "Loaded data from client!"; + } + + /** + * Load cache created on client as REPLICATED and see if it only loaded on servers + */ + public void testReplicatedLoadFromClient() throws Exception { + cacheMode = CacheMode.REPLICATED; + factory = new Factory3(); + + startGrids(2); + + Ignite client = startGrid("client-1"); + + IgniteCache cache = client.cache(CACHE_NAME); + + cache.loadCache(null); + + assertEquals(0, cache.localSize(CachePeekMode.ALL)); + + assertEquals(10, grid(0).cache(CACHE_NAME).localSize(CachePeekMode.ALL)); + assertEquals(10, grid(1).cache(CACHE_NAME).localSize(CachePeekMode.ALL)); + + assert !loadedFromClient : "Loaded data from client!"; + } + + /** + * Load cache created on client as PARTITIONED and see if it only loaded on servers + */ + public void testPartitionedLoadFromClient() throws Exception { + cacheMode = CacheMode.PARTITIONED; + factory = new Factory3(); + + startGrids(2); + + Ignite client = startGrid("client-1"); + + IgniteCache cache = client.cache(CACHE_NAME); + + cache.loadCache(null); + + assertEquals(0, cache.localSize(CachePeekMode.ALL)); + + assertEquals(10, grid(0).cache(CACHE_NAME).localSize(CachePeekMode.ALL)); + assertEquals(10, grid(1).cache(CACHE_NAME).localSize(CachePeekMode.ALL)); + + assert !loadedFromClient : "Loaded data from client!"; + } + + /** */ private static class Factory1 implements Factory<CacheStore> { /** {@inheritDoc} */ @@ -223,9 +345,48 @@ public class CacheClientStoreSelfTest extends GridCommonAbstractTest { /** */ 
+ private static class Factory3 implements Factory<CacheStore> { + /** {@inheritDoc} */ + @Override public CacheStore create() { + return new TestStore(); + } + } + + /** + */ private static class EP implements CacheEntryProcessor { @Override public Object process(MutableEntry entry, Object... arguments) { return null; } } + + /** + * Test store that loads 10 items + */ + public static class TestStore extends CacheStoreAdapter<Object, Object> { + @IgniteInstanceResource + private Ignite ignite; + + @Override + public Integer load(Object key) throws CacheLoaderException { + return null; + } + + @Override + public void write(Cache.Entry<? extends Object, ? extends Object> entry) throws CacheWriterException { + } + + @Override + public void delete(Object key) throws CacheWriterException { + } + + @Override + public void loadCache(IgniteBiInClosure<Object, Object> clo, Object... args) { + if (ignite.cluster().localNode().isClient()) + loadedFromClient = true; + + for (int i = 0; i < 10; i++) + clo.apply(i, i); + } + } } \ No newline at end of file
