Repository: ignite Updated Branches: refs/heads/ignite-1537 0d2cb9089 -> 0e5d35bae
ignite-1.5 Should not wait on preloader sync future for system caches callbacks. Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/0e5d35ba Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/0e5d35ba Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/0e5d35ba Branch: refs/heads/ignite-1537 Commit: 0e5d35bae6773c70eb8eade15adb3a4234f26e74 Parents: 0d2cb90 Author: sboikov <[email protected]> Authored: Mon Nov 30 12:56:06 2015 +0300 Committer: sboikov <[email protected]> Committed: Mon Nov 30 12:56:06 2015 +0300 ---------------------------------------------------------------------- .../org/apache/ignite/internal/IgnitionEx.java | 4 +-- .../processors/cache/GridCacheContext.java | 22 +++++--------- .../processors/cache/GridCachePreloader.java | 5 +++ .../cache/GridCachePreloaderAdapter.java | 7 ++++- .../processors/cache/GridCacheProcessor.java | 14 ++++++--- .../dht/atomic/GridDhtAtomicCache.java | 4 +-- .../dht/preloader/GridDhtPartitionDemander.java | 17 +++++++---- .../dht/preloader/GridDhtPreloader.java | 23 ++++++++++++++ ...eAbstractDataStructuresFailoverSelfTest.java | 32 +++++++++++++------- ...ObjectsCacheDataStructuresSelfTestSuite.java | 4 +++ 10 files changed, 91 insertions(+), 41 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/0e5d35ba/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java index 889b25c..be06f85 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java @@ -2057,7 +2057,7 @@ public class IgnitionEx { cache.setWriteSynchronizationMode(FULL_SYNC); cache.setAffinity(new RendezvousAffinityFunction(false, 100)); cache.setNodeFilter(CacheConfiguration.ALL_NODES); - cache.setRebalanceOrder(-1);//Prior to user caches. + cache.setRebalanceOrder(-2); //Prior to user caches. return cache; } @@ -2078,7 +2078,7 @@ public class IgnitionEx { ccfg.setWriteSynchronizationMode(FULL_SYNC); ccfg.setCacheMode(cfg.getCacheMode()); ccfg.setNodeFilter(CacheConfiguration.ALL_NODES); - ccfg.setRebalanceOrder(-1);//Prior to user caches. + ccfg.setRebalanceOrder(-1); //Prior to user caches. if (cfg.getCacheMode() == PARTITIONED) ccfg.setBackups(cfg.getBackups()); http://git-wip-us.apache.org/repos/asf/ignite/blob/0e5d35ba/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java index 6e5f958..4e755bc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java @@ -239,6 +239,9 @@ public class GridCacheContext<K, V> implements Externalizable { /** Deployment enabled flag for this specific cache */ private boolean depEnabled; + /** */ + private boolean deferredDelete; + /** * Empty constructor required for {@link Externalizable}. */ @@ -506,6 +509,9 @@ public class GridCacheContext<K, V> implements Externalizable { */ public void cache(GridCacheAdapter<K, V> cache) { this.cache = cache; + + deferredDelete = cache.isDht() || cache.isDhtAtomic() || cache.isColocated() || + (cache.isNear() && cache.configuration().getAtomicityMode() == ATOMIC); } /** @@ -568,21 +574,7 @@ public class GridCacheContext<K, V> implements Externalizable { * @return {@code True} if entries should not be deleted from cache immediately. */ public boolean deferredDelete() { - GridCacheAdapter<K, V> cache = this.cache; - - if (cache == null) - throw new IllegalStateException("Cache stopped: " + cacheName); - - return deferredDelete(cache); - } - - /** - * @param cache Cache. - * @return {@code True} if entries should not be deleted from cache immediately. - */ - public boolean deferredDelete(GridCacheAdapter<?, ?> cache) { - return cache.isDht() || cache.isDhtAtomic() || cache.isColocated() || - (cache.isNear() && cache.configuration().getAtomicityMode() == ATOMIC); + return deferredDelete; } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/0e5d35ba/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java index 1658a89..8e1164b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java @@ -143,6 +143,11 @@ public interface GridCachePreloader { public IgniteInternalFuture<Object> request(Collection<KeyCacheObject> keys, AffinityTopologyVersion topVer); /** + * @return Future completed when rebalance on node start topology finished. + */ + public IgniteInternalFuture<?> initialRebalanceFuture(); + + /** * Force preload process. */ public void forcePreload(); http://git-wip-us.apache.org/repos/asf/ignite/blob/0e5d35ba/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java index 9c0e9f7..a1704fc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java @@ -119,7 +119,12 @@ public class GridCachePreloaderAdapter implements GridCachePreloader { /** {@inheritDoc} */ @Override public IgniteInternalFuture<Boolean> rebalanceFuture() { - return new GridFinishedFuture<>(true); + return finFut; + } + + /** {@inheritDoc} */ + @Override public IgniteInternalFuture<?> initialRebalanceFuture() { + return finFut; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/0e5d35ba/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index 6654a15..e53f186 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -796,7 +796,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { if (!ctx.config().isDaemon()) ctx.marshallerContext().onMarshallerCacheStarted(ctx); - marshallerCache().context().preloader().syncFuture().listen(new CIX1<IgniteInternalFuture<?>>() { + marshallerCache().context().preloader().initialRebalanceFuture().listen(new CIX1<IgniteInternalFuture<?>>() { @Override public void applyx(IgniteInternalFuture<?> f) throws IgniteCheckedException { ctx.marshallerContext().onMarshallerCachePreloaded(ctx); } @@ -817,10 +817,16 @@ public class GridCacheProcessor extends GridProcessorAdapter { if (cfg.getRebalanceMode() == SYNC) { if (cfg.getCacheMode() == REPLICATED || (cfg.getCacheMode() == PARTITIONED && cfg.getRebalanceDelay() >= 0)) { - cache.preloader().syncFuture().get(); + boolean utilityCache = CU.isUtilityCache(cache.name()); - if (CU.isUtilityCache(cache.name())) - ctx.cacheObjects().onUtilityCacheStarted(); + if (utilityCache || CU.isMarshallerCache(cache.name())) { + cache.preloader().initialRebalanceFuture().get(); + + if (utilityCache) + ctx.cacheObjects().onUtilityCacheStarted(); + } + else + cache.preloader().syncFuture().get(); } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/0e5d35ba/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index a49341b..d8ab62a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -1378,7 +1378,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { // Enqueue if necessary after locks release. if (deleted != null) { assert !deleted.isEmpty(); - assert ctx.deferredDelete(this) : this; + assert ctx.deferredDelete() : this; for (IgniteBiTuple<GridDhtCacheEntry, GridCacheVersion> e : deleted) ctx.onDeferredDelete(e.get1(), e.get2()); @@ -2430,7 +2430,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { */ private void unlockEntries(Collection<GridDhtCacheEntry> locked, AffinityTopologyVersion topVer) { // Process deleted entries before locks release. - assert ctx.deferredDelete(this) : this; + assert ctx.deferredDelete() : this; // Entries to skip eviction manager notification for. // Enqueue entries while holding locks. http://git-wip-us.apache.org/repos/asf/ignite/blob/0e5d35ba/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java index 7ccb68e..20f12b6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java @@ -145,9 +145,8 @@ public class GridDhtPartitionDemander { Map<Integer, Object> tops = new HashMap<>(); - for (int idx = 0; idx < cctx.gridConfig().getRebalanceThreadPoolSize(); idx++) { + for (int idx = 0; idx < cctx.gridConfig().getRebalanceThreadPoolSize(); idx++) tops.put(idx, GridCachePartitionExchangeManager.rebalanceTopic(idx)); - } rebalanceTopics = tops; } @@ -862,9 +861,8 @@ public class GridDhtPartitionDemander { U.log(log, "Cancelled rebalancing from all nodes [cache=" + cctx.name() + ", topology=" + topologyVersion()); - for (UUID nodeId : remaining.keySet()) { + for (UUID nodeId : remaining.keySet()) cleanupRemoteContexts(nodeId); - } remaining.clear(); @@ -959,9 +957,9 @@ public class GridDhtPartitionDemander { Collection<Integer> parts = remaining.get(nodeId).get2(); if (parts != null) { - boolean removed = parts.remove(p); + boolean rmvd = parts.remove(p); - assert removed; + assert rmvd; if (parts.isEmpty()) { U.log(log, "Completed " + ((remaining.size() == 1 ? "(final) " : "") + @@ -1039,6 +1037,11 @@ public class GridDhtPartitionDemander { onDone(!cancelled); } } + + /** {@inheritDoc} */ + public String toString() { + return S.toString(RebalanceFuture.class, this); + } } /** @@ -1109,10 +1112,12 @@ public class GridDhtPartitionDemander { /** Hide worker logger and use cache logger instead. */ private IgniteLogger log = GridDhtPartitionDemander.this.log; + /** */ private volatile RebalanceFuture fut; /** * @param id Worker ID. + * @param fut Rebalance future. */ private DemandWorker(int id, RebalanceFuture fut) { assert id >= 0; http://git-wip-us.apache.org/repos/asf/ignite/blob/0e5d35ba/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java index 8e56c2d..a12e1e7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java @@ -107,6 +107,9 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { /** Start future. */ private GridFutureAdapter<Object> startFut; + /** Future completed when rebalance on start topology finished. */ + private final GridFutureAdapter<Object> initRebalanceFut; + /** Busy lock to prevent activities from accessing exchanger while it's stopping. */ private final ReadWriteLock busyLock = new ReentrantReadWriteLock(); @@ -155,6 +158,14 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { for (GridDhtAssignmentFetchFuture fut : pendingAssignmentFetchFuts.values()) fut.onNodeLeft(e.eventNode().id()); } + + if (!initRebalanceFut.isDone()) { + cctx.closures().runLocalSafe(new Runnable() { + @Override public void run() { + initRebalanceFut.onDone(); + } + }); + } } finally { leaveBusy(); @@ -171,6 +182,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { top = cctx.dht().topology(); startFut = new GridFutureAdapter<>(); + initRebalanceFut = new GridFutureAdapter<>(); } /** {@inheritDoc} */ @@ -209,6 +221,12 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { supplier = new GridDhtPartitionSupplier(cctx); demander = new GridDhtPartitionDemander(cctx, demandLock); + demander.rebalanceFuture().listen(new CI1<IgniteInternalFuture<Boolean>>() { + @Override public void apply(IgniteInternalFuture<Boolean> fut) { + initRebalanceFut.onDone(); + } + }); + supplier.start(); demander.start(); @@ -456,6 +474,11 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { return cctx.kernalContext().clientNode() ? new GridFinishedFuture<>(true) : demander.rebalanceFuture(); } + /** {@inheritDoc} */ + @Override public IgniteInternalFuture<?> initialRebalanceFuture() { + return cctx.kernalContext().clientNode() ? new GridFinishedFuture<>(true) : initRebalanceFut; + } + /** * @param topVer Requested topology version. * @param fut Future to add. http://git-wip-us.apache.org/repos/asf/ignite/blob/0e5d35ba/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java index 4ee200b..2751de1 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java @@ -446,6 +446,8 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig String name = UUID.randomUUID().toString(); try { + log.info("Start node: " + name); + Ignite g = startGrid(name); assert g.semaphore(STRUCTURE_NAME, 10, false, false) != null; @@ -502,6 +504,8 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig String name = UUID.randomUUID().toString(); try { + log.info("Start node: " + name); + Ignite g = startGrid(name); final IgniteSemaphore sem = g.semaphore(STRUCTURE_NAME, TOP_CHANGE_CNT, true, true); @@ -510,14 +514,12 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig sem.acquire(); - if (i == TOP_CHANGE_CNT - 1) { + if (i == TOP_CHANGE_CNT - 1) sem.release(); - } } finally { - if (i != TOP_CHANGE_CNT - 1) { + if (i != TOP_CHANGE_CNT - 1) stopGrid(name); - } } } } @@ -572,6 +574,8 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig names.add(name); + log.info("Start node: " + name); + Ignite g = startGrid(name); final IgniteSemaphore sem = g.semaphore(STRUCTURE_NAME, TOP_CHANGE_CNT, true, true); @@ -580,9 +584,8 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig sem.acquire(); - if (i == TOP_CHANGE_CNT - 1) { + if (i == TOP_CHANGE_CNT - 1) sem.release(); - } } } finally { @@ -639,21 +642,20 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig String name = UUID.randomUUID().toString(); try { + log.info("Start node: " + name); + Ignite g = startGrid(name); final IgniteSemaphore sem = g.semaphore(STRUCTURE_NAME, TOP_CHANGE_CNT, true, true); assertNotNull(sem); - if (i != 1) { + if (i != 1) sem.acquire(); - } - } finally { - if (i != 1) { + if (i != 1) stopGrid(name); - } } } @@ -846,6 +848,8 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig int id = idx.getAndIncrement(); try { + log.info("Start node: " + id); + startGrid(id); Thread.sleep(1000); @@ -1008,6 +1012,8 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig String name = UUID.randomUUID().toString(); try { + log.info("Start node: " + name); + Ignite g = startGrid(name); callback.apply(g); @@ -1055,6 +1061,8 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig String name = UUID.randomUUID().toString(); + log.info("Start node: " + name); + Ignite g = startGrid(name); names.add(name); @@ -1144,6 +1152,8 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig startedNodes.add(name); + log.info("Start node: " + name); + Ignite g = startGrid(name); callback.apply(g); http://git-wip-us.apache.org/repos/asf/ignite/blob/0e5d35ba/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBinaryObjectsCacheDataStructuresSelfTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBinaryObjectsCacheDataStructuresSelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBinaryObjectsCacheDataStructuresSelfTestSuite.java index 1a77518..d18e3e3 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBinaryObjectsCacheDataStructuresSelfTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBinaryObjectsCacheDataStructuresSelfTestSuite.java @@ -25,6 +25,10 @@ import org.apache.ignite.testframework.config.GridTestProperties; * */ public class IgniteBinaryObjectsCacheDataStructuresSelfTestSuite { + /** + * @return Test suite. + * @throws Exception If failed. + */ public static TestSuite suite() throws Exception { GridTestProperties.setProperty(GridTestProperties.MARSH_CLASS_NAME, BinaryMarshaller.class.getName());
