IGNITE-1094 Fixed hanging during cache initialization

Signed-off-by: Andrey Gura <ag...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a393e696
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a393e696
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a393e696

Branch: refs/heads/ignite-8783
Commit: a393e696212ef0dd4f23f923bf1001e0a48db915
Parents: 84a7b59
Author: Slava Koptilin <slava.kopti...@gmail.com>
Authored: Mon Jul 16 16:40:56 2018 +0300
Committer: Andrey Gura <ag...@apache.org>
Committed: Mon Jul 16 16:40:56 2018 +0300

----------------------------------------------------------------------
 .../apache/ignite/internal/IgniteKernal.java    |   5 +
 .../ignite/internal/IgniteNodeAttributes.java   |   3 +
 .../internal/managers/discovery/DiscoCache.java |  20 +
 .../cache/CacheAffinitySharedManager.java       | 148 +++-
 .../processors/cache/ClusterCachesInfo.java     |  25 +
 .../cache/DynamicCacheChangeFailureMessage.java | 151 ++++
 .../processors/cache/ExchangeActions.java       |  19 +-
 .../GridCachePartitionExchangeManager.java      |   8 +
 .../processors/cache/GridCacheProcessor.java    | 169 ++--
 .../GridDhtPartitionsExchangeFuture.java        | 282 ++++++-
 ...IgniteAbstractDynamicCacheStartFailTest.java | 775 +++++++++++++++++++
 ...ynamicCacheStartCoordinatorFailoverTest.java | 262 +++++++
 .../cache/IgniteDynamicCacheStartFailTest.java  |  46 ++
 ...ynamicCacheStartFailWithPersistenceTest.java |  91 +++
 .../testsuites/IgniteCacheTestSuite4.java       |   6 +
 ...eQueryAfterDynamicCacheStartFailureTest.java |  69 ++
 .../IgniteCacheWithIndexingTestSuite.java       |   3 +
 17 files changed, 1921 insertions(+), 161 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/a393e696/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java 
b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index 4687114..4c8fa9b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -239,6 +239,7 @@ import static 
org.apache.ignite.internal.IgniteNodeAttributes.ATTR_DAEMON;
 import static 
org.apache.ignite.internal.IgniteNodeAttributes.ATTR_DATA_STORAGE_CONFIG;
 import static 
org.apache.ignite.internal.IgniteNodeAttributes.ATTR_DATA_STREAMER_POOL_SIZE;
 import static 
org.apache.ignite.internal.IgniteNodeAttributes.ATTR_DEPLOYMENT_MODE;
+import static 
org.apache.ignite.internal.IgniteNodeAttributes.ATTR_DYNAMIC_CACHE_START_ROLLBACK_SUPPORTED;
 import static 
org.apache.ignite.internal.IgniteNodeAttributes.ATTR_IGNITE_INSTANCE_NAME;
 import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_IPS;
 import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_JIT_NAME;
