IGNITE-7569 Fixed index rebuild future - Fixes #3454.

Signed-off-by: Alexey Goncharuk <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/4f515fe2
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/4f515fe2
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/4f515fe2

Branch: refs/heads/ignite-7573
Commit: 4f515fe229f9996dd962ea1feadd31739727974b
Parents: f0dec14
Author: Alexey Goncharuk <[email protected]>
Authored: Wed Jan 31 11:22:26 2018 +0300
Committer: Alexey Goncharuk <[email protected]>
Committed: Wed Jan 31 11:22:26 2018 +0300

----------------------------------------------------------------------
 .../GridDhtPartitionsExchangeFuture.java        | 29 +++++++++-
 .../GridCacheDatabaseSharedManager.java         | 57 ++++++++++++++------
 2 files changed, 69 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/4f515fe2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 6c09b6a..a45c9b9 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -168,7 +168,11 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
     /** */
     private AtomicBoolean added = new AtomicBoolean(false);
 
-    /** Event latch. */
+    /**
+     * Discovery event receive latch. There is a race between discovery event 
processing and single message
+     * processing, so it is possible to create an exchange future before the 
actual discovery event is received.
+     * This latch is notified when the discovery event arrives.
+     */
     @GridToStringExclude
     private final CountDownLatch evtLatch = new CountDownLatch(1);
 
@@ -344,6 +348,10 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
     }
 
     /**
+     * Sets exchange actions associated with the exchange future (such as 
cache start or stop).
+     * Exchange actions are created from the discovery event, so the actions must 
be set before the event is processed,
+     * thus the setter requires that {@code evtLatch} be armed.
+     *
      * @param exchActions Exchange actions.
      */
     public void exchangeActions(ExchangeActions exchActions) {
@@ -354,6 +362,20 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
     }
 
     /**
+     * Gets exchange actions (such as cache start or stop) associated with 
the exchange future.
+     * Exchange actions can be {@code null} (for example, if the exchange is 
created for topology
+     * change event).
+     *
+     * @return Exchange actions.
+     */
+    @Nullable public ExchangeActions exchangeActions() {
+        return exchActions;
+    }
+
+    /**
+     * Sets affinity change message associated with the exchange. Affinity 
change message is required when
+     * centralized affinity change is performed.
+     *
      * @param affChangeMsg Affinity change message.
      */
     public void affinityChangeMessage(CacheAffinityChangeMessage affChangeMsg) 
{
@@ -361,9 +383,12 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
     }
 
     /**
+     * Gets the affinity topology version for which this exchange was created. 
If several exchanges
+     * were merged, initial version is the version of the earliest merged 
exchange.
+     *
      * @return Initial exchange version.
      */
-    public AffinityTopologyVersion initialVersion() {
+    @Override public AffinityTopologyVersion initialVersion() {
         return exchId.topologyVersion();
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/4f515fe2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
index f833911..0b35f18 100755
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
@@ -105,6 +105,7 @@ import 
org.apache.ignite.internal.pagemem.wal.record.delta.PartitionMetaStateRec
 import org.apache.ignite.internal.processors.cache.CacheGroupContext;
 import org.apache.ignite.internal.processors.cache.CacheGroupDescriptor;
 import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
+import org.apache.ignite.internal.processors.cache.ExchangeActions;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.StoredCacheData;
@@ -320,8 +321,8 @@ public class GridCacheDatabaseSharedManager extends 
IgniteCacheDatabaseSharedMan
     /** Thread local with buffers for the checkpoint threads. Each buffer 
represent one page for durable memory. */
     private ThreadLocal<ByteBuffer> threadBuf;
 
-    /** */
-    private final ConcurrentMap<Integer, IgniteInternalFuture> idxRebuildFuts 
= new ConcurrentHashMap<>();
+    /** Map from a cacheId to a future indicating that there is an in-progress 
index rebuild for the given cache. */
+    private final ConcurrentMap<Integer, GridFutureAdapter<Void>> 
idxRebuildFuts = new ConcurrentHashMap<>();
 
     /**
      * Lock holder for compatible folders mode. Null if lock holder was 
created at start node. <br>
@@ -1127,33 +1128,59 @@ public class GridCacheDatabaseSharedManager extends 
IgniteCacheDatabaseSharedMan
         // Before local node join event.
         if (clusterInTransitionStateToActive || (joinEvt && locNode && 
isSrvNode))
             restoreState();
+
+        if (cctx.kernalContext().query().moduleEnabled()) {
+            ExchangeActions acts = fut.exchangeActions();
+
+            if (acts != null && !F.isEmpty(acts.cacheStartRequests())) {
+                for (ExchangeActions.CacheActionData actionData : 
acts.cacheStartRequests()) {
+                    int cacheId = CU.cacheId(actionData.request().cacheName());
+
+                    GridFutureAdapter<Void> old = idxRebuildFuts.put(cacheId, 
new GridFutureAdapter<>());
+
+                    if (old != null)
+                        old.onDone();
+                }
+            }
+        }
     }
 
     /** {@inheritDoc} */
     @Override public void 
rebuildIndexesIfNeeded(GridDhtPartitionsExchangeFuture fut) {
         if (cctx.kernalContext().query().moduleEnabled()) {
             for (final GridCacheContext cacheCtx : 
(Collection<GridCacheContext>)cctx.cacheContexts()) {
-                if 
(cacheCtx.startTopologyVersion().equals(fut.initialVersion()) &&
-                    !cctx.pageStore().hasIndexStore(cacheCtx.groupId()) && 
cacheCtx.affinityNode()) {
+                if 
(cacheCtx.startTopologyVersion().equals(fut.initialVersion())) {
                     final int cacheId = cacheCtx.cacheId();
+                    final GridFutureAdapter<Void> usrFut = 
idxRebuildFuts.get(cacheId);
+
+                    if (!cctx.pageStore().hasIndexStore(cacheCtx.groupId()) && 
cacheCtx.affinityNode()) {
+                        IgniteInternalFuture<?> rebuildFut = 
cctx.kernalContext().query()
+                            
.rebuildIndexesFromHash(Collections.singletonList(cacheCtx.cacheId()));
 
-                    final IgniteInternalFuture<?> rebuildFut = 
cctx.kernalContext().query()
-                        
.rebuildIndexesFromHash(Collections.singletonList(cacheCtx.cacheId()));
+                        assert usrFut != null : "Missing user future for 
cache: " + cacheCtx.name();
 
-                    idxRebuildFuts.put(cacheId, rebuildFut);
+                        rebuildFut.listen(new CI1<IgniteInternalFuture>() {
+                            @Override public void apply(IgniteInternalFuture 
igniteInternalFut) {
+                                idxRebuildFuts.remove(cacheId, usrFut);
 
-                    rebuildFut.listen(new CI1<IgniteInternalFuture>() {
-                        @Override public void apply(IgniteInternalFuture 
igniteInternalFut) {
-                            idxRebuildFuts.remove(cacheId, rebuildFut);
+                                usrFut.onDone(igniteInternalFut.error());
 
-                            CacheConfiguration ccfg = cacheCtx.config();
+                                CacheConfiguration ccfg = cacheCtx.config();
 
-                            if (ccfg != null) {
-                                log().info("Finished indexes rebuilding for 
cache [name=" + ccfg.getName()
-                                    + ", grpName=" + ccfg.getGroupName() + 
']');
+                                if (ccfg != null) {
+                                    log().info("Finished indexes rebuilding 
for cache [name=" + ccfg.getName()
+                                        + ", grpName=" + ccfg.getGroupName() + 
']');
+                                }
                             }
+                        });
+                    }
+                    else {
+                        if (usrFut != null) {
+                            idxRebuildFuts.remove(cacheId, usrFut);
+
+                            usrFut.onDone();
                         }
-                    });
+                    }
                 }
             }
         }

Reply via email to