ignite-5075
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/82eb1d4c Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/82eb1d4c Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/82eb1d4c Branch: refs/heads/ignite-5075 Commit: 82eb1d4c5533b27973cdf37fd9bc47e3aa43432a Parents: 758779a Author: sboikov <[email protected]> Authored: Thu May 4 10:25:39 2017 +0300 Committer: sboikov <[email protected]> Committed: Thu May 4 11:12:55 2017 +0300 ---------------------------------------------------------------------- .../processors/cache/ClusterCachesInfo.java | 4 ++-- .../cache/DynamicCacheDescriptor.java | 9 ++++--- .../processors/cache/GridCacheProcessor.java | 25 +++++++++++++------- .../GridDhtAtomicAbstractUpdateRequest.java | 7 ++++++ .../continuous/CacheContinuousQueryManager.java | 12 ++++++++-- ...niteTopologyValidatorGridSplitCacheTest.java | 8 +++++-- 6 files changed, 48 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/82eb1d4c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java index fa723be..059c8ac 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java @@ -554,6 +554,8 @@ class ClusterCachesInfo { NearCacheConfiguration nearCfg = null; if (locCfg != null) { + nearCfg = locCfg.config().getNearConfiguration(); + DynamicCacheDescriptor desc0 = new DynamicCacheDescriptor(ctx, locCfg.config(), desc.cacheType(), @@ -567,8 +569,6 @@ class ClusterCachesInfo { desc0.staticallyConfigured(desc.staticallyConfigured()); desc = desc0; - - nearCfg = locCfg.config().getNearConfiguration(); } if (locCfg != null || CU.affinityNode(ctx.discovery().localNode(), cfg.getNodeFilter())) http://git-wip-us.apache.org/repos/asf/ignite/blob/82eb1d4c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java index ccde2e9..fe859f8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java @@ -103,6 +103,12 @@ public class DynamicCacheDescriptor { assert cacheCfg != null; assert schema != null; + if (cacheCfg.getCacheMode() == CacheMode.REPLICATED && cacheCfg.getNearConfiguration() != null) { + cacheCfg = new CacheConfiguration(cacheCfg); + + cacheCfg.setNearConfiguration(null); + } + this.cacheCfg = cacheCfg; this.cacheType = cacheType; this.template = template; @@ -112,9 +118,6 @@ public class DynamicCacheDescriptor { cacheId = CU.cacheId(cacheCfg.getName()); - if (cacheCfg.getCacheMode() == CacheMode.REPLICATED) - cacheCfg.setNearConfiguration(null); - synchronized (schemaMux) { this.schema = schema.copy(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/82eb1d4c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index a255e9d..8f15852 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -1901,13 +1901,11 @@ public class GridCacheProcessor extends GridProcessorAdapter { for (GridCacheAdapter<?, ?> cache : caches.values()) { GridCacheContext<?, ?> cacheCtx = cache.context(); - if (F.eq(cacheCtx.startTopologyVersion(), topVer)) { + if (cacheCtx.startTopologyVersion().equals(topVer)) { + jCacheProxies.put(cacheCtx.name(), new IgniteCacheProxy(cache.context(), cache, null, false)); + if (cacheCtx.preloader() != null) cacheCtx.preloader().onInitialExchangeComplete(err); - - String masked = cacheCtx.name(); - - jCacheProxies.put(masked, new IgniteCacheProxy(cache.context(), cache, null, false)); } } @@ -1921,16 +1919,19 @@ public class GridCacheProcessor extends GridProcessorAdapter { for (DynamicCacheChangeRequest req : exchActions.closeRequests(ctx.localNodeId())) { String cacheName = req.cacheName(); - IgniteCacheProxy<?, ?> proxy = jCacheProxies.remove(cacheName); + IgniteCacheProxy<?, ?> proxy = jCacheProxies.get(cacheName); if (proxy != null) { if (proxy.context().affinityNode()) { GridCacheAdapter<?, ?> cache = caches.get(cacheName); - if (cache != null) - jCacheProxies.put(cacheName, new IgniteCacheProxy(cache.context(), cache, null, false)); + assert cache != null : cacheName; + + jCacheProxies.put(cacheName, new IgniteCacheProxy(cache.context(), cache, null, false)); } else { + jCacheProxies.remove(cacheName); + proxy.context().gate().onStopped(); prepareCacheStop(req); @@ -3005,6 +3006,14 @@ public class GridCacheProcessor extends GridProcessorAdapter { } /** + * @param name Cache name. + * @return Cache proxy. + */ + @Nullable public IgniteCacheProxy jcacheProxy(String name) { + return jCacheProxies.get(name); + } + + /** * @return All configured public cache instances. */ public Collection<IgniteCacheProxy<?, ?>> publicCaches() { http://git-wip-us.apache.org/repos/asf/ignite/blob/82eb1d4c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java index 4ff8484..967d4fd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java @@ -122,6 +122,8 @@ public abstract class GridDhtAtomicAbstractUpdateRequest extends GridCacheMessag boolean keepBinary, boolean skipStore ) { + assert topVer.topologyVersion() > 0 : topVer; + this.cacheId = cacheId; this.nodeId = nodeId; this.futId = futId; @@ -138,6 +140,11 @@ public abstract class GridDhtAtomicAbstractUpdateRequest extends GridCacheMessag setFlag(true, DHT_ATOMIC_KEEP_BINARY_FLAG_MASK); } + /** {@inheritDoc} */ + @Override public final AffinityTopologyVersion topologyVersion() { + return topVer; + } + void nearReplyInfo(UUID nearNodeId, long nearFutId) { assert nearNodeId != null; http://git-wip-us.apache.org/repos/asf/ignite/blob/82eb1d4c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java index 8377754..bc703a2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java @@ -55,6 +55,7 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; import org.apache.ignite.internal.processors.cache.GridCacheManagerAdapter; +import org.apache.ignite.internal.processors.cache.IgniteCacheProxy; import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.database.CacheDataRow; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicAbstractUpdateFuture; @@ -340,8 +341,15 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { updateCntr, topVer); - CacheContinuousQueryEvent evt = new CacheContinuousQueryEvent<>( - cctx.kernalContext().cache().jcache(cctx.name()), cctx, e0); + IgniteCacheProxy jcache = cctx.kernalContext().cache().jcache(cctx.name()); + + assert jcache != null : "Failed to get cache proxy [name=" + cctx.name() + + ", locStart=" + cctx.startTopologyVersion() + + ", cacheStart=" + cctx.cacheStartTopologyVersion() + + ", locNode=" + cctx.localNode() + + ", stopping=" + cctx.kernalContext().isStopping(); + + CacheContinuousQueryEvent evt = new CacheContinuousQueryEvent<>(jcache, cctx, e0); lsnr.onEntryUpdated(evt, primary, recordIgniteEvt, fut); } http://git-wip-us.apache.org/repos/asf/ignite/blob/82eb1d4c/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTopologyValidatorGridSplitCacheTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTopologyValidatorGridSplitCacheTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTopologyValidatorGridSplitCacheTest.java index fd77309..057b0d6 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTopologyValidatorGridSplitCacheTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTopologyValidatorGridSplitCacheTest.java @@ -130,7 +130,8 @@ public class IgniteTopologyValidatorGridSplitCacheTest extends GridCommonAbstrac /** * Tests topology split scenario. - * @throws Exception + * + * @throws Exception If failed. */ public void testTopologyValidator() throws Exception { assertTrue(initLatch.await(10, TimeUnit.SECONDS)); @@ -242,12 +243,15 @@ public class IgniteTopologyValidatorGridSplitCacheTest extends GridCommonAbstrac /** */ private static final long serialVersionUID = 0L; + /** */ @CacheNameResource private String cacheName; + /** */ @IgniteInstanceResource private Ignite ignite; + /** */ @LoggerResource private IgniteLogger log; @@ -263,7 +267,7 @@ public class IgniteTopologyValidatorGridSplitCacheTest extends GridCommonAbstrac }).isEmpty()) return false; - IgniteKernal kernal = (IgniteKernal)ignite.cache(cacheName).unwrap(Ignite.class); + IgniteKernal kernal = (IgniteKernal)ignite; GridDhtCacheAdapter<Object, Object> dht = kernal.context().cache().internalCache(cacheName).context().dht();
