IGNITE-1094 Fixed hanging during cache initialization

Signed-off-by: Andrey Gura <ag...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a393e696 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a393e696 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a393e696 Branch: refs/heads/ignite-8783 Commit: a393e696212ef0dd4f23f923bf1001e0a48db915 Parents: 84a7b59 Author: Slava Koptilin <slava.kopti...@gmail.com> Authored: Mon Jul 16 16:40:56 2018 +0300 Committer: Andrey Gura <ag...@apache.org> Committed: Mon Jul 16 16:40:56 2018 +0300 ---------------------------------------------------------------------- .../apache/ignite/internal/IgniteKernal.java | 5 + .../ignite/internal/IgniteNodeAttributes.java | 3 + .../internal/managers/discovery/DiscoCache.java | 20 + .../cache/CacheAffinitySharedManager.java | 148 +++- .../processors/cache/ClusterCachesInfo.java | 25 + .../cache/DynamicCacheChangeFailureMessage.java | 151 ++++ .../processors/cache/ExchangeActions.java | 19 +- .../GridCachePartitionExchangeManager.java | 8 + .../processors/cache/GridCacheProcessor.java | 169 ++-- .../GridDhtPartitionsExchangeFuture.java | 282 ++++++- ...IgniteAbstractDynamicCacheStartFailTest.java | 775 +++++++++++++++++++ ...ynamicCacheStartCoordinatorFailoverTest.java | 262 +++++++ .../cache/IgniteDynamicCacheStartFailTest.java | 46 ++ ...ynamicCacheStartFailWithPersistenceTest.java | 91 +++ .../testsuites/IgniteCacheTestSuite4.java | 6 + ...eQueryAfterDynamicCacheStartFailureTest.java | 69 ++ .../IgniteCacheWithIndexingTestSuite.java | 3 + 17 files changed, 1921 insertions(+), 161 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/a393e696/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java 
b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java index 4687114..4c8fa9b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java @@ -239,6 +239,7 @@ import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_DAEMON; import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_DATA_STORAGE_CONFIG; import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_DATA_STREAMER_POOL_SIZE; import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_DEPLOYMENT_MODE; +import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_DYNAMIC_CACHE_START_ROLLBACK_SUPPORTED; import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_IGNITE_INSTANCE_NAME; import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_IPS; import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_JIT_NAME; @@ -1674,6 +1675,10 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { if (cfg.getConnectorConfiguration() != null) add(ATTR_REST_PORT_RANGE, cfg.getConnectorConfiguration().getPortRange()); + // Whether rollback of dynamic cache start is supported or not. + // This property is added because of backward compatibility. + add(ATTR_DYNAMIC_CACHE_START_ROLLBACK_SUPPORTED, Boolean.TRUE); + // Save data storage configuration. 
addDataStorageConfigurationAttributes(); http://git-wip-us.apache.org/repos/asf/ignite/blob/a393e696/modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java index 663a6f9..ed16a77 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java @@ -202,6 +202,9 @@ public final class IgniteNodeAttributes { /** Rebalance thread pool size. */ public static final String ATTR_REBALANCE_POOL_SIZE = ATTR_PREFIX + ".rebalance.pool.size"; + /** Internal attribute name constant. */ + public static final String ATTR_DYNAMIC_CACHE_START_ROLLBACK_SUPPORTED = ATTR_PREFIX + ".dynamic.cache.start.rollback.supported"; + /** * Enforces singleton. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/a393e696/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java index 73f6d23..8cdcbf3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java @@ -424,6 +424,26 @@ public class DiscoCache { } /** + * + * Returns {@code True} if all nodes has the given attribute and its value equals to {@code expVal}. + * + * @param <T> Attribute Type. + * @param name Attribute name. + * @param expVal Expected value. 
+ * @return {@code True} if all the given nodes has the given attribute and its value equals to {@code expVal}. + */ + public <T> boolean checkAttribute(String name, T expVal) { + for (ClusterNode node : allNodes) { + T attr = node.attribute(name); + + if (attr == null || !expVal.equals(attr)) + return false; + } + + return true; + } + + /** * @param nodes Cluster nodes. * @return Empty collection if nodes list is {@code null} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/a393e696/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java index 08eb43f..2871e82 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java @@ -731,6 +731,28 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap } /** + * Called during the rollback of the exchange partitions procedure + * in order to stop the given cache even if it's not fully initialized (e.g. failed on cache init stage). + * + * @param fut Exchange future. + * @param crd Coordinator flag. + * @param exchActions Cache change requests. 
+ */ + public void forceCloseCaches( + GridDhtPartitionsExchangeFuture fut, + boolean crd, + final ExchangeActions exchActions + ) { + assert exchActions != null && !exchActions.empty() && exchActions.cacheStartRequests().isEmpty(): exchActions; + + caches.updateCachesInfo(exchActions); + + processCacheStopRequests(fut, crd, exchActions, true); + + cctx.cache().forceCloseCaches(exchActions); + } + + /** * Called on exchange initiated for cache start/stop request. * * @param fut Exchange future. @@ -745,13 +767,70 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap ) throws IgniteCheckedException { assert exchActions != null && !exchActions.empty() : exchActions; - final ExchangeDiscoveryEvents evts = fut.context().events(); - caches.updateCachesInfo(exchActions); // Affinity did not change for existing caches. onCustomMessageNoAffinityChange(fut, crd, exchActions); + processCacheStartRequests(fut, crd, exchActions); + + Set<Integer> stoppedGrps = processCacheStopRequests(fut, crd, exchActions, false); + + if (stoppedGrps != null) { + AffinityTopologyVersion notifyTopVer = null; + + synchronized (mux) { + if (waitInfo != null) { + for (Integer grpId : stoppedGrps) { + boolean rmv = waitInfo.waitGrps.remove(grpId) != null; + + if (rmv) { + notifyTopVer = waitInfo.topVer; + + waitInfo.assignments.remove(grpId); + } + } + } + } + + if (notifyTopVer != null) { + final AffinityTopologyVersion topVer = notifyTopVer; + + cctx.kernalContext().closure().runLocalSafe(new Runnable() { + @Override public void run() { + onCacheGroupStopped(topVer); + } + }); + } + } + + ClientCacheChangeDiscoveryMessage msg = clientCacheChanges.get(); + + if (msg != null) { + msg.checkCachesExist(caches.registeredCaches.keySet()); + + if (msg.empty()) + clientCacheChanges.remove(); + } + } + + /** + * Process cache start requests. + * + * @param fut Exchange future. + * @param crd Coordinator flag. + * @param exchActions Cache change requests. 
+ * @throws IgniteCheckedException If failed. + */ + private void processCacheStartRequests( + GridDhtPartitionsExchangeFuture fut, + boolean crd, + final ExchangeActions exchActions + ) throws IgniteCheckedException { + assert exchActions != null && !exchActions.empty() : exchActions; + + final ExchangeDiscoveryEvents evts = fut.context().events(); + for (ExchangeActions.CacheActionData action : exchActions.cacheStartRequests()) { DynamicCacheDescriptor cacheDesc = action.descriptor(); @@ -830,6 +909,24 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap } } } + } + + /** + * Process cache stop requests. + * + * @param fut Exchange future. + * @param crd Coordinator flag. + * @param exchActions Cache change requests. + * @param forceClose + * @return Set of cache groups to be stopped. + */ + private Set<Integer> processCacheStopRequests( + GridDhtPartitionsExchangeFuture fut, + boolean crd, + final ExchangeActions exchActions, + boolean forceClose + ) { + assert exchActions != null && !exchActions.empty() : exchActions; for (ExchangeActions.CacheActionData action : exchActions.cacheStopRequests()) cctx.cache().blockGateway(action.request().cacheName(), true, action.request().restart()); @@ -844,54 +941,21 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap if (data.descriptor().config().getCacheMode() != LOCAL) { CacheGroupHolder cacheGrp = grpHolders.remove(data.descriptor().groupId()); - assert cacheGrp != null : data.descriptor(); - - if (stoppedGrps == null) - stoppedGrps = new HashSet<>(); - - stoppedGrps.add(cacheGrp.groupId()); - - cctx.io().removeHandler(true, cacheGrp.groupId(), GridDhtAffinityAssignmentResponse.class); - } - } - } - - if (stoppedGrps != null) { - AffinityTopologyVersion notifyTopVer = null; + assert cacheGrp != null || forceClose : data.descriptor(); - synchronized (mux) { - if (waitInfo != null) { - for (Integer grpId : stoppedGrps) { - boolean rmv = 
waitInfo.waitGrps.remove(grpId) != null; + if (cacheGrp != null) { + if (stoppedGrps == null) + stoppedGrps = new HashSet<>(); - if (rmv) { - notifyTopVer = waitInfo.topVer; + stoppedGrps.add(cacheGrp.groupId()); - waitInfo.assignments.remove(grpId); - } + cctx.io().removeHandler(true, cacheGrp.groupId(), GridDhtAffinityAssignmentResponse.class); } } } - - if (notifyTopVer != null) { - final AffinityTopologyVersion topVer = notifyTopVer; - - cctx.kernalContext().closure().runLocalSafe(new Runnable() { - @Override public void run() { - onCacheGroupStopped(topVer); - } - }); - } } - ClientCacheChangeDiscoveryMessage msg = clientCacheChanges.get(); - - if (msg != null) { - msg.checkCachesExist(caches.registeredCaches.keySet()); - - if (msg.empty()) - clientCacheChanges.remove(); - } + return stoppedGrps; } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/a393e696/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java index 94f8a27..3aaf7f3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java @@ -396,6 +396,31 @@ class ClusterCachesInfo { } /** + * Creates exchanges actions. Forms a list of caches and cache groups to be stopped + * due to dynamic cache start failure. + * + * @param failMsg Dynamic change request fail message. + * @param topVer Topology version. 
+ */ + public void onCacheChangeRequested(DynamicCacheChangeFailureMessage failMsg, AffinityTopologyVersion topVer) { + ExchangeActions exchangeActions = new ExchangeActions(); + + List<DynamicCacheChangeRequest> requests = new ArrayList<>(failMsg.cacheNames().size()); + + for (String cacheName : failMsg.cacheNames()) { + DynamicCacheDescriptor cacheDescr = registeredCaches.get(cacheName); + + assert cacheDescr != null : "Dynamic cache descriptor is missing [cacheName=" + cacheName + "]"; + + requests.add(DynamicCacheChangeRequest.stopRequest(ctx, cacheName, cacheDescr.sql(), true)); + } + + processCacheChangeRequests(exchangeActions, requests, topVer,false); + + failMsg.exchangeActions(exchangeActions); + } + + /** * @param batch Cache change request. * @param topVer Topology version. * @return {@code True} if minor topology version should be increased. http://git-wip-us.apache.org/repos/asf/ignite/blob/a393e696/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeFailureMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeFailureMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeFailureMessage.java new file mode 100644 index 0000000..d0cb08d --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeFailureMessage.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import java.util.Collection; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.internal.managers.discovery.DiscoCache; +import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; +import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId; +import org.apache.ignite.internal.util.tostring.GridToStringInclude; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.lang.IgniteUuid; +import org.jetbrains.annotations.Nullable; + +/** + * This class represents discovery message that is used to provide information about dynamic cache start failure. + */ +public class DynamicCacheChangeFailureMessage implements DiscoveryCustomMessage { + /** */ + private static final long serialVersionUID = 0L; + + /** Cache names. */ + @GridToStringInclude + private Collection<String> cacheNames; + + /** Custom message ID. */ + private IgniteUuid id; + + /** */ + private GridDhtPartitionExchangeId exchId; + + /** */ + @GridToStringInclude + private IgniteCheckedException cause; + + /** Cache updates to be executed on exchange. 
*/ + private transient ExchangeActions exchangeActions; + + /** + * Creates new DynamicCacheChangeFailureMessage instance. + * + * @param locNode Local node. + * @param exchId Exchange Id. + * @param cause Cache start error. + * @param cacheNames Cache names. + */ + public DynamicCacheChangeFailureMessage( + ClusterNode locNode, + GridDhtPartitionExchangeId exchId, + IgniteCheckedException cause, + Collection<String> cacheNames) + { + assert exchId != null; + assert cause != null; + assert !F.isEmpty(cacheNames) : cacheNames; + + this.id = IgniteUuid.fromUuid(locNode.id()); + this.exchId = exchId; + this.cause = cause; + this.cacheNames = cacheNames; + } + + /** {@inheritDoc} */ + @Override public IgniteUuid id() { + return id; + } + + /** + * @return Collection of failed caches. + */ + public Collection<String> cacheNames() { + return cacheNames; + } + + /** + * @return Cache start error. + */ + public IgniteCheckedException error() { + return cause; + } + + /** + * @return Cache updates to be executed on exchange. + */ + public ExchangeActions exchangeActions() { + return exchangeActions; + } + + /** + * @param exchangeActions Cache updates to be executed on exchange. + */ + public void exchangeActions(ExchangeActions exchangeActions) { + assert exchangeActions != null && !exchangeActions.empty() : exchangeActions; + + this.exchangeActions = exchangeActions; + } + + /** + * @return Exchange version. 
+ */ + @Nullable public GridDhtPartitionExchangeId exchangeId() { + return exchId; + } + + /** {@inheritDoc} */ + @Nullable @Override public DiscoveryCustomMessage ackMessage() { + return null; + } + + /** {@inheritDoc} */ + @Override public boolean isMutable() { + return false; + } + + /** {@inheritDoc} */ + @Override public boolean stopProcess() { + return false; + } + + /** {@inheritDoc} */ + @Override public DiscoCache createDiscoCache( + GridDiscoveryManager mgr, + AffinityTopologyVersion topVer, + DiscoCache discoCache) { + return mgr.createDiscoCacheOnCacheChange(topVer, discoCache); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(DynamicCacheChangeFailureMessage.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/a393e696/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java index c289b6e..6431d0f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java @@ -99,17 +99,18 @@ public class ExchangeActions { /** * @return Stop cache requests. */ - Collection<CacheActionData> cacheStopRequests() { + public Collection<CacheActionData> cacheStopRequests() { return cachesToStop != null ? cachesToStop.values() : Collections.<CacheActionData>emptyList(); } /** * @param ctx Context. + * @param err Error if any. 
*/ - public void completeRequestFutures(GridCacheSharedContext ctx) { - completeRequestFutures(cachesToStart, ctx); - completeRequestFutures(cachesToStop, ctx); - completeRequestFutures(cachesToResetLostParts, ctx); + public void completeRequestFutures(GridCacheSharedContext ctx, Throwable err) { + completeRequestFutures(cachesToStart, ctx, err); + completeRequestFutures(cachesToStop, ctx, err); + completeRequestFutures(cachesToResetLostParts, ctx, err); } /** @@ -130,10 +131,14 @@ public class ExchangeActions { * @param map Actions map. * @param ctx Context. */ - private void completeRequestFutures(Map<String, CacheActionData> map, GridCacheSharedContext ctx) { + private void completeRequestFutures( + Map<String, CacheActionData> map, + GridCacheSharedContext ctx, + @Nullable Throwable err + ) { if (map != null) { for (CacheActionData req : map.values()) - ctx.cache().completeCacheStartFuture(req.req, true, null); + ctx.cache().completeCacheStartFuture(req.req, (err == null), err); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/a393e696/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index 2bdda19..d3fddab 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -494,6 +494,14 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana exchangeFuture(msg.exchangeId(), null, null, null, null) .onAffinityChangeMessage(evt.eventNode(), msg); } + else if (customMsg instanceof 
DynamicCacheChangeFailureMessage) { + DynamicCacheChangeFailureMessage msg = (DynamicCacheChangeFailureMessage) customMsg; + + if (msg.exchangeId().topologyVersion().topologyVersion() >= + affinityTopologyVersion(cctx.discovery().localJoinEvent()).topologyVersion()) + exchangeFuture(msg.exchangeId(), null, null, null, null) + .onDynamicCacheChangeFail(evt.eventNode(), msg); + } else if (customMsg instanceof SnapshotDiscoveryMessage && ((SnapshotDiscoveryMessage) customMsg).needExchange()) { exchId = exchangeId(n.id(), affinityTopologyVersion(evt), evt); http://git-wip-us.apache.org/repos/asf/ignite/blob/a393e696/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index 26f1887..ae4fee4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -2076,7 +2076,6 @@ public class GridCacheProcessor extends GridProcessorAdapter { } else proxy.closeProxy(); - } } @@ -2116,6 +2115,8 @@ public class GridCacheProcessor extends GridProcessorAdapter { * @return Stopped cache context. 
*/ private GridCacheContext<?, ?> prepareCacheStop(String cacheName, boolean destroy) { + assert sharedCtx.database().checkpointLockIsHeldByThread(); + GridCacheAdapter<?, ?> cache = caches.remove(cacheName); if (cache != null) { @@ -2221,7 +2222,14 @@ public class GridCacheProcessor extends GridProcessorAdapter { cctx.gate().onStopped(); - prepareCacheStop(cctx.name(), destroy); + sharedCtx.database().checkpointReadLock(); + + try { + prepareCacheStop(cctx.name(), destroy); + } + finally { + sharedCtx.database().checkpointReadUnlock(); + } if (!cctx.group().hasCaches()) stopCacheGroup(cctx.group().groupId()); @@ -2229,101 +2237,119 @@ public class GridCacheProcessor extends GridProcessorAdapter { } /** - * Callback invoked when first exchange future for dynamic cache is completed. + * Called during the rollback of the exchange partitions procedure + * in order to stop the given cache even if it's not fully initialized (e.g. failed on cache init stage). * - * @param cacheStartVer Started caches version to create proxy for. - * @param exchActions Change requests. - * @param err Error. + * @param exchActions Stop requests. 
*/ - @SuppressWarnings("unchecked") - public void onExchangeDone( - AffinityTopologyVersion cacheStartVer, - @Nullable ExchangeActions exchActions, - @Nullable Throwable err - ) { - initCacheProxies(cacheStartVer, err); - - if (exchActions == null) - return; - - if (exchActions.systemCachesStarting() && exchActions.stateChangeRequest() == null) { - ctx.dataStructures().restoreStructuresState(ctx); + void forceCloseCaches(ExchangeActions exchActions) { + assert exchActions != null && !exchActions.cacheStopRequests().isEmpty(); - ctx.service().updateUtilityCache(); - } + processCacheStopRequestOnExchangeDone(exchActions); + } - if (err == null) { - // Force checkpoint if there is any cache stop request - if (exchActions.cacheStopRequests().size() > 0) { - try { - sharedCtx.database().waitForCheckpoint("caches stop"); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to wait for checkpoint finish during cache stop.", e); - } + /** + * @param exchActions Change requests. + */ + private void processCacheStopRequestOnExchangeDone(ExchangeActions exchActions) { + // Force checkpoint if there is any cache stop request + if (exchActions.cacheStopRequests().size() > 0) { + try { + sharedCtx.database().waitForCheckpoint("caches stop"); } + catch (IgniteCheckedException e) { + U.error(log, "Failed to wait for checkpoint finish during cache stop.", e); + } + } - for (ExchangeActions.CacheActionData action : exchActions.cacheStopRequests()) { - CacheGroupContext gctx = cacheGrps.get(action.descriptor().groupId()); - - // Cancel all operations blocking gateway - if (gctx != null) { - final String msg = "Failed to wait for topology update, cache group is stopping."; - - // If snapshot operation in progress we must throw CacheStoppedException - // for correct cache proxy restart. 
For more details see - // IgniteCacheProxy.cacheException() - gctx.affinity().cancelFutures(new CacheStoppedException(msg)); - } - - stopGateway(action.request()); + for (ExchangeActions.CacheActionData action : exchActions.cacheStopRequests()) { + CacheGroupContext gctx = cacheGrps.get(action.descriptor().groupId()); - sharedCtx.database().checkpointReadLock(); + // Cancel all operations blocking gateway + if (gctx != null) { + final String msg = "Failed to wait for topology update, cache group is stopping."; - try { - prepareCacheStop(action.request().cacheName(), action.request().destroy()); - } - finally { - sharedCtx.database().checkpointReadUnlock(); - } + // If snapshot operation in progress we must throw CacheStoppedException + // for correct cache proxy restart. For more details see + // IgniteCacheProxy.cacheException() + gctx.affinity().cancelFutures(new CacheStoppedException(msg)); } + stopGateway(action.request()); + sharedCtx.database().checkpointReadLock(); try { - // Do not invoke checkpoint listeners for groups are going to be destroyed to prevent metadata corruption. - for (ExchangeActions.CacheGroupActionData action : exchActions.cacheGroupsToStop()) { - Integer groupId = action.descriptor().groupId(); - CacheGroupContext grp = cacheGrps.get(groupId); - - if (grp != null && grp.persistenceEnabled() && sharedCtx.database() instanceof GridCacheDatabaseSharedManager) { - GridCacheDatabaseSharedManager mngr = (GridCacheDatabaseSharedManager) sharedCtx.database(); - mngr.removeCheckpointListener((DbCheckpointListener) grp.offheap()); - } - } + prepareCacheStop(action.request().cacheName(), action.request().destroy()); } finally { sharedCtx.database().checkpointReadUnlock(); } + } - List<IgniteBiTuple<CacheGroupContext, Boolean>> stoppedGroups = new ArrayList<>(); + sharedCtx.database().checkpointReadLock(); + try { + // Do not invoke checkpoint listeners for groups are going to be destroyed to prevent metadata corruption. 
for (ExchangeActions.CacheGroupActionData action : exchActions.cacheGroupsToStop()) { Integer groupId = action.descriptor().groupId(); + CacheGroupContext grp = cacheGrps.get(groupId); - if (cacheGrps.containsKey(groupId)) { - stoppedGroups.add(F.t(cacheGrps.get(groupId), action.destroy())); - - stopCacheGroup(groupId); + if (grp != null && grp.persistenceEnabled() && sharedCtx.database() instanceof GridCacheDatabaseSharedManager) { + GridCacheDatabaseSharedManager mngr = (GridCacheDatabaseSharedManager) sharedCtx.database(); + mngr.removeCheckpointListener((DbCheckpointListener) grp.offheap()); } } + } + finally { + sharedCtx.database().checkpointReadUnlock(); + } + + List<IgniteBiTuple<CacheGroupContext, Boolean>> stoppedGroups = new ArrayList<>(); + + for (ExchangeActions.CacheGroupActionData action : exchActions.cacheGroupsToStop()) { + Integer groupId = action.descriptor().groupId(); + + if (cacheGrps.containsKey(groupId)) { + stoppedGroups.add(F.t(cacheGrps.get(groupId), action.destroy())); + + stopCacheGroup(groupId); + } + } + + if (!sharedCtx.kernalContext().clientNode()) + sharedCtx.database().onCacheGroupsStopped(stoppedGroups); + + if (exchActions.deactivate()) + sharedCtx.deactivate(); + } + + /** + * Callback invoked when first exchange future for dynamic cache is completed. + * + * @param cacheStartVer Started caches version to create proxy for. + * @param exchActions Change requests. + * @param err Error. 
+ */ + @SuppressWarnings("unchecked") + public void onExchangeDone( + AffinityTopologyVersion cacheStartVer, + @Nullable ExchangeActions exchActions, + @Nullable Throwable err + ) { + initCacheProxies(cacheStartVer, err); - if (!sharedCtx.kernalContext().clientNode()) - sharedCtx.database().onCacheGroupsStopped(stoppedGroups); + if (exchActions == null) + return; - if (exchActions.deactivate()) - sharedCtx.deactivate(); + if (exchActions.systemCachesStarting() && exchActions.stateChangeRequest() == null) { + ctx.dataStructures().restoreStructuresState(ctx); + + ctx.service().updateUtilityCache(); } + + if (err == null) + processCacheStopRequestOnExchangeDone(exchActions); } /** @@ -3481,6 +3507,9 @@ public class GridCacheProcessor extends GridProcessorAdapter { if (msg instanceof DynamicCacheChangeBatch) return cachesInfo.onCacheChangeRequested((DynamicCacheChangeBatch)msg, topVer); + if (msg instanceof DynamicCacheChangeFailureMessage) + cachesInfo.onCacheChangeRequested((DynamicCacheChangeFailureMessage) msg, topVer); + if (msg instanceof ClientCacheChangeDiscoveryMessage) cachesInfo.onClientCacheChange((ClientCacheChangeDiscoveryMessage)msg, node); http://git-wip-us.apache.org/repos/asf/ignite/blob/a393e696/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index 9f08b43..d44856f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -43,8 +43,8 @@ import org.apache.ignite.IgniteSystemProperties; import org.apache.ignite.cache.CacheMode; import org.apache.ignite.cache.CacheRebalanceMode; import org.apache.ignite.cluster.ClusterNode; -import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.configuration.NearCacheConfiguration; import org.apache.ignite.events.DiscoveryEvent; import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException; @@ -67,6 +67,7 @@ import org.apache.ignite.internal.processors.cache.CacheGroupContext; import org.apache.ignite.internal.processors.cache.CacheGroupDescriptor; import org.apache.ignite.internal.processors.cache.CachePartitionExchangeWorkerTask; import org.apache.ignite.internal.processors.cache.DynamicCacheChangeBatch; +import org.apache.ignite.internal.processors.cache.DynamicCacheChangeFailureMessage; import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor; import org.apache.ignite.internal.processors.cache.ExchangeActions; import org.apache.ignite.internal.processors.cache.ExchangeContext; @@ -79,13 +80,13 @@ import org.apache.ignite.internal.processors.cache.GridCacheUtils; import org.apache.ignite.internal.processors.cache.LocalJoinCachesContext; import org.apache.ignite.internal.processors.cache.StateChangeRequest; import org.apache.ignite.internal.processors.cache.WalStateAbstractMessage; -import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.latch.Latch; import org.apache.ignite.internal.processors.cache.distributed.dht.GridClientPartitionTopology; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition; import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionsStateValidator; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFutureAdapter; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.latch.Latch; import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotDiscoveryMessage; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; @@ -117,6 +118,7 @@ import static org.apache.ignite.IgniteSystemProperties.getLong; import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; import static org.apache.ignite.events.EventType.EVT_NODE_JOINED; import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; +import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_DYNAMIC_CACHE_START_ROLLBACK_SUPPORTED; import static org.apache.ignite.internal.events.DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT; import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL; import static org.apache.ignite.internal.processors.cache.ExchangeDiscoveryEvents.serverJoinEvent; @@ -147,6 +149,9 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte private static final boolean SKIP_PARTITION_SIZE_VALIDATION = Boolean.getBoolean(IgniteSystemProperties.IGNITE_SKIP_PARTITION_SIZE_VALIDATION); /** */ + private static final String DISTRIBUTED_LATCH_ID = "exchange"; + + /** */ @GridToStringExclude private final Object mux = new Object(); @@ -253,11 +258,14 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte */ private boolean forceAffReassignment; - /** Change global state exception. 
*/ - private Exception changeGlobalStateE; + /** Exception that was thrown during init phase on local node. */ + private Exception exchangeLocE; + + /** Exchange exceptions from all participating nodes. */ + private final Map<UUID, Exception> exchangeGlobalExceptions = new ConcurrentHashMap<>(); - /** Change global state exceptions. */ - private final Map<UUID, Exception> changeGlobalStateExceptions = new ConcurrentHashMap<>(); + /** Used to track the fact that {@link DynamicCacheChangeFailureMessage} was sent. */ + private volatile boolean cacheChangeFailureMsgSent; /** */ private ConcurrentMap<UUID, GridDhtPartitionsSingleMessage> msgs = new ConcurrentHashMap<>(); @@ -508,6 +516,15 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte } /** + * @return {@code True} if this exchange was triggered by DynamicCacheChangeBatch message + * in order to start cache(s). + */ + private boolean dynamicCacheStartExchange() { + return exchActions != null && !exchActions.cacheStartRequests().isEmpty() + && exchActions.cacheStopRequests().isEmpty(); + } + + /** * @return {@code True} if activate cluster exchange. 
*/ public boolean activateCluster() { @@ -773,7 +790,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte if (reconnectOnError(e)) onDone(new IgniteNeedReconnectException(cctx.localNode(), e)); else { - U.error(log, "Failed to reinitialize local partitions (preloading will be stopped): " + exchId, e); + U.error(log, "Failed to reinitialize local partitions (rebalancing will be stopped): " + exchId, e); onDone(e); } @@ -927,7 +944,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte DiscoveryDataClusterState state = cctx.kernalContext().state().clusterState(); if (state.transitionError() != null) - changeGlobalStateE = state.transitionError(); + exchangeLocE = state.transitionError(); if (req.activeChanged()) { if (req.activate()) { @@ -967,11 +984,11 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte ", client=" + cctx.kernalContext().clientNode() + ", topVer=" + initialVersion() + "]", e); - changeGlobalStateE = e; + exchangeLocE = e; if (crd) { synchronized (mux) { - changeGlobalStateExceptions.put(cctx.localNodeId(), e); + exchangeGlobalExceptions.put(cctx.localNodeId(), e); } } } @@ -1002,7 +1019,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte ", client=" + cctx.kernalContext().clientNode() + ", topVer=" + initialVersion() + "]", e); - changeGlobalStateE = e; + exchangeLocE = e; } } } @@ -1027,7 +1044,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte ", client=" + cctx.kernalContext().clientNode() + ", topVer=" + initialVersion() + "]", e); - changeGlobalStateE = e; + exchangeLocE = e; } } @@ -1044,7 +1061,21 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte assert !exchActions.clientOnlyExchange() : exchActions; - cctx.affinity().onCacheChangeRequest(this, crd, exchActions); + try { + cctx.affinity().onCacheChangeRequest(this, crd, exchActions); + } + catch 
(Exception e) { + if (reconnectOnError(e) || !isRollbackSupported()) + // This exception will be handled by init() method. + throw e; + + U.error(log, "Failed to initialize cache(s) (will try to rollback). " + exchId, e); + + exchangeLocE = new IgniteCheckedException( + "Failed to initialize exchange locally [locNodeId=" + cctx.localNodeId() + "]", e); + + exchangeGlobalExceptions.put(cctx.localNodeId(), exchangeLocE); + } return cctx.kernalContext().clientNode() ? ExchangeType.CLIENT : ExchangeType.ALL; } @@ -1294,8 +1325,9 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte private void waitPartitionRelease(boolean distributed) throws IgniteCheckedException { Latch releaseLatch = null; + // Wait for other nodes only on first phase. if (distributed) - releaseLatch = cctx.exchange().latch().getOrCreate("exchange", initialVersion()); + releaseLatch = cctx.exchange().latch().getOrCreate(DISTRIBUTED_LATCH_ID, initialVersion()); IgniteInternalFuture<?> partReleaseFut = cctx.partitionReleaseFuture(initialVersion()); @@ -1505,9 +1537,9 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte resetLostPartitions(caches); } - if (cctx.kernalContext().clientNode()) { + if (cctx.kernalContext().clientNode() || (dynamicCacheStartExchange() && exchangeLocE != null)) { msg = new GridDhtPartitionsSingleMessage(exchangeId(), - true, + cctx.kernalContext().clientNode(), cctx.versions().last(), true); } @@ -1524,8 +1556,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte msg.partitionHistoryCounters(partHistReserved0); } - if (stateChangeExchange() && changeGlobalStateE != null) - msg.setError(changeGlobalStateE); + if ((stateChangeExchange() || dynamicCacheStartExchange()) && exchangeLocE != null) + msg.setError(exchangeLocE); else if (localJoinExchange()) msg.cacheGroupsAffinityRequest(exchCtx.groupsAffinityRequestOnJoin()); @@ -1558,8 +1590,8 @@ public class 
GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte partHistSuppliers, partsToReload); - if (stateChangeExchange() && !F.isEmpty(changeGlobalStateExceptions)) - m.setErrorsMap(changeGlobalStateExceptions); + if (stateChangeExchange() && !F.isEmpty(exchangeGlobalExceptions)) + m.setErrorsMap(exchangeGlobalExceptions); return m; } @@ -1758,7 +1790,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte cctx.kernalContext().authentication().onActivate(); if (exchActions != null && err == null) - exchActions.completeRequestFutures(cctx); + exchActions.completeRequestFutures(cctx, null); if (stateChangeExchange() && err == null) cctx.kernalContext().state().onStateChangeExchangeDone(exchActions.stateChangeRequest()); @@ -1874,15 +1906,15 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte pendingSingleMsgs.clear(); fullMsgs.clear(); msgs.clear(); - changeGlobalStateExceptions.clear(); crd = null; partReleaseFut = null; - changeGlobalStateE = null; exchActions = null; mergedJoinExchMsgs = null; pendingJoinMsg = null; exchCtx = null; newCrdFut = null; + exchangeLocE = null; + exchangeGlobalExceptions.clear(); } /** @@ -2075,13 +2107,13 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte public void forceClientReconnect(ClusterNode node, GridDhtPartitionsSingleMessage msg) { Exception e = new IgniteNeedReconnectException(node, null); - changeGlobalStateExceptions.put(node.id(), e); + exchangeGlobalExceptions.put(node.id(), e); onDone(null, e); GridDhtPartitionsFullMessage fullMsg = createPartitionsMessage(true, false); - fullMsg.setErrorsMap(changeGlobalStateExceptions); + fullMsg.setErrorsMap(exchangeGlobalExceptions); FinishState finishState0 = new FinishState(cctx.localNodeId(), initialVersion(), @@ -2178,6 +2210,11 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte if (cctx.kernalContext().isStopping()) return; + // 
DynamicCacheChangeFailureMessage was sent. + // Thus, there is no need to create and send GridDhtPartitionsFullMessage. + if (cacheChangeFailureMsgSent) + return; + FinishState finishState0; synchronized (mux) { @@ -2255,8 +2292,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte pendingSingleUpdates++; - if (stateChangeExchange() && msg.getError() != null) - changeGlobalStateExceptions.put(nodeId, msg.getError()); + if ((stateChangeExchange() || dynamicCacheStartExchange()) && msg.getError() != null) + exchangeGlobalExceptions.put(nodeId, msg.getError()); allReceived = remaining.isEmpty(); @@ -2290,7 +2327,10 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte } if (finishState0 != null) { - sendAllPartitionsToNode(finishState0, msg, nodeId); + // DynamicCacheChangeFailureMessage was sent. + // Thus, there is no need to create and send GridDhtPartitionsFullMessage. + if (!cacheChangeFailureMsgSent) + sendAllPartitionsToNode(finishState0, msg, nodeId); return; } @@ -2590,6 +2630,69 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte } /** + * Creates an IgniteCheckedException that is used as root cause of the exchange initialization failure. + * This method aggregates all the exceptions provided from all participating nodes. + * + * @param globalExceptions collection exceptions from all participating nodes. + * @return exception that represents a cause of the exchange initialization failure. + */ + private IgniteCheckedException createExchangeException(Map<UUID, Exception> globalExceptions) { + IgniteCheckedException ex = new IgniteCheckedException("Failed to complete exchange process."); + + for (Map.Entry<UUID, Exception> entry : globalExceptions.entrySet()) + if (ex != entry.getValue()) + ex.addSuppressed(entry.getValue()); + + return ex; + } + + /** + * @return {@code true} if the given {@code discoEvt} supports the rollback procedure. 
+ */ + private boolean isRollbackSupported() { + if (!firstEvtDiscoCache.checkAttribute(ATTR_DYNAMIC_CACHE_START_ROLLBACK_SUPPORTED, Boolean.TRUE)) + return false; + + // Currently the rollback process is supported for dynamically started caches only. + return firstDiscoEvt.type() == EVT_DISCOVERY_CUSTOM_EVT && dynamicCacheStartExchange(); + } + + /** + * Sends {@link DynamicCacheChangeFailureMessage} to all participated nodes + * that represents a cause of exchange failure. + */ + private void sendExchangeFailureMessage() { + assert crd != null && crd.isLocal(); + + try { + IgniteCheckedException err = createExchangeException(exchangeGlobalExceptions); + + List<String> cacheNames = new ArrayList<>(exchActions.cacheStartRequests().size()); + + for (ExchangeActions.CacheActionData actionData : exchActions.cacheStartRequests()) + cacheNames.add(actionData.request().cacheName()); + + DynamicCacheChangeFailureMessage msg = new DynamicCacheChangeFailureMessage( + cctx.localNode(), exchId, err, cacheNames); + + if (log.isDebugEnabled()) + log.debug("Dynamic cache change failed (send message to all participating nodes): " + msg); + + cacheChangeFailureMsgSent = true; + + cctx.discovery().sendCustomEvent(msg); + + return; + } + catch (IgniteCheckedException e) { + if (reconnectOnError(e)) + onDone(new IgniteNeedReconnectException(cctx.localNode(), e)); + else + onDone(e); + } + } + + /** * @param sndResNodes Additional nodes to send finish message to. */ private void onAllReceived(@Nullable Collection<ClusterNode> sndResNodes) { @@ -2600,8 +2703,16 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte if (!exchCtx.mergeExchanges() && !crd.equals(events().discoveryCache().serverNodes().get(0))) { for (CacheGroupContext grp : cctx.cache().cacheGroups()) { - if (!grp.isLocal()) + if (grp.isLocal()) + continue; + + // It is possible affinity is not initialized. + // For example, dynamic cache start failed. 
+ if (grp.affinity().lastVersion().topologyVersion() > 0) grp.topology().beforeExchange(this, !centralizedAff && !forceAffReassignment, false); + else + assert exchangeLocE != null : + "Affinity is not calculated for the cache group [groupName=" + grp.name() + "]"; } } @@ -2630,6 +2741,12 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte */ private void finishExchangeOnCoordinator(@Nullable Collection<ClusterNode> sndResNodes) { try { + if (!F.isEmpty(exchangeGlobalExceptions) && dynamicCacheStartExchange() && isRollbackSupported()) { + sendExchangeFailureMessage(); + + return; + } + AffinityTopologyVersion resTopVer = exchCtx.events().topologyVersion(); if (log.isInfoEnabled()) { @@ -2846,12 +2963,12 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte boolean stateChangeErr = false; - if (!F.isEmpty(changeGlobalStateExceptions)) { + if (!F.isEmpty(exchangeGlobalExceptions)) { stateChangeErr = true; err = new IgniteCheckedException("Cluster state change failed."); - cctx.kernalContext().state().onStateChangeError(changeGlobalStateExceptions, req); + cctx.kernalContext().state().onStateChangeError(exchangeGlobalExceptions, req); } else { boolean hasMoving = !partsToReload.isEmpty(); @@ -3128,15 +3245,27 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte try { assert msg.restoreExchangeId() != null : msg; - GridDhtPartitionsSingleMessage res = cctx.exchange().createPartitionsSingleMessage( - msg.restoreExchangeId(), - cctx.kernalContext().clientNode(), - true, - node.version().compareToIgnoreTimestamp(PARTIAL_COUNTERS_MAP_SINCE) >= 0, - exchActions); + GridDhtPartitionsSingleMessage res; - if (localJoinExchange() && finishState0 == null) - res.cacheGroupsAffinityRequest(exchCtx.groupsAffinityRequestOnJoin()); + if (dynamicCacheStartExchange() && exchangeLocE != null) { + res = new GridDhtPartitionsSingleMessage(msg.restoreExchangeId(), + 
cctx.kernalContext().clientNode(), + cctx.versions().last(), + true); + + res.setError(exchangeLocE); + } + else { + res = cctx.exchange().createPartitionsSingleMessage( + msg.restoreExchangeId(), + cctx.kernalContext().clientNode(), + true, + node.version().compareToIgnoreTimestamp(PARTIAL_COUNTERS_MAP_SINCE) >= 0, + exchActions); + + if (localJoinExchange() && finishState0 == null) + res.cacheGroupsAffinityRequest(exchCtx.groupsAffinityRequestOnJoin()); + } res.restoreState(true); @@ -3298,6 +3427,21 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte else if (forceAffReassignment) cctx.affinity().applyAffinityFromFullMessage(this, msg); + if (dynamicCacheStartExchange() && !F.isEmpty(exchangeGlobalExceptions)) { + assert cctx.localNode().isClient(); + + // TODO: https://issues.apache.org/jira/browse/IGNITE-8796 + // The current exchange has been successfully completed on all server nodes, + // but has failed on that client node for some reason. + // It looks like that we need to rollback dynamically started caches on the client node, + // complete DynamicCacheStartFutures (if they are registered) with the cause of that failure + // and complete current exchange without errors. + + onDone(exchangeLocE); + + return; + } + updatePartitionFullMap(resTopVer, msg); IgniteCheckedException err = null; @@ -3385,6 +3529,57 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte } /** + * Cache change failure message callback, processed from the discovery thread. + * + * @param node Message sender node. + * @param msg Failure message. 
+ */ + public void onDynamicCacheChangeFail(final ClusterNode node, final DynamicCacheChangeFailureMessage msg) { + assert exchId.equals(msg.exchangeId()) : msg; + assert firstDiscoEvt.type() == EVT_DISCOVERY_CUSTOM_EVT && dynamicCacheStartExchange(); + + final ExchangeActions actions = exchangeActions(); + + onDiscoveryEvent(new IgniteRunnable() { + @Override public void run() { + // The rollbackExchange() method has to wait for checkpoint. + // That operation is time consumed, and therefore it should be executed outside the discovery thread. + cctx.kernalContext().getSystemExecutorService().submit(new Runnable() { + @Override public void run() { + if (isDone() || !enterBusy()) + return; + + try { + assert msg.error() != null: msg; + + // Try to revert all the changes that were done during initialization phase + cctx.affinity().forceCloseCaches(GridDhtPartitionsExchangeFuture.this, + crd.isLocal(), msg.exchangeActions()); + + synchronized (mux) { + finishState = new FinishState(crd.id(), initialVersion(), null); + + state = ExchangeLocalState.DONE; + } + + if (actions != null) + actions.completeRequestFutures(cctx, msg.error()); + + onDone(exchId.topologyVersion()); + } + catch (Throwable e) { + onDone(e); + } + finally { + leaveBusy(); + } + } + }); + } + }); + } + + /** * Affinity change message callback, processed from the same thread as {@link #onNodeLeft}. * * @param node Message sender node. 
@@ -3597,8 +3792,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte } if (crd0.isLocal()) { - if (stateChangeExchange() && changeGlobalStateE != null) - changeGlobalStateExceptions.put(crd0.id(), changeGlobalStateE); + if (stateChangeExchange() && exchangeLocE != null) + exchangeGlobalExceptions.put(crd0.id(), exchangeLocE); if (crdChanged) { if (log.isInfoEnabled()) { @@ -3773,6 +3968,9 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte if (!msg.client()) { msgs.put(e.getKey().id(), e.getValue()); + if (dynamicCacheStartExchange() && msg.getError() != null) + exchangeGlobalExceptions.put(e.getKey().id(), msg.getError()); + updatePartitionSingleMap(e.getKey().id(), msg); } } @@ -4041,7 +4239,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte CLIENT, /** - * Previous coordinator failed before echange finished and + * Previous coordinator failed before exchange finished and * local performs initialization to become new coordinator. */ BECOME_CRD,