IGNITE-7569 Fixed index rebuild future - Fixes #3454. Signed-off-by: Alexey Goncharuk <[email protected]>
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/4f515fe2 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/4f515fe2 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/4f515fe2 Branch: refs/heads/ignite-7573 Commit: 4f515fe229f9996dd962ea1feadd31739727974b Parents: f0dec14 Author: Alexey Goncharuk <[email protected]> Authored: Wed Jan 31 11:22:26 2018 +0300 Committer: Alexey Goncharuk <[email protected]> Committed: Wed Jan 31 11:22:26 2018 +0300 ---------------------------------------------------------------------- .../GridDhtPartitionsExchangeFuture.java | 29 +++++++++- .../GridCacheDatabaseSharedManager.java | 57 ++++++++++++++------ 2 files changed, 69 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/4f515fe2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index 6c09b6a..a45c9b9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -168,7 +168,11 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte /** */ private AtomicBoolean added = new AtomicBoolean(false); - /** Event latch. */ + /** + * Discovery event receive latch. There is a race between discovery event processing and single message + * processing, so it is possible to create an exchange future before the actual discovery event is received. + * This latch is notified when the discovery event arrives. + */ @GridToStringExclude private final CountDownLatch evtLatch = new CountDownLatch(1); @@ -344,6 +348,10 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte } /** + * Sets exchange actions associated with the exchange future (such as cache start or stop). + * Exchange actions is created from discovery event, so the actions must be set before the event is processed, + * thus the setter requires that {@code evtLatch} be armed. + * * @param exchActions Exchange actions. */ public void exchangeActions(ExchangeActions exchActions) { @@ -354,6 +362,20 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte } /** + * Gets exchanges actions (such as cache start or stop) associated with the exchange future. + * Exchange actions can be {@code null} (for example, if the exchange is created for topology + * change event). + * + * @return Exchange actions. + */ + @Nullable public ExchangeActions exchangeActions() { + return exchActions; + } + + /** + * Sets affinity change message associated with the exchange. Affinity change message is required when + * centralized affinity change is performed. + * * @param affChangeMsg Affinity change message. */ public void affinityChangeMessage(CacheAffinityChangeMessage affChangeMsg) { @@ -361,9 +383,12 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte } /** + * Gets the affinity topology version for which this exchange was created. If several exchanges + * were merged, initial version is the version of the earliest merged exchange. + * * @return Initial exchange version. */ - public AffinityTopologyVersion initialVersion() { + @Override public AffinityTopologyVersion initialVersion() { return exchId.topologyVersion(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/4f515fe2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java index f833911..0b35f18 100755 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java @@ -105,6 +105,7 @@ import org.apache.ignite.internal.pagemem.wal.record.delta.PartitionMetaStateRec import org.apache.ignite.internal.processors.cache.CacheGroupContext; import org.apache.ignite.internal.processors.cache.CacheGroupDescriptor; import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor; +import org.apache.ignite.internal.processors.cache.ExchangeActions; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.StoredCacheData; @@ -320,8 +321,8 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan /** Thread local with buffers for the checkpoint threads. Each buffer represent one page for durable memory. */ private ThreadLocal<ByteBuffer> threadBuf; - /** */ - private final ConcurrentMap<Integer, IgniteInternalFuture> idxRebuildFuts = new ConcurrentHashMap<>(); + /** Map from a cacheId to a future indicating that there is an in-progress index rebuild for the given cache. */ + private final ConcurrentMap<Integer, GridFutureAdapter<Void>> idxRebuildFuts = new ConcurrentHashMap<>(); /** * Lock holder for compatible folders mode. Null if lock holder was created at start node. <br> @@ -1127,33 +1128,59 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan // Before local node join event. if (clusterInTransitionStateToActive || (joinEvt && locNode && isSrvNode)) restoreState(); + + if (cctx.kernalContext().query().moduleEnabled()) { + ExchangeActions acts = fut.exchangeActions(); + + if (acts != null && !F.isEmpty(acts.cacheStartRequests())) { + for (ExchangeActions.CacheActionData actionData : acts.cacheStartRequests()) { + int cacheId = CU.cacheId(actionData.request().cacheName()); + + GridFutureAdapter<Void> old = idxRebuildFuts.put(cacheId, new GridFutureAdapter<>()); + + if (old != null) + old.onDone(); + } + } + } } /** {@inheritDoc} */ @Override public void rebuildIndexesIfNeeded(GridDhtPartitionsExchangeFuture fut) { if (cctx.kernalContext().query().moduleEnabled()) { for (final GridCacheContext cacheCtx : (Collection<GridCacheContext>)cctx.cacheContexts()) { - if (cacheCtx.startTopologyVersion().equals(fut.initialVersion()) && - !cctx.pageStore().hasIndexStore(cacheCtx.groupId()) && cacheCtx.affinityNode()) { + if (cacheCtx.startTopologyVersion().equals(fut.initialVersion())) { final int cacheId = cacheCtx.cacheId(); + final GridFutureAdapter<Void> usrFut = idxRebuildFuts.get(cacheId); + + if (!cctx.pageStore().hasIndexStore(cacheCtx.groupId()) && cacheCtx.affinityNode()) { + IgniteInternalFuture<?> rebuildFut = cctx.kernalContext().query() + .rebuildIndexesFromHash(Collections.singletonList(cacheCtx.cacheId())); - final IgniteInternalFuture<?> rebuildFut = cctx.kernalContext().query() - .rebuildIndexesFromHash(Collections.singletonList(cacheCtx.cacheId())); + assert usrFut != null : "Missing user future for cache: " + cacheCtx.name(); - idxRebuildFuts.put(cacheId, rebuildFut); + rebuildFut.listen(new CI1<IgniteInternalFuture>() { + @Override public void apply(IgniteInternalFuture igniteInternalFut) { + idxRebuildFuts.remove(cacheId, usrFut); - rebuildFut.listen(new CI1<IgniteInternalFuture>() { - @Override public void apply(IgniteInternalFuture igniteInternalFut) { - idxRebuildFuts.remove(cacheId, rebuildFut); + usrFut.onDone(igniteInternalFut.error()); - CacheConfiguration ccfg = cacheCtx.config(); + CacheConfiguration ccfg = cacheCtx.config(); - if (ccfg != null) { - log().info("Finished indexes rebuilding for cache [name=" + ccfg.getName() - + ", grpName=" + ccfg.getGroupName() + ']'); + if (ccfg != null) { + log().info("Finished indexes rebuilding for cache [name=" + ccfg.getName() + + ", grpName=" + ccfg.getGroupName() + ']'); + } } + }); + } + else { + if (usrFut != null) { + idxRebuildFuts.remove(cacheId, usrFut); + + usrFut.onDone(); } - }); + } } } }