@@ -1674,6 +1675,10 @@ public class IgniteKernal implements IgniteEx, 
IgniteMXBean, Externalizable {
         if (cfg.getConnectorConfiguration() != null)
             add(ATTR_REST_PORT_RANGE, 
cfg.getConnectorConfiguration().getPortRange());
 
+        // Whether rollback of dynamic cache start is supported or not.
+        // This property is added because of backward compatibility.
+        add(ATTR_DYNAMIC_CACHE_START_ROLLBACK_SUPPORTED, Boolean.TRUE);
+
         // Save data storage configuration.
         addDataStorageConfigurationAttributes();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/a393e696/modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java
index 663a6f9..ed16a77 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java
@@ -202,6 +202,9 @@ public final class IgniteNodeAttributes {
     /** Rebalance thread pool size. */
     public static final String ATTR_REBALANCE_POOL_SIZE = ATTR_PREFIX + 
".rebalance.pool.size";
 
+    /** Internal attribute name constant. */
+    public static final String ATTR_DYNAMIC_CACHE_START_ROLLBACK_SUPPORTED = 
ATTR_PREFIX + ".dynamic.cache.start.rollback.supported";
+
     /**
      * Enforces singleton.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/a393e696/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java
index 73f6d23..8cdcbf3 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java
@@ -424,6 +424,26 @@ public class DiscoCache {
     }
 
     /**
+     *
+     * Returns {@code True} if all nodes has the given attribute and its value 
equals to {@code expVal}.
+     *
+     * @param <T> Attribute Type.
+     * @param name Attribute name.
+     * @param expVal Expected value.
+     * @return {@code True} if all the given nodes has the given attribute and 
its value equals to {@code expVal}.
+     */
+    public <T> boolean checkAttribute(String name, T expVal) {
+        for (ClusterNode node : allNodes) {
+            T attr = node.attribute(name);
+
+            if (attr == null || !expVal.equals(attr))
+                return false;
+        }
+
+        return true;
+    }
+
+    /**
      * @param nodes Cluster nodes.
      * @return Empty collection if nodes list is {@code null}
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/a393e696/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index 08eb43f..2871e82 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@ -731,6 +731,28 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
     }
 
     /**
+     * Called during the rollback of the exchange partitions procedure
+     * in order to stop the given cache even if it's not fully initialized 
(e.g. failed on cache init stage).
+     *
+     * @param fut Exchange future.
+     * @param crd Coordinator flag.
+     * @param exchActions Cache change requests.
+     */
+    public void forceCloseCaches(
+        GridDhtPartitionsExchangeFuture fut,
+        boolean crd,
+        final ExchangeActions exchActions
+    ) {
+        assert exchActions != null && !exchActions.empty() && 
exchActions.cacheStartRequests().isEmpty(): exchActions;
+
+        caches.updateCachesInfo(exchActions);
+
+        processCacheStopRequests(fut, crd, exchActions, true);
+
+        cctx.cache().forceCloseCaches(exchActions);
+    }
+
+    /**
      * Called on exchange initiated for cache start/stop request.
      *
      * @param fut Exchange future.
@@ -745,13 +767,70 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
     ) throws IgniteCheckedException {
         assert exchActions != null && !exchActions.empty() : exchActions;
 
-        final ExchangeDiscoveryEvents evts = fut.context().events();
-
         caches.updateCachesInfo(exchActions);
 
         // Affinity did not change for existing caches.
         onCustomMessageNoAffinityChange(fut, crd, exchActions);
 
+        processCacheStartRequests(fut, crd, exchActions);
+
+        Set<Integer> stoppedGrps = processCacheStopRequests(fut, crd, 
exchActions, false);
+
+        if (stoppedGrps != null) {
+            AffinityTopologyVersion notifyTopVer = null;
+
+            synchronized (mux) {
+                if (waitInfo != null) {
+                    for (Integer grpId : stoppedGrps) {
+                        boolean rmv = waitInfo.waitGrps.remove(grpId) != null;
+
+                        if (rmv) {
+                            notifyTopVer = waitInfo.topVer;
+
+                            waitInfo.assignments.remove(grpId);
+                        }
+                    }
+                }
+            }
+
+            if (notifyTopVer != null) {
+                final AffinityTopologyVersion topVer = notifyTopVer;
+
+                cctx.kernalContext().closure().runLocalSafe(new Runnable() {
+                    @Override public void run() {
+                        onCacheGroupStopped(topVer);
+                    }
+                });
+            }
+        }
+
+        ClientCacheChangeDiscoveryMessage msg = clientCacheChanges.get();
+
+        if (msg != null) {
+            msg.checkCachesExist(caches.registeredCaches.keySet());
+
+            if (msg.empty())
+                clientCacheChanges.remove();
+        }
+    }
+
+    /**
+     * Process cache start requests.
+     *
+     * @param fut Exchange future.
+     * @param crd Coordinator flag.
+     * @param exchActions Cache change requests.
+     * @throws IgniteCheckedException If failed.
+     */
+    private void processCacheStartRequests(
+        GridDhtPartitionsExchangeFuture fut,
+        boolean crd,
+        final ExchangeActions exchActions
+    ) throws IgniteCheckedException {
+        assert exchActions != null && !exchActions.empty() : exchActions;
+
+        final ExchangeDiscoveryEvents evts = fut.context().events();
+
         for (ExchangeActions.CacheActionData action : 
exchActions.cacheStartRequests()) {
             DynamicCacheDescriptor cacheDesc = action.descriptor();
 
@@ -830,6 +909,24 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
                 }
             }
         }
+    }
+
+    /**
+     * Process cache stop requests.
+     *
+     * @param fut Exchange future.
+     * @param crd Coordinator flag.
+     * @param exchActions Cache change requests.
+     * @param forceClose
+     * @return Set of cache groups to be stopped.
+     */
+    private Set<Integer> processCacheStopRequests(
+        GridDhtPartitionsExchangeFuture fut,
+        boolean crd,
+        final ExchangeActions exchActions,
+        boolean forceClose
+    ) {
+        assert exchActions != null && !exchActions.empty() : exchActions;
 
         for (ExchangeActions.CacheActionData action : 
exchActions.cacheStopRequests())
             cctx.cache().blockGateway(action.request().cacheName(), true, 
action.request().restart());
@@ -844,54 +941,21 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
                 if (data.descriptor().config().getCacheMode() != LOCAL) {
                     CacheGroupHolder cacheGrp = 
grpHolders.remove(data.descriptor().groupId());
 
-                    assert cacheGrp != null : data.descriptor();
-
-                    if (stoppedGrps == null)
-                        stoppedGrps = new HashSet<>();
-
-                    stoppedGrps.add(cacheGrp.groupId());
-
-                    cctx.io().removeHandler(true, cacheGrp.groupId(), 
GridDhtAffinityAssignmentResponse.class);
-                }
-            }
-        }
-
-        if (stoppedGrps != null) {
-            AffinityTopologyVersion notifyTopVer = null;
+                    assert cacheGrp != null || forceClose : data.descriptor();
 
-            synchronized (mux) {
-                if (waitInfo != null) {
-                    for (Integer grpId : stoppedGrps) {
-                        boolean rmv = waitInfo.waitGrps.remove(grpId) != null;
+                    if (cacheGrp != null) {
+                        if (stoppedGrps == null)
+                            stoppedGrps = new HashSet<>();
 
-                        if (rmv) {
-                            notifyTopVer = waitInfo.topVer;
+                        stoppedGrps.add(cacheGrp.groupId());
 
-                            waitInfo.assignments.remove(grpId);
-                        }
+                        cctx.io().removeHandler(true, cacheGrp.groupId(), 
GridDhtAffinityAssignmentResponse.class);
                     }
                 }
             }
-
-            if (notifyTopVer != null) {
-                final AffinityTopologyVersion topVer = notifyTopVer;
-
-                cctx.kernalContext().closure().runLocalSafe(new Runnable() {
-                    @Override public void run() {
-                        onCacheGroupStopped(topVer);
-                    }
-                });
-            }
         }
 
-        ClientCacheChangeDiscoveryMessage msg = clientCacheChanges.get();
-
-        if (msg != null) {
-            msg.checkCachesExist(caches.registeredCaches.keySet());
-
-            if (msg.empty())
-                clientCacheChanges.remove();
-        }
+        return stoppedGrps;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/a393e696/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
index 94f8a27..3aaf7f3 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
@@ -396,6 +396,31 @@ class ClusterCachesInfo {
     }
 
     /**
+     * Creates exchanges actions. Forms a list of caches and cache groups to 
be stopped
+     * due to dynamic cache start failure.
+     *
+     * @param failMsg Dynamic change request fail message.
+     * @param topVer Topology version.
+     */
+    public void onCacheChangeRequested(DynamicCacheChangeFailureMessage 
failMsg, AffinityTopologyVersion topVer) {
+        ExchangeActions exchangeActions = new ExchangeActions();
+
+        List<DynamicCacheChangeRequest> requests = new 
ArrayList<>(failMsg.cacheNames().size());
+
+        for (String cacheName : failMsg.cacheNames()) {
+            DynamicCacheDescriptor cacheDescr = 
registeredCaches.get(cacheName);
+
+            assert cacheDescr != null : "Dynamic cache descriptor is missing 
[cacheName=" + cacheName + "]";
+
+            requests.add(DynamicCacheChangeRequest.stopRequest(ctx, cacheName, 
cacheDescr.sql(), true));
+        }
+
+        processCacheChangeRequests(exchangeActions, requests, topVer,false);
+
+        failMsg.exchangeActions(exchangeActions);
+    }
+
+    /**
      * @param batch Cache change request.
      * @param topVer Topology version.
      * @return {@code True} if minor topology version should be increased.

http://git-wip-us.apache.org/repos/asf/ignite/blob/a393e696/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeFailureMessage.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeFailureMessage.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeFailureMessage.java
new file mode 100644
index 0000000..d0cb08d
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeFailureMessage.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.util.Collection;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.managers.discovery.DiscoCache;
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteUuid;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * This class represents discovery message that is used to provide information 
about dynamic cache start failure.
+ */
+public class DynamicCacheChangeFailureMessage implements 
DiscoveryCustomMessage {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Cache names. */
+    @GridToStringInclude
+    private Collection<String> cacheNames;
+
+    /** Custom message ID. */
+    private IgniteUuid id;
+
+    /** */
+    private GridDhtPartitionExchangeId exchId;
+
+    /** */
+    @GridToStringInclude
+    private IgniteCheckedException cause;
+
+    /** Cache updates to be executed on exchange. */
+    private transient ExchangeActions exchangeActions;
+
+    /**
+     * Creates new DynamicCacheChangeFailureMessage instance.
+     *
+     * @param locNode Local node.
+     * @param exchId Exchange Id.
+     * @param cause Cache start error.
+     * @param cacheNames Cache names.
+     */
+    public DynamicCacheChangeFailureMessage(
+        ClusterNode locNode,
+        GridDhtPartitionExchangeId exchId,
+        IgniteCheckedException cause,
+        Collection<String> cacheNames)
+    {
+        assert exchId != null;
+        assert cause != null;
+        assert !F.isEmpty(cacheNames) : cacheNames;
+
+        this.id = IgniteUuid.fromUuid(locNode.id());
+        this.exchId = exchId;
+        this.cause = cause;
+        this.cacheNames = cacheNames;
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteUuid id() {
+        return id;
+    }
+
+    /**
+     * @return Collection of failed caches.
+     */
+    public Collection<String> cacheNames() {
+        return cacheNames;
+    }
+
+    /**
+     * @return Cache start error.
+     */
+    public IgniteCheckedException error() {
+        return cause;
+    }
+
+    /**
+     * @return Cache updates to be executed on exchange.
+     */
+    public ExchangeActions exchangeActions() {
+        return exchangeActions;
+    }
+
+    /**
+     * @param exchangeActions Cache updates to be executed on exchange.
+     */
+    public void exchangeActions(ExchangeActions exchangeActions) {
+        assert exchangeActions != null && !exchangeActions.empty() : 
exchangeActions;
+
+        this.exchangeActions = exchangeActions;
+    }
+
+    /**
+     * @return Exchange version.
+     */
+    @Nullable public GridDhtPartitionExchangeId exchangeId() {
+        return exchId;
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public DiscoveryCustomMessage ackMessage() {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isMutable() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean stopProcess() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public DiscoCache createDiscoCache(
+        GridDiscoveryManager mgr,
+        AffinityTopologyVersion topVer,
+        DiscoCache discoCache) {
+        return mgr.createDiscoCacheOnCacheChange(topVer, discoCache);
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(DynamicCacheChangeFailureMessage.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/a393e696/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java
index c289b6e..6431d0f 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java
@@ -99,17 +99,18 @@ public class ExchangeActions {
     /**
      * @return Stop cache requests.
      */
-    Collection<CacheActionData> cacheStopRequests() {
+    public Collection<CacheActionData> cacheStopRequests() {
         return cachesToStop != null ? cachesToStop.values() : 
Collections.<CacheActionData>emptyList();
     }
 
     /**
      * @param ctx Context.
+     * @param err Error if any.
      */
-    public void completeRequestFutures(GridCacheSharedContext ctx) {
-        completeRequestFutures(cachesToStart, ctx);
-        completeRequestFutures(cachesToStop, ctx);
-        completeRequestFutures(cachesToResetLostParts, ctx);
+    public void completeRequestFutures(GridCacheSharedContext ctx, Throwable 
err) {
+        completeRequestFutures(cachesToStart, ctx, err);
+        completeRequestFutures(cachesToStop, ctx, err);
+        completeRequestFutures(cachesToResetLostParts, ctx, err);
     }
 
     /**
@@ -130,10 +131,14 @@ public class ExchangeActions {
      * @param map Actions map.
      * @param ctx Context.
      */
-    private void completeRequestFutures(Map<String, CacheActionData> map, 
GridCacheSharedContext ctx) {
+    private void completeRequestFutures(
+        Map<String, CacheActionData> map,
+        GridCacheSharedContext ctx,
+        @Nullable Throwable err
+    ) {
         if (map != null) {
             for (CacheActionData req : map.values())
-                ctx.cache().completeCacheStartFuture(req.req, true, null);
+                ctx.cache().completeCacheStartFuture(req.req, (err == null), 
err);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/a393e696/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 2bdda19..d3fddab 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -494,6 +494,14 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
                     exchangeFuture(msg.exchangeId(), null, null, null, null)
                         .onAffinityChangeMessage(evt.eventNode(), msg);
             }
+            else if (customMsg instanceof DynamicCacheChangeFailureMessage) {
+                DynamicCacheChangeFailureMessage msg = 
(DynamicCacheChangeFailureMessage) customMsg;
+
+                if (msg.exchangeId().topologyVersion().topologyVersion() >=
+                    
affinityTopologyVersion(cctx.discovery().localJoinEvent()).topologyVersion())
+                    exchangeFuture(msg.exchangeId(), null, null, null, null)
+                        .onDynamicCacheChangeFail(evt.eventNode(), msg);
+            }
             else if (customMsg instanceof SnapshotDiscoveryMessage
                 && ((SnapshotDiscoveryMessage) customMsg).needExchange()) {
                 exchId = exchangeId(n.id(), affinityTopologyVersion(evt), evt);

http://git-wip-us.apache.org/repos/asf/ignite/blob/a393e696/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 26f1887..ae4fee4 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -2076,7 +2076,6 @@ public class GridCacheProcessor extends 
GridProcessorAdapter {
             }
             else
                 proxy.closeProxy();
-
         }
     }
 
@@ -2116,6 +2115,8 @@ public class GridCacheProcessor extends 
GridProcessorAdapter {
      * @return Stopped cache context.
      */
     private GridCacheContext<?, ?> prepareCacheStop(String cacheName, boolean 
destroy) {
+        assert sharedCtx.database().checkpointLockIsHeldByThread();
+
         GridCacheAdapter<?, ?> cache = caches.remove(cacheName);
 
         if (cache != null) {
@@ -2221,7 +2222,14 @@ public class GridCacheProcessor extends 
GridProcessorAdapter {
 
             cctx.gate().onStopped();
 
-            prepareCacheStop(cctx.name(), destroy);
+            sharedCtx.database().checkpointReadLock();
+
+            try {
+                prepareCacheStop(cctx.name(), destroy);
+            }
+            finally {
+                sharedCtx.database().checkpointReadUnlock();
+            }
 
             if (!cctx.group().hasCaches())
                 stopCacheGroup(cctx.group().groupId());
@@ -2229,101 +2237,119 @@ public class GridCacheProcessor extends 
GridProcessorAdapter {
     }
 
     /**
-     * Callback invoked when first exchange future for dynamic cache is 
completed.
+     * Called during the rollback of the exchange partitions procedure
+     * in order to stop the given cache even if it's not fully initialized 
(e.g. failed on cache init stage).
      *
-     * @param cacheStartVer Started caches version to create proxy for.
-     * @param exchActions Change requests.
-     * @param err Error.
+     * @param exchActions Stop requests.
      */
-    @SuppressWarnings("unchecked")
-    public void onExchangeDone(
-        AffinityTopologyVersion cacheStartVer,
-        @Nullable ExchangeActions exchActions,
-        @Nullable Throwable err
-    ) {
-        initCacheProxies(cacheStartVer, err);
-
-        if (exchActions == null)
-            return;
-
-        if (exchActions.systemCachesStarting() && 
exchActions.stateChangeRequest() == null) {
-            ctx.dataStructures().restoreStructuresState(ctx);
+    void forceCloseCaches(ExchangeActions exchActions) {
+        assert exchActions != null && 
!exchActions.cacheStopRequests().isEmpty();
 
-            ctx.service().updateUtilityCache();
-        }
+        processCacheStopRequestOnExchangeDone(exchActions);
+    }
 
-        if (err == null) {
-            // Force checkpoint if there is any cache stop request
-            if (exchActions.cacheStopRequests().size() > 0) {
-                try {
-                    sharedCtx.database().waitForCheckpoint("caches stop");
-                }
-                catch (IgniteCheckedException e) {
-                    U.error(log, "Failed to wait for checkpoint finish during 
cache stop.", e);
-                }
+    /**
+     * @param exchActions Change requests.
+     */
+    private void processCacheStopRequestOnExchangeDone(ExchangeActions 
exchActions) {
+        // Force checkpoint if there is any cache stop request
+        if (exchActions.cacheStopRequests().size() > 0) {
+            try {
+                sharedCtx.database().waitForCheckpoint("caches stop");
             }
+            catch (IgniteCheckedException e) {
+                U.error(log, "Failed to wait for checkpoint finish during 
cache stop.", e);
+            }
+        }
 
-            for (ExchangeActions.CacheActionData action : 
exchActions.cacheStopRequests()) {
-                CacheGroupContext gctx = 
cacheGrps.get(action.descriptor().groupId());
-
-                // Cancel all operations blocking gateway
-                if (gctx != null) {
-                    final String msg = "Failed to wait for topology update, 
cache group is stopping.";
-
-                    // If snapshot operation in progress we must throw 
CacheStoppedException
-                    // for correct cache proxy restart. For more details see
-                    // IgniteCacheProxy.cacheException()
-                    gctx.affinity().cancelFutures(new 
CacheStoppedException(msg));
-                }
-
-                stopGateway(action.request());
+        for (ExchangeActions.CacheActionData action : 
exchActions.cacheStopRequests()) {
+            CacheGroupContext gctx = 
cacheGrps.get(action.descriptor().groupId());
 
-                sharedCtx.database().checkpointReadLock();
+            // Cancel all operations blocking gateway
+            if (gctx != null) {
+                final String msg = "Failed to wait for topology update, cache 
group is stopping.";
 
-                try {
-                    prepareCacheStop(action.request().cacheName(), 
action.request().destroy());
-                }
-                finally {
-                    sharedCtx.database().checkpointReadUnlock();
-                }
+                // If snapshot operation in progress we must throw 
CacheStoppedException
+                // for correct cache proxy restart. For more details see
+                // IgniteCacheProxy.cacheException()
+                gctx.affinity().cancelFutures(new CacheStoppedException(msg));
             }
 
+            stopGateway(action.request());
+
             sharedCtx.database().checkpointReadLock();
 
             try {
-                // Do not invoke checkpoint listeners for groups are going to 
be destroyed to prevent metadata corruption.
-                for (ExchangeActions.CacheGroupActionData action : 
exchActions.cacheGroupsToStop()) {
-                    Integer groupId = action.descriptor().groupId();
-                    CacheGroupContext grp = cacheGrps.get(groupId);
-
-                    if (grp != null && grp.persistenceEnabled() && 
sharedCtx.database() instanceof GridCacheDatabaseSharedManager) {
-                        GridCacheDatabaseSharedManager mngr = 
(GridCacheDatabaseSharedManager) sharedCtx.database();
-                        mngr.removeCheckpointListener((DbCheckpointListener) 
grp.offheap());
-                    }
-                }
+                prepareCacheStop(action.request().cacheName(), 
action.request().destroy());
             }
             finally {
                 sharedCtx.database().checkpointReadUnlock();
             }
+        }
 
-            List<IgniteBiTuple<CacheGroupContext, Boolean>> stoppedGroups = 
new ArrayList<>();
+        sharedCtx.database().checkpointReadLock();
 
+        try {
+            // Do not invoke checkpoint listeners for groups are going to be 
destroyed to prevent metadata corruption.
             for (ExchangeActions.CacheGroupActionData action : 
exchActions.cacheGroupsToStop()) {
                 Integer groupId = action.descriptor().groupId();
+                CacheGroupContext grp = cacheGrps.get(groupId);
 
-                if (cacheGrps.containsKey(groupId)) {
-                    stoppedGroups.add(F.t(cacheGrps.get(groupId), 
action.destroy()));
-
-                    stopCacheGroup(groupId);
+                if (grp != null && grp.persistenceEnabled() && 
sharedCtx.database() instanceof GridCacheDatabaseSharedManager) {
+                    GridCacheDatabaseSharedManager mngr = 
(GridCacheDatabaseSharedManager) sharedCtx.database();
+                    mngr.removeCheckpointListener((DbCheckpointListener) 
grp.offheap());
                 }
             }
+        }
+        finally {
+            sharedCtx.database().checkpointReadUnlock();
+        }
+
+        List<IgniteBiTuple<CacheGroupContext, Boolean>> stoppedGroups = new 
ArrayList<>();
+
+        for (ExchangeActions.CacheGroupActionData action : 
exchActions.cacheGroupsToStop()) {
+            Integer groupId = action.descriptor().groupId();
+
+            if (cacheGrps.containsKey(groupId)) {
+                stoppedGroups.add(F.t(cacheGrps.get(groupId), 
action.destroy()));
+
+                stopCacheGroup(groupId);
+            }
+        }
+
+        if (!sharedCtx.kernalContext().clientNode())
+            sharedCtx.database().onCacheGroupsStopped(stoppedGroups);
+
+        if (exchActions.deactivate())
+            sharedCtx.deactivate();
+    }
+
+    /**
+     * Callback invoked when first exchange future for dynamic cache is 
completed.
+     *
+     * @param cacheStartVer Started caches version to create proxy for.
+     * @param exchActions Change requests.
+     * @param err Error.
+     */
+    @SuppressWarnings("unchecked")
+    public void onExchangeDone(
+        AffinityTopologyVersion cacheStartVer,
+        @Nullable ExchangeActions exchActions,
+        @Nullable Throwable err
+    ) {
+        initCacheProxies(cacheStartVer, err);
 
-            if (!sharedCtx.kernalContext().clientNode())
-                sharedCtx.database().onCacheGroupsStopped(stoppedGroups);
+        if (exchActions == null)
+            return;
 
-            if (exchActions.deactivate())
-                sharedCtx.deactivate();
+        if (exchActions.systemCachesStarting() && 
exchActions.stateChangeRequest() == null) {
+            ctx.dataStructures().restoreStructuresState(ctx);
+
+            ctx.service().updateUtilityCache();
         }
+
+        if (err == null)
+            processCacheStopRequestOnExchangeDone(exchActions);
     }
 
     /**
@@ -3481,6 +3507,9 @@ public class GridCacheProcessor extends 
GridProcessorAdapter {
         if (msg instanceof DynamicCacheChangeBatch)
             return 
cachesInfo.onCacheChangeRequested((DynamicCacheChangeBatch)msg, topVer);
 
+        if (msg instanceof DynamicCacheChangeFailureMessage)
+            
cachesInfo.onCacheChangeRequested((DynamicCacheChangeFailureMessage) msg, 
topVer);
+
         if (msg instanceof ClientCacheChangeDiscoveryMessage)
             
cachesInfo.onClientCacheChange((ClientCacheChangeDiscoveryMessage)msg, node);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/a393e696/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 9f08b43..d44856f 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -43,8 +43,8 @@ import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.cache.CacheMode;
 import org.apache.ignite.cache.CacheRebalanceMode;
 import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.configuration.NearCacheConfiguration;
 import org.apache.ignite.events.DiscoveryEvent;
 import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
@@ -67,6 +67,7 @@ import 
org.apache.ignite.internal.processors.cache.CacheGroupContext;
 import org.apache.ignite.internal.processors.cache.CacheGroupDescriptor;
 import 
org.apache.ignite.internal.processors.cache.CachePartitionExchangeWorkerTask;
 import org.apache.ignite.internal.processors.cache.DynamicCacheChangeBatch;
+import 
org.apache.ignite.internal.processors.cache.DynamicCacheChangeFailureMessage;
 import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
 import org.apache.ignite.internal.processors.cache.ExchangeActions;
 import org.apache.ignite.internal.processors.cache.ExchangeContext;
@@ -79,13 +80,13 @@ import 
org.apache.ignite.internal.processors.cache.GridCacheUtils;
 import org.apache.ignite.internal.processors.cache.LocalJoinCachesContext;
 import org.apache.ignite.internal.processors.cache.StateChangeRequest;
 import org.apache.ignite.internal.processors.cache.WalStateAbstractMessage;
-import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.latch.Latch;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridClientPartitionTopology;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionsStateValidator;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFutureAdapter;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.latch.Latch;
 import 
org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotDiscoveryMessage;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
@@ -117,6 +118,7 @@ import static 
org.apache.ignite.IgniteSystemProperties.getLong;
 import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
 import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
 import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
+import static 
org.apache.ignite.internal.IgniteNodeAttributes.ATTR_DYNAMIC_CACHE_START_ROLLBACK_SUPPORTED;
 import static 
org.apache.ignite.internal.events.DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT;
 import static 
org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL;
 import static 
org.apache.ignite.internal.processors.cache.ExchangeDiscoveryEvents.serverJoinEvent;
@@ -147,6 +149,9 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
     private static final boolean SKIP_PARTITION_SIZE_VALIDATION = 
Boolean.getBoolean(IgniteSystemProperties.IGNITE_SKIP_PARTITION_SIZE_VALIDATION);
 
     /** */
+    private static final String DISTRIBUTED_LATCH_ID = "exchange";
+
+    /** */
     @GridToStringExclude
     private final Object mux = new Object();
 
@@ -253,11 +258,14 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
      */
     private boolean forceAffReassignment;
 
-    /** Change global state exception. */
-    private Exception changeGlobalStateE;
+    /** Exception that was thrown during init phase on local node. */
+    private Exception exchangeLocE;
+
+    /** Exchange exceptions from all participating nodes. */
+    private final Map<UUID, Exception> exchangeGlobalExceptions = new 
ConcurrentHashMap<>();
 
-    /** Change global state exceptions. */
-    private final Map<UUID, Exception> changeGlobalStateExceptions = new 
ConcurrentHashMap<>();
+    /** Used to track the fact that {@link DynamicCacheChangeFailureMessage} 
was sent. */
+    private volatile boolean cacheChangeFailureMsgSent;
 
     /** */
     private ConcurrentMap<UUID, GridDhtPartitionsSingleMessage> msgs = new 
ConcurrentHashMap<>();
@@ -508,6 +516,15 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
     }
 
     /**
+     * @return {@code True} if this exchange was triggered by 
DynamicCacheChangeBatch message
+     * in order to start cache(s).
+     */
+    private boolean dynamicCacheStartExchange() {
+        return exchActions != null && 
!exchActions.cacheStartRequests().isEmpty()
+            && exchActions.cacheStopRequests().isEmpty();
+    }
+
+    /**
      * @return {@code True} if activate cluster exchange.
      */
     public boolean activateCluster() {
@@ -773,7 +790,7 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
             if (reconnectOnError(e))
                 onDone(new IgniteNeedReconnectException(cctx.localNode(), e));
             else {
-                U.error(log, "Failed to reinitialize local partitions 
(preloading will be stopped): " + exchId, e);
+                U.error(log, "Failed to reinitialize local partitions 
(rebalancing will be stopped): " + exchId, e);
 
                 onDone(e);
             }
@@ -927,7 +944,7 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
         DiscoveryDataClusterState state = 
cctx.kernalContext().state().clusterState();
 
         if (state.transitionError() != null)
-            changeGlobalStateE = state.transitionError();
+            exchangeLocE = state.transitionError();
 
         if (req.activeChanged()) {
             if (req.activate()) {
@@ -967,11 +984,11 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
                         ", client=" + cctx.kernalContext().clientNode() +
                         ", topVer=" + initialVersion() + "]", e);
 
-                    changeGlobalStateE = e;
+                    exchangeLocE = e;
 
                     if (crd) {
                         synchronized (mux) {
-                            
changeGlobalStateExceptions.put(cctx.localNodeId(), e);
+                            exchangeGlobalExceptions.put(cctx.localNodeId(), 
e);
                         }
                     }
                 }
@@ -1002,7 +1019,7 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
                         ", client=" + cctx.kernalContext().clientNode() +
                         ", topVer=" + initialVersion() + "]", e);
 
-                    changeGlobalStateE = e;
+                    exchangeLocE = e;
                 }
             }
         }
@@ -1027,7 +1044,7 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
                     ", client=" + cctx.kernalContext().clientNode() +
                     ", topVer=" + initialVersion() + "]", e);
 
-                changeGlobalStateE = e;
+                exchangeLocE = e;
             }
         }
 
@@ -1044,7 +1061,21 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
 
         assert !exchActions.clientOnlyExchange() : exchActions;
 
-        cctx.affinity().onCacheChangeRequest(this, crd, exchActions);
+        try {
+            cctx.affinity().onCacheChangeRequest(this, crd, exchActions);
+        }
+        catch (Exception e) {
+            if (reconnectOnError(e) || !isRollbackSupported())
+                // This exception will be handled by init() method.
+                throw e;
+
+            U.error(log, "Failed to initialize cache(s) (will try to 
rollback). " + exchId, e);
+
+            exchangeLocE = new IgniteCheckedException(
+                "Failed to initialize exchange locally [locNodeId=" + 
cctx.localNodeId() + "]", e);
+
+            exchangeGlobalExceptions.put(cctx.localNodeId(), exchangeLocE);
+        }
 
         return cctx.kernalContext().clientNode() ? ExchangeType.CLIENT : 
ExchangeType.ALL;
     }
@@ -1294,8 +1325,9 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
     private void waitPartitionRelease(boolean distributed) throws 
IgniteCheckedException {
         Latch releaseLatch = null;
 
+        // Wait for other nodes only on first phase.
         if (distributed)
-            releaseLatch = cctx.exchange().latch().getOrCreate("exchange", 
initialVersion());
+            releaseLatch = 
cctx.exchange().latch().getOrCreate(DISTRIBUTED_LATCH_ID, initialVersion());
 
         IgniteInternalFuture<?> partReleaseFut = 
cctx.partitionReleaseFuture(initialVersion());
 
@@ -1505,9 +1537,9 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
                 resetLostPartitions(caches);
         }
 
-        if (cctx.kernalContext().clientNode()) {
+        if (cctx.kernalContext().clientNode() || (dynamicCacheStartExchange() 
&& exchangeLocE != null)) {
             msg = new GridDhtPartitionsSingleMessage(exchangeId(),
-                true,
+                cctx.kernalContext().clientNode(),
                 cctx.versions().last(),
                 true);
         }
@@ -1524,8 +1556,8 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
                 msg.partitionHistoryCounters(partHistReserved0);
         }
 
-        if (stateChangeExchange() && changeGlobalStateE != null)
-            msg.setError(changeGlobalStateE);
+        if ((stateChangeExchange() || dynamicCacheStartExchange()) && 
exchangeLocE != null)
+            msg.setError(exchangeLocE);
         else if (localJoinExchange())
             
msg.cacheGroupsAffinityRequest(exchCtx.groupsAffinityRequestOnJoin());
 
@@ -1558,8 +1590,8 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
             partHistSuppliers,
             partsToReload);
 
-        if (stateChangeExchange() && !F.isEmpty(changeGlobalStateExceptions))
-            m.setErrorsMap(changeGlobalStateExceptions);
+        if (stateChangeExchange() && !F.isEmpty(exchangeGlobalExceptions))
+            m.setErrorsMap(exchangeGlobalExceptions);
 
         return m;
     }
@@ -1758,7 +1790,7 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
         cctx.kernalContext().authentication().onActivate();
 
         if (exchActions != null && err == null)
-            exchActions.completeRequestFutures(cctx);
+            exchActions.completeRequestFutures(cctx, null);
 
         if (stateChangeExchange() && err == null)
             
cctx.kernalContext().state().onStateChangeExchangeDone(exchActions.stateChangeRequest());
@@ -1874,15 +1906,15 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
         pendingSingleMsgs.clear();
         fullMsgs.clear();
         msgs.clear();
-        changeGlobalStateExceptions.clear();
         crd = null;
         partReleaseFut = null;
-        changeGlobalStateE = null;
         exchActions = null;
         mergedJoinExchMsgs = null;
         pendingJoinMsg = null;
         exchCtx = null;
         newCrdFut = null;
+        exchangeLocE = null;
+        exchangeGlobalExceptions.clear();
     }
 
     /**
@@ -2075,13 +2107,13 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
     public void forceClientReconnect(ClusterNode node, 
GridDhtPartitionsSingleMessage msg) {
         Exception e = new IgniteNeedReconnectException(node, null);
 
-        changeGlobalStateExceptions.put(node.id(), e);
+        exchangeGlobalExceptions.put(node.id(), e);
 
         onDone(null, e);
 
         GridDhtPartitionsFullMessage fullMsg = createPartitionsMessage(true, 
false);
 
-        fullMsg.setErrorsMap(changeGlobalStateExceptions);
+        fullMsg.setErrorsMap(exchangeGlobalExceptions);
 
         FinishState finishState0 = new FinishState(cctx.localNodeId(),
             initialVersion(),
@@ -2178,6 +2210,11 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
                 if (cctx.kernalContext().isStopping())
                     return;
 
+                // DynamicCacheChangeFailureMessage was sent.
+                // Thus, there is no need to create and send 
GridDhtPartitionsFullMessage.
+                if (cacheChangeFailureMsgSent)
+                    return;
+
                 FinishState finishState0;
 
                 synchronized (mux) {
@@ -2255,8 +2292,8 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
 
                         pendingSingleUpdates++;
 
-                        if (stateChangeExchange() && msg.getError() != null)
-                            changeGlobalStateExceptions.put(nodeId, 
msg.getError());
+                        if ((stateChangeExchange() || 
dynamicCacheStartExchange()) && msg.getError() != null)
+                            exchangeGlobalExceptions.put(nodeId, 
msg.getError());
 
                         allReceived = remaining.isEmpty();
 
@@ -2290,7 +2327,10 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
         }
 
         if (finishState0 != null) {
-            sendAllPartitionsToNode(finishState0, msg, nodeId);
+            // DynamicCacheChangeFailureMessage was sent.
+            // Thus, there is no need to create and send 
GridDhtPartitionsFullMessage.
+            if (!cacheChangeFailureMsgSent)
+                sendAllPartitionsToNode(finishState0, msg, nodeId);
 
             return;
         }
@@ -2590,6 +2630,69 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
     }
 
     /**
+     * Creates an IgniteCheckedException that is used as root cause of the 
exchange initialization failure.
+     * This method aggregates all the exceptions provided from all 
participating nodes.
+     *
+     * @param globalExceptions collection exceptions from all participating 
nodes.
+     * @return exception that represents a cause of the exchange 
initialization failure.
+     */
+    private IgniteCheckedException createExchangeException(Map<UUID, 
Exception> globalExceptions) {
+        IgniteCheckedException ex = new IgniteCheckedException("Failed to 
complete exchange process.");
+
+        for (Map.Entry<UUID, Exception> entry : globalExceptions.entrySet())
+            if (ex != entry.getValue())
+                ex.addSuppressed(entry.getValue());
+
+        return ex;
+    }
+
+    /**
+     * @return {@code true} if the given {@code discoEvt} supports the 
rollback procedure.
+     */
+    private boolean isRollbackSupported() {
+        if 
(!firstEvtDiscoCache.checkAttribute(ATTR_DYNAMIC_CACHE_START_ROLLBACK_SUPPORTED,
 Boolean.TRUE))
+            return false;
+
+        // Currently the rollback process is supported for dynamically started 
caches only.
+        return firstDiscoEvt.type() == EVT_DISCOVERY_CUSTOM_EVT && 
dynamicCacheStartExchange();
+    }
+
+    /**
+     * Sends {@link DynamicCacheChangeFailureMessage} to all participated nodes
+     * that represents a cause of exchange failure.
+     */
+    private void sendExchangeFailureMessage() {
+        assert crd != null && crd.isLocal();
+
+        try {
+            IgniteCheckedException err = 
createExchangeException(exchangeGlobalExceptions);
+
+            List<String> cacheNames = new 
ArrayList<>(exchActions.cacheStartRequests().size());
+
+            for (ExchangeActions.CacheActionData actionData : 
exchActions.cacheStartRequests())
+                cacheNames.add(actionData.request().cacheName());
+
+            DynamicCacheChangeFailureMessage msg = new 
DynamicCacheChangeFailureMessage(
+                cctx.localNode(), exchId, err, cacheNames);
+
+            if (log.isDebugEnabled())
+                log.debug("Dynamic cache change failed (send message to all 
participating nodes): " + msg);
+
+            cacheChangeFailureMsgSent = true;
+
+            cctx.discovery().sendCustomEvent(msg);
+
+            return;
+        }
+        catch (IgniteCheckedException  e) {
+            if (reconnectOnError(e))
+                onDone(new IgniteNeedReconnectException(cctx.localNode(), e));
+            else
+                onDone(e);
+        }
+    }
+
+    /**
      * @param sndResNodes Additional nodes to send finish message to.
      */
     private void onAllReceived(@Nullable Collection<ClusterNode> sndResNodes) {
@@ -2600,8 +2703,16 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
 
             if (!exchCtx.mergeExchanges() && 
!crd.equals(events().discoveryCache().serverNodes().get(0))) {
                 for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
-                    if (!grp.isLocal())
+                    if (grp.isLocal())
+                        continue;
+
+                    // It is possible affinity is not initialized.
+                    // For example, dynamic cache start failed.
+                    if (grp.affinity().lastVersion().topologyVersion() > 0)
                         grp.topology().beforeExchange(this, !centralizedAff && 
!forceAffReassignment, false);
+                    else
+                        assert exchangeLocE != null :
+                            "Affinity is not calculated for the cache group 
[groupName=" + grp.name() + "]";
                 }
             }
 
@@ -2630,6 +2741,12 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
      */
     private void finishExchangeOnCoordinator(@Nullable Collection<ClusterNode> 
sndResNodes) {
         try {
+            if (!F.isEmpty(exchangeGlobalExceptions) && 
dynamicCacheStartExchange() && isRollbackSupported()) {
+                sendExchangeFailureMessage();
+
+                return;
+            }
+
             AffinityTopologyVersion resTopVer = 
exchCtx.events().topologyVersion();
 
             if (log.isInfoEnabled()) {
@@ -2846,12 +2963,12 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
 
                 boolean stateChangeErr = false;
 
-                if (!F.isEmpty(changeGlobalStateExceptions)) {
+                if (!F.isEmpty(exchangeGlobalExceptions)) {
                     stateChangeErr = true;
 
                     err = new IgniteCheckedException("Cluster state change 
failed.");
 
-                    
cctx.kernalContext().state().onStateChangeError(changeGlobalStateExceptions, 
req);
+                    
cctx.kernalContext().state().onStateChangeError(exchangeGlobalExceptions, req);
                 }
                 else {
                     boolean hasMoving = !partsToReload.isEmpty();
@@ -3128,15 +3245,27 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
             try {
                 assert msg.restoreExchangeId() != null : msg;
 
-                GridDhtPartitionsSingleMessage res = 
cctx.exchange().createPartitionsSingleMessage(
-                    msg.restoreExchangeId(),
-                    cctx.kernalContext().clientNode(),
-                    true,
-                    
node.version().compareToIgnoreTimestamp(PARTIAL_COUNTERS_MAP_SINCE) >= 0,
-                    exchActions);
+                GridDhtPartitionsSingleMessage res;
 
-                if (localJoinExchange() && finishState0 == null)
-                    
res.cacheGroupsAffinityRequest(exchCtx.groupsAffinityRequestOnJoin());
+                if (dynamicCacheStartExchange() && exchangeLocE != null) {
+                    res = new 
GridDhtPartitionsSingleMessage(msg.restoreExchangeId(),
+                        cctx.kernalContext().clientNode(),
+                        cctx.versions().last(),
+                        true);
+
+                    res.setError(exchangeLocE);
+                }
+                else {
+                    res = cctx.exchange().createPartitionsSingleMessage(
+                        msg.restoreExchangeId(),
+                        cctx.kernalContext().clientNode(),
+                        true,
+                        
node.version().compareToIgnoreTimestamp(PARTIAL_COUNTERS_MAP_SINCE) >= 0,
+                        exchActions);
+
+                    if (localJoinExchange() && finishState0 == null)
+                        
res.cacheGroupsAffinityRequest(exchCtx.groupsAffinityRequestOnJoin());
+                }
 
                 res.restoreState(true);
 
@@ -3298,6 +3427,21 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
             else if (forceAffReassignment)
                 cctx.affinity().applyAffinityFromFullMessage(this, msg);
 
+            if (dynamicCacheStartExchange() && 
!F.isEmpty(exchangeGlobalExceptions)) {
+                assert cctx.localNode().isClient();
+
+                // TODO: https://issues.apache.org/jira/browse/IGNITE-8796
+                // The current exchange has been successfully completed on all 
server nodes,
+                // but has failed on that client node for some reason.
+                // It looks like that we need to rollback dynamically started 
caches on the client node,
+                // complete DynamicCacheStartFutures (if they are registered) 
with the cause of that failure
+                // and complete current exchange without errors.
+
+                onDone(exchangeLocE);
+
+                return;
+            }
+
             updatePartitionFullMap(resTopVer, msg);
 
             IgniteCheckedException err = null;
@@ -3385,6 +3529,57 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
     }
 
     /**
+     * Cache change failure message callback, processed from the discovery 
thread.
+     *
+     * @param node Message sender node.
+     * @param msg Failure message.
+     */
+    public void onDynamicCacheChangeFail(final ClusterNode node, final 
DynamicCacheChangeFailureMessage msg) {
+        assert exchId.equals(msg.exchangeId()) : msg;
+        assert firstDiscoEvt.type() == EVT_DISCOVERY_CUSTOM_EVT && 
dynamicCacheStartExchange();
+
+        final ExchangeActions actions = exchangeActions();
+
+        onDiscoveryEvent(new IgniteRunnable() {
+            @Override public void run() {
+                // The rollbackExchange() method has to wait for checkpoint.
+                // That operation is time consumed, and therefore it should be 
executed outside the discovery thread.
+                cctx.kernalContext().getSystemExecutorService().submit(new 
Runnable() {
+                    @Override public void run() {
+                        if (isDone() || !enterBusy())
+                            return;
+
+                        try {
+                            assert msg.error() != null: msg;
+
+                            // Try to revert all the changes that were done 
during initialization phase
+                            
cctx.affinity().forceCloseCaches(GridDhtPartitionsExchangeFuture.this,
+                                crd.isLocal(), msg.exchangeActions());
+
+                            synchronized (mux) {
+                                finishState = new FinishState(crd.id(), 
initialVersion(), null);
+
+                                state = ExchangeLocalState.DONE;
+                            }
+
+                            if (actions != null)
+                                actions.completeRequestFutures(cctx, 
msg.error());
+
+                            onDone(exchId.topologyVersion());
+                        }
+                        catch (Throwable e) {
+                            onDone(e);
+                        }
+                        finally {
+                            leaveBusy();
+                        }
+                    }
+                });
+            }
+        });
+    }
+
+    /**
      * Affinity change message callback, processed from the same thread as 
{@link #onNodeLeft}.
      *
      * @param node Message sender node.
@@ -3597,8 +3792,8 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
                         }
 
                         if (crd0.isLocal()) {
-                            if (stateChangeExchange() && changeGlobalStateE != 
null)
-                                changeGlobalStateExceptions.put(crd0.id(), 
changeGlobalStateE);
+                            if (stateChangeExchange() && exchangeLocE != null)
+                                exchangeGlobalExceptions.put(crd0.id(), 
exchangeLocE);
 
                             if (crdChanged) {
                                 if (log.isInfoEnabled()) {
@@ -3773,6 +3968,9 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
                     if (!msg.client()) {
                         msgs.put(e.getKey().id(), e.getValue());
 
+                        if (dynamicCacheStartExchange() && msg.getError() != 
null)
+                            exchangeGlobalExceptions.put(e.getKey().id(), 
msg.getError());
+
                         updatePartitionSingleMap(e.getKey().id(), msg);
                     }
                 }
@@ -4041,7 +4239,7 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
         CLIENT,
 
         /**
-         * Previous coordinator failed before echange finished and
+         * Previous coordinator failed before exchange finished and
          * local performs initialization to become new coordinator.
          */
         BECOME_CRD,

Reply via email to