http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java index d57c720..8cea13f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java @@ -18,49 +18,34 @@ package org.apache.ignite.internal.processors.cluster; import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicReference; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteCompute; -import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.cluster.ClusterNode; -import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.events.DiscoveryEvent; import org.apache.ignite.events.Event; import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.IgniteInternalFuture; -import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.cluster.ClusterGroupAdapter; -import org.apache.ignite.internal.managers.discovery.CustomEventListener; +import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; +import org.apache.ignite.internal.managers.discovery.DiscoCache; import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; -import org.apache.ignite.internal.pagemem.store.IgnitePageStoreManager; import org.apache.ignite.internal.processors.GridProcessorAdapter; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; -import org.apache.ignite.internal.processors.cache.CacheClientReconnectDiscoveryData; -import org.apache.ignite.internal.processors.cache.CacheData; -import org.apache.ignite.internal.processors.cache.CacheJoinNodeDiscoveryData; -import org.apache.ignite.internal.processors.cache.CacheJoinNodeDiscoveryData.CacheInfo; -import org.apache.ignite.internal.processors.cache.CacheNodeCommonDiscoveryData; -import org.apache.ignite.internal.processors.cache.ChangeGlobalStateMessage; -import org.apache.ignite.internal.processors.cache.ClusterState; -import org.apache.ignite.internal.processors.cache.DynamicCacheChangeBatch; -import org.apache.ignite.internal.processors.cache.DynamicCacheChangeRequest; import org.apache.ignite.internal.processors.cache.ExchangeActions; import org.apache.ignite.internal.processors.cache.GridCacheProcessor; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.GridChangeGlobalStateMessageResponse; +import org.apache.ignite.internal.processors.cache.StateChangeRequest; import org.apache.ignite.internal.processors.cache.StoredCacheData; -import org.apache.ignite.internal.processors.query.QuerySchema; import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.tostring.GridToStringExclude; @@ -72,34 +57,27 @@ import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteFuture; import org.apache.ignite.lang.IgniteRunnable; -import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.resources.IgniteInstanceResource; import org.apache.ignite.spi.discovery.DiscoveryDataBag; -import org.apache.ignite.spi.discovery.DiscoveryDataBag.JoiningNodeDiscoveryData; import org.jetbrains.annotations.Nullable; import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; -import static org.apache.ignite.internal.GridComponent.DiscoveryDataExchangeType.CACHE_PROC; import static org.apache.ignite.internal.GridComponent.DiscoveryDataExchangeType.STATE_PROC; import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL; -import static org.apache.ignite.internal.processors.cache.ClusterState.ACTIVE; -import static org.apache.ignite.internal.processors.cache.ClusterState.INACTIVE; -import static org.apache.ignite.internal.processors.cache.ClusterState.TRANSITION; -import static org.apache.ignite.internal.processors.cache.DynamicCacheChangeRequest.stopRequest; /** * */ public class GridClusterStateProcessor extends GridProcessorAdapter { - /** Global status. */ - private volatile ClusterState globalState; - - /** Action context. */ - private volatile ChangeGlobalStateContext lastCgsCtx; + /** */ + private volatile DiscoveryDataClusterState globalState; /** Local action future. */ - private final AtomicReference<GridChangeGlobalStateFuture> cgsLocFut = new AtomicReference<>(); + private final AtomicReference<GridChangeGlobalStateFuture> stateChangeFut = new AtomicReference<>(); + + /** Future initialized if node joins when cluster state change is in progress. */ + private TransitionOnJoinWaitFuture joinFut; /** Process. */ @GridToStringExclude @@ -109,12 +87,6 @@ public class GridClusterStateProcessor extends GridProcessorAdapter { @GridToStringExclude private GridCacheSharedContext<?, ?> sharedCtx; - /** */ - private final ConcurrentHashMap<String, CacheInfo> cacheData = new ConcurrentHashMap<>(); - - /** */ - private volatile CacheJoinNodeDiscoveryData localCacheData; - /** Listener. */ private final GridLocalEventListener lsr = new GridLocalEventListener() { @Override public void onEvent(Event evt) { @@ -124,14 +96,15 @@ public class GridClusterStateProcessor extends GridProcessorAdapter { assert e.type() == EVT_NODE_LEFT || e.type() == EVT_NODE_FAILED : this; - final GridChangeGlobalStateFuture f = cgsLocFut.get(); + final GridChangeGlobalStateFuture f = stateChangeFut.get(); - if (f != null) + if (f != null) { f.initFut.listen(new CI1<IgniteInternalFuture<?>>() { @Override public void apply(IgniteInternalFuture<?> fut) { - f.onDiscoveryEvent(e); + f.onNodeLeft(e); } }); + } } }; @@ -142,531 +115,417 @@ public class GridClusterStateProcessor extends GridProcessorAdapter { super(ctx); } - /** {@inheritDoc} */ - @Override public void start() throws IgniteCheckedException { - // Start first node as inactive if persistent enable. - globalState = ctx.config().isPersistentStoreEnabled() ? INACTIVE : - ctx.config().isActiveOnStart() ? ACTIVE : INACTIVE; - - ctx.discovery().setCustomEventListener( - ChangeGlobalStateMessage.class, new CustomEventListener<ChangeGlobalStateMessage>() { - @Override public void onCustomEvent( - AffinityTopologyVersion topVer, ClusterNode snd, ChangeGlobalStateMessage msg) { - assert topVer != null; - assert snd != null; - assert msg != null; - - boolean activate = msg.activate(); - - ChangeGlobalStateContext actx = lastCgsCtx; - - if (actx != null && globalState == TRANSITION) { - GridChangeGlobalStateFuture f = cgsLocFut.get(); - - if (log.isDebugEnabled()) - log.debug("Concurrent " + prettyStr(activate) + " [id=" + - ctx.localNodeId() + " topVer=" + topVer + " actx=" + actx + ", msg=" + msg + "]"); - - if (f != null && f.requestId.equals(msg.requestId())) - f.onDone(new IgniteCheckedException( - "Concurrent change state, now in progress=" + (activate) - + ", initiatingNodeId=" + actx.initiatingNodeId - + ", you try=" + (prettyStr(activate)) + ", locNodeId=" + ctx.localNodeId() - )); - - msg.concurrentChangeState(); - } - else { - if (log.isInfoEnabled()) - log.info("Create " + prettyStr(activate) + " context [id=" + - ctx.localNodeId() + " topVer=" + topVer + ", reqId=" + - msg.requestId() + ", initiatingNodeId=" + msg.initiatorNodeId() + "]"); - - lastCgsCtx = new ChangeGlobalStateContext( - msg.requestId(), - msg.initiatorNodeId(), - msg.getDynamicCacheChangeBatch(), - msg.activate()); - - globalState = TRANSITION; - } - } - }); - - ctx.event().addLocalEventListener(lsr, EVT_NODE_LEFT, EVT_NODE_FAILED); - } - /** - * @param data Joining node discovery data. + * @return Cluster state to be used on public API. */ - public void cacheProcessorStarted(CacheJoinNodeDiscoveryData data) { - assert data != null; + public boolean publicApiActiveState() { + DiscoveryDataClusterState globalState = this.globalState; - localCacheData = data; + assert globalState != null; - cacheProc = ctx.cache(); - sharedCtx = cacheProc.context(); + if (globalState.transition()) { + Boolean transitionRes = globalState.transitionResult(); - sharedCtx.io().addCacheHandler( - 0, GridChangeGlobalStateMessageResponse.class, - new CI2<UUID, GridChangeGlobalStateMessageResponse>() { - @Override public void apply(UUID nodeId, GridChangeGlobalStateMessageResponse msg) { - processChangeGlobalStateResponse(nodeId, msg); - } - }); + if (transitionRes != null) + return transitionRes; + else + return false; + } + else + return globalState.active(); } /** {@inheritDoc} */ - @Override public void stop(boolean cancel) throws IgniteCheckedException { - super.stop(cancel); - - sharedCtx.io().removeHandler(false, 0, GridChangeGlobalStateMessageResponse.class); - ctx.event().removeLocalEventListener(lsr, EVT_NODE_LEFT, EVT_NODE_FAILED); - - IgniteCheckedException stopErr = new IgniteInterruptedCheckedException( - "Node is stopping: " + ctx.igniteInstanceName()); - - GridChangeGlobalStateFuture f = cgsLocFut.get(); + @Override public void start() throws IgniteCheckedException { + // Start first node as inactive if persistence is enabled. + boolean activeOnStart = !ctx.config().isPersistentStoreEnabled() && ctx.config().isActiveOnStart(); - if (f != null) - f.onDone(stopErr); + globalState = DiscoveryDataClusterState.createState(activeOnStart); - cgsLocFut.set(null); + ctx.event().addLocalEventListener(lsr, EVT_NODE_LEFT, EVT_NODE_FAILED); } /** {@inheritDoc} */ - @Override public void onKernalStart() throws IgniteCheckedException { - super.onKernalStart(); + @Override public void onKernalStop(boolean cancel) { + GridChangeGlobalStateFuture fut = this.stateChangeFut.get(); - if (ctx.isDaemon()) - return; + if (fut != null) + fut.onDone(new IgniteCheckedException("Failed to wait for cluster state change, node is stopping.")); - List<ClusterNode> nodes = ctx.discovery().serverNodes(AffinityTopologyVersion.NONE); - - assert localCacheData != null; - - // First node started (coordinator). - if (nodes.isEmpty() || nodes.get(0).isLocal()) - cacheData.putAll(localCacheData.caches()); - - if (globalState == INACTIVE) { // Accept inactivate state after join. - if (log != null && log.isInfoEnabled()) - log.info("Got inactivate state from cluster during node join."); - - // Revert start action if get INACTIVE state on join. - sharedCtx.snapshot().onDeActivate(ctx); - - if (sharedCtx.pageStore() != null) - sharedCtx.pageStore().onDeActivate(ctx); - - if (sharedCtx.wal() != null) - sharedCtx.wal().onDeActivate(ctx); - - sharedCtx.database().onDeActivate(ctx); - } - } - - /** {@inheritDoc} */ - @Nullable @Override public DiscoveryDataExchangeType discoveryDataType() { - return DiscoveryDataExchangeType.STATE_PROC; + super.onKernalStop(cancel); } - /** {@inheritDoc} */ - @Override public void collectGridNodeData(DiscoveryDataBag dataBag) { - if (!dataBag.commonDataCollectedFor(STATE_PROC.ordinal())) - dataBag.addGridCommonData(STATE_PROC.ordinal(), globalState); - } + /** + * @param discoCache Discovery data cache. + * @return If transition is in progress returns future which is completed when transition finishes. + */ + @Nullable public IgniteInternalFuture<Boolean> onLocalJoin(DiscoCache discoCache) { + if (globalState.transition()) { + joinFut = new TransitionOnJoinWaitFuture(globalState, discoCache); - /** {@inheritDoc} */ - @Override public void onGridDataReceived(DiscoveryDataBag.GridDiscoveryData data) { - ClusterState state = (ClusterState)data.commonData(); + return joinFut; + } - if (state != null) - globalState = state; + return null; } /** - * + * @param node Failed node. + * @return Message if cluster state changed. */ - public IgniteInternalFuture<?> changeGlobalState(final boolean activate) { - if (ctx.isDaemon()) { - GridFutureAdapter<Void> fut = new GridFutureAdapter<>(); + @Nullable public ChangeGlobalStateFinishMessage onNodeLeft(ClusterNode node) { + if (globalState.transition()) { + Set<UUID> nodes = globalState.transitionNodes(); - sendCompute(activate, fut); + if (nodes.remove(node.id()) && nodes.isEmpty()) { + U.warn(log, "Failed to change cluster state, all participating nodes failed. " + + "Switching to inactive state."); - return fut; - } + ChangeGlobalStateFinishMessage msg = + new ChangeGlobalStateFinishMessage(globalState.transitionRequestId(), false); - if (cacheProc.transactions().tx() != null || sharedCtx.lockedTopologyVersion(null) != null) - throw new IgniteException("Failed to " + prettyStr(activate) + " cluster (must invoke the " + - "method outside of an active transaction)."); + onStateFinishMessage(msg); - if ((globalState == ACTIVE && activate) || (globalState == INACTIVE && !activate)) - return new GridFinishedFuture<>(); + return msg; + } + } - final UUID requestId = UUID.randomUUID(); + return null; + } - final GridChangeGlobalStateFuture cgsFut = new GridChangeGlobalStateFuture(requestId, activate, ctx); + /** + * @param msg Message. + */ + public void onStateFinishMessage(ChangeGlobalStateFinishMessage msg) { + if (msg.requestId().equals(globalState.transitionRequestId())) { + log.info("Received state change finish message: " + msg.clusterActive()); - if (!cgsLocFut.compareAndSet(null, cgsFut)) { - GridChangeGlobalStateFuture locF = cgsLocFut.get(); + globalState = DiscoveryDataClusterState.createState(msg.clusterActive()); - if (locF.activate == activate) - return locF; + ctx.cache().onStateChangeFinish(msg); - return new GridFinishedFuture<>(new IgniteException( - "Failed to " + prettyStr(activate) + ", because another state change operation is currently " + - "in progress: " + prettyStr(locF.activate))); - } + TransitionOnJoinWaitFuture joinFut = this.joinFut; - if (globalState == ACTIVE && !activate && ctx.cache().context().snapshot().snapshotOperationInProgress()){ - return new GridFinishedFuture<>(new IgniteException( - "Failed to " + prettyStr(activate) + ", because snapshot operation in progress.")); + if (joinFut != null) + joinFut.onDone(false); } + else + U.warn(log, "Received state finish message with unexpected ID: " + msg); + } - if (ctx.clientNode()) - sendCompute(activate, cgsFut); - else { - try { - List<DynamicCacheChangeRequest> reqs = new ArrayList<>(); - - DynamicCacheChangeRequest changeGlobalStateReq = new DynamicCacheChangeRequest( - requestId, activate ? ACTIVE : INACTIVE, ctx.localNodeId()); - - reqs.add(changeGlobalStateReq); - - List<DynamicCacheChangeRequest> cacheReqs = activate ? startAllCachesRequests() : stopAllCachesRequests(); + /** + * @param topVer Current topology version. + * @param msg Message. + * @param discoCache Current nodes. + * @return {@code True} if need start state change process. + */ + public boolean onStateChangeMessage(AffinityTopologyVersion topVer, + ChangeGlobalStateMessage msg, + DiscoCache discoCache) { + if (globalState.transition()) { + if (globalState.active() != msg.activate()) { + GridChangeGlobalStateFuture fut = changeStateFuture(msg); + + if (fut != null) + fut.onDone(concurrentStateChangeError(msg.activate())); + } + else { + final GridChangeGlobalStateFuture stateFut = changeStateFuture(msg); - reqs.addAll(cacheReqs); + if (stateFut != null) { + IgniteInternalFuture<?> exchFut = ctx.cache().context().exchange().affinityReadyFuture( + globalState.transitionTopologyVersion()); - printCacheInfo(cacheReqs, activate); + if (exchFut == null) + exchFut = new GridFinishedFuture<>(); - ChangeGlobalStateMessage changeGlobalStateMsg = new ChangeGlobalStateMessage( - requestId, ctx.localNodeId(), activate, new DynamicCacheChangeBatch(reqs)); + exchFut.listen(new CI1<IgniteInternalFuture<?>>() { + @Override public void apply(IgniteInternalFuture<?> exchFut) { + stateFut.onDone(); + } + }); + } + } + } + else { + if (globalState.active() != msg.activate()) { + ExchangeActions exchangeActions; try { - ctx.discovery().sendCustomEvent(changeGlobalStateMsg); - - if (ctx.isStopping()) - cgsFut.onDone(new IgniteCheckedException("Failed to execute " + prettyStr(activate) + " request, " + - "node is stopping.")); + exchangeActions = ctx.cache().onStateChangeRequest(msg, topVer); } catch (IgniteCheckedException e) { - U.error(log, "Failed to create or send global state change request: " + cgsFut, e); - - cgsFut.onDone(e); - } - } - catch (IgniteCheckedException e) { - cgsFut.onDone(e); - } - } + GridChangeGlobalStateFuture fut = changeStateFuture(msg); - return cgsFut; - } + if (fut != null) + fut.onDone(e); - /** - * - */ - private void sendCompute(boolean activate, final GridFutureAdapter<Void> res) { - AffinityTopologyVersion topVer = ctx.discovery().topologyVersionEx(); + return false; + } - IgniteCompute comp = ((ClusterGroupAdapter)ctx.cluster().get().forServers()).compute(); + Set<UUID> nodeIds = U.newHashSet(discoCache.allNodes().size()); - if (log.isInfoEnabled()) - log.info("Sending " + prettyStr(activate) + " request from node [id=" + - ctx.localNodeId() + " topVer=" + topVer + " isClient=" + ctx.isDaemon() + - " isDaemon" + ctx.isDaemon() + "]"); + for (ClusterNode node : discoCache.allNodes()) + nodeIds.add(node.id()); - IgniteFuture<Void> fut = comp.runAsync(new ClientChangeGlobalStateComputeRequest(activate)); + GridChangeGlobalStateFuture fut = changeStateFuture(msg); - fut.listen(new CI1<IgniteFuture>() { - @Override public void apply(IgniteFuture fut) { - try { - fut.get(); + if (fut != null) + fut.setRemaining(nodeIds, topVer.nextMinorVersion()); - res.onDone(); - } - catch (Exception e) { - res.onDone(e); - } - } - }); - } - /** - * @param reqs Requests to print. - * @param active Active flag. - */ - private void printCacheInfo(List<DynamicCacheChangeRequest> reqs, boolean active) { - assert reqs != null; + log.info("Start state transition: " + msg.activate()); - StringBuilder sb = new StringBuilder(); + globalState = DiscoveryDataClusterState.createTransitionState(msg.activate(), + msg.requestId(), + topVer, + nodeIds); - sb.append("["); + AffinityTopologyVersion stateChangeTopVer = topVer.nextMinorVersion(); - for (int i = 0; i < reqs.size() - 1; i++) - sb.append(reqs.get(i).cacheName()).append(", "); + StateChangeRequest req = new StateChangeRequest(msg, stateChangeTopVer); - sb.append(reqs.get(reqs.size() - 1).cacheName()); + exchangeActions.stateChangeRequest(req); - sb.append("]"); + msg.exchangeActions(exchangeActions); - sb.append(" ").append(reqs.size()) - .append(" caches will be ") - .append(active ? "started" : "stopped"); + return true; + } + else { + // State already changed. + GridChangeGlobalStateFuture stateFut = changeStateFuture(msg); - if (log.isInfoEnabled()) - log.info(sb.toString()); - } + if (stateFut != null) + stateFut.onDone(); + } + } - /** - * @param req Cache being started. - */ - public void onCacheStart(DynamicCacheChangeRequest req) { - CacheInfo cacheInfo = cacheData.get(req.cacheName()); - - if (cacheInfo == null) - cacheData.put(req.cacheName(), - new CacheInfo( - new StoredCacheData(req.startCacheConfiguration()), - req.cacheType(), req.sql(), - 0L) - ); + return false; } /** - * @param req Cache being stopped. + * @return Current cluster state, should be called only from discovery thread. */ - public void onCacheStop(DynamicCacheChangeRequest req) { - CacheInfo cacheInfo = cacheData.get(req.cacheName()); - - if (cacheInfo != null) - cacheData.remove(req.cacheName()); + public DiscoveryDataClusterState clusterState() { + return globalState; } /** - * @return All caches map. + * @param msg State change message. + * @return Local future for state change process. */ - private Map<String, CacheConfiguration> allCaches() { - Map<String, CacheConfiguration> cfgs = new HashMap<>(); - - for (Map.Entry<String, CacheInfo> entry : cacheData.entrySet()) - if (cfgs.get(entry.getKey()) == null) - cfgs.put(entry.getKey(), entry.getValue().cacheData().config()); - - return cfgs; + @Nullable private GridChangeGlobalStateFuture changeStateFuture(ChangeGlobalStateMessage msg) { + return changeStateFuture(msg.initiatorNodeId(), msg.requestId()); } /** - * @return Collection of all caches start requests. - * @throws IgniteCheckedException If failed to create requests. + * @param initiatorNode Node initiated state change process. + * @param reqId State change request ID. + * @return Local future for state change process. */ - private List<DynamicCacheChangeRequest> startAllCachesRequests() throws IgniteCheckedException { - assert !ctx.config().isDaemon(); - - Collection<CacheConfiguration> cacheCfgs = allCaches().values(); - - final List<DynamicCacheChangeRequest> reqs = new ArrayList<>(); - - if (sharedCtx.pageStore() != null && sharedCtx.database().persistenceEnabled()) { - Map<String, StoredCacheData> ccfgs = sharedCtx.pageStore().readCacheConfigurations(); - - for (Map.Entry<String, StoredCacheData> entry : ccfgs.entrySet()) - reqs.add(createRequest(entry.getValue().config())); + @Nullable private GridChangeGlobalStateFuture changeStateFuture(UUID initiatorNode, UUID reqId) { + assert initiatorNode != null; + assert reqId != null; - for (CacheConfiguration cfg : cacheCfgs) - if (!ccfgs.keySet().contains(cfg.getName())) - reqs.add(createRequest(cfg)); + if (initiatorNode.equals(ctx.localNodeId())) { + GridChangeGlobalStateFuture fut = stateChangeFut.get(); - return reqs; + if (fut != null && fut.requestId.equals(reqId)) + return fut; } - else { - for (CacheConfiguration cfg : cacheCfgs) - reqs.add(createRequest(cfg)); - return reqs; - } + return null; } /** - * @return Collection of requests to stop caches. + * @param activate New state. + * @return State change error. */ - private List<DynamicCacheChangeRequest> stopAllCachesRequests() { - Collection<CacheConfiguration> cacheCfgs = allCaches().values(); - - List<DynamicCacheChangeRequest> reqs = new ArrayList<>(cacheCfgs.size()); - - for (CacheConfiguration cfg : cacheCfgs) { - DynamicCacheChangeRequest req = stopRequest(ctx, cfg.getName(), false, false); - - reqs.add(req); - } - - return reqs; + private IgniteCheckedException concurrentStateChangeError(boolean activate) { + return new IgniteCheckedException("Failed to " + prettyStr(activate) + + ", because another state change operation is currently in progress: " + prettyStr(!activate)); } /** - * @param cfg Configuration to create request for. - * @return Dynamic cache change request. + * */ - private DynamicCacheChangeRequest createRequest(CacheConfiguration cfg) { - assert cfg != null; - assert cfg.getName() != null; - - String cacheName = cfg.getName(); + public void cacheProcessorStarted() { + cacheProc = ctx.cache(); + sharedCtx = cacheProc.context(); - DynamicCacheChangeRequest req = new DynamicCacheChangeRequest( - UUID.randomUUID(), cacheName, ctx.localNodeId()); + sharedCtx.io().addCacheHandler( + 0, GridChangeGlobalStateMessageResponse.class, + new CI2<UUID, GridChangeGlobalStateMessageResponse>() { + @Override public void apply(UUID nodeId, GridChangeGlobalStateMessageResponse msg) { + processChangeGlobalStateResponse(nodeId, msg); + } + }); + } - req.startCacheConfiguration(cfg); - req.template(cfg.getName().endsWith("*")); - req.nearCacheConfiguration(cfg.getNearConfiguration()); - req.deploymentId(IgniteUuid.randomUuid()); - req.schema(new QuerySchema(cfg.getQueryEntities())); - req.cacheType(cacheProc.cacheType(cacheName)); + /** {@inheritDoc} */ + @Override public void stop(boolean cancel) throws IgniteCheckedException { + super.stop(cancel); - return req; - } + if (sharedCtx != null) + sharedCtx.io().removeHandler(false, 0, GridChangeGlobalStateMessageResponse.class); - /** - * - */ - public boolean active() { - ChangeGlobalStateContext actx = lastCgsCtx; + ctx.event().removeLocalEventListener(lsr, EVT_NODE_LEFT, EVT_NODE_FAILED); - if (actx != null && !actx.activate && globalState == TRANSITION) - return true; + IgniteCheckedException stopErr = new IgniteCheckedException( + "Node is stopping: " + ctx.igniteInstanceName()); - if (actx != null && actx.activate && globalState == TRANSITION) - return false; + GridChangeGlobalStateFuture f = stateChangeFut.get(); - return globalState == ACTIVE; + if (f != null) + f.onDone(stopErr); } - /** - * @param cacheName Cache name to check. - * @return Locally configured flag. - */ - public boolean isLocallyConfigured(String cacheName){ - assert localCacheData != null; + /** {@inheritDoc} */ + @Nullable @Override public DiscoveryDataExchangeType discoveryDataType() { + return DiscoveryDataExchangeType.STATE_PROC; + } - return localCacheData.caches().containsKey(cacheName) || localCacheData.templates().containsKey(cacheName); + /** {@inheritDoc} */ + @Override public void collectGridNodeData(DiscoveryDataBag dataBag) { + if (!dataBag.commonDataCollectedFor(STATE_PROC.ordinal())) + dataBag.addGridCommonData(STATE_PROC.ordinal(), globalState); } - /** - * Invoked if cluster is inactive. - * - * @param dataBag Bag to collect data to. - */ - public void collectGridNodeData0(DiscoveryDataBag dataBag) { - if (!dataBag.commonDataCollectedFor(CACHE_PROC.ordinal())) - dataBag.addGridCommonData(CACHE_PROC.ordinal(), cacheData); + /** {@inheritDoc} */ + @Override public void onGridDataReceived(DiscoveryDataBag.GridDiscoveryData data) { + DiscoveryDataClusterState state = (DiscoveryDataClusterState)data.commonData(); + + if (state != null) + globalState = state; } /** - * @param data Joining node discovery data. + * @param activate New cluster state. + * @return State change future. */ - public void onJoiningNodeDataReceived0(JoiningNodeDiscoveryData data) { - if (data.hasJoiningNodeData()) { - if (data.joiningNodeData() instanceof CacheJoinNodeDiscoveryData) { - CacheJoinNodeDiscoveryData data0 = (CacheJoinNodeDiscoveryData)data.joiningNodeData(); + public IgniteInternalFuture<?> changeGlobalState(final boolean activate) { + if (ctx.isDaemon() || ctx.clientNode()) { + GridFutureAdapter<Void> fut = new GridFutureAdapter<>(); - cacheData.putAll(data0.caches()); - } - else if (data.joiningNodeData() instanceof CacheClientReconnectDiscoveryData) { - CacheClientReconnectDiscoveryData data0 = (CacheClientReconnectDiscoveryData)data.joiningNodeData(); + sendCompute(activate, fut); - // No-op. - } + return fut; } - } - public void onGridDataReceived0(DiscoveryDataBag.GridDiscoveryData data) { - // Receive data from active cluster. - if (data.commonData() instanceof CacheNodeCommonDiscoveryData) { - CacheNodeCommonDiscoveryData data0 = (CacheNodeCommonDiscoveryData)data.commonData(); + if (cacheProc.transactions().tx() != null || sharedCtx.lockedTopologyVersion(null) != null) { + return new GridFinishedFuture<>(new IgniteCheckedException("Failed to " + prettyStr(activate) + + " cluster (must invoke the method outside of an active transaction).")); + } - Map<String, CacheData> caches = data0.caches(); + DiscoveryDataClusterState curState = globalState; - Map<String, CacheInfo> cacheInfos = new HashMap<>(); + if (!curState.transition() && curState.active() == activate) + return new GridFinishedFuture<>(); - for (Map.Entry<String, CacheData> entry : caches.entrySet()) { - CacheData val = entry.getValue(); + GridChangeGlobalStateFuture startedFut = null; - CacheInfo info = new CacheInfo( - new StoredCacheData(val.cacheConfiguration()), - val.cacheType(), - val.sql(), - val.flags() - ); + GridChangeGlobalStateFuture fut = stateChangeFut.get(); - cacheInfos.put(entry.getKey(), info); - } + while (fut == null) { + fut = new GridChangeGlobalStateFuture(UUID.randomUUID(), activate, ctx); - cacheData.putAll(cacheInfos); + if (stateChangeFut.compareAndSet(null, fut)) { + startedFut = fut; - } // Receive data from inactive cluster. - else if (data.commonData() instanceof Map) { - Map<String, CacheInfo> data0 = (Map<String, CacheInfo>)data.commonData(); + break; + } + else + fut = stateChangeFut.get(); + } - cacheData.putAll(data0); + if (startedFut == null) { + if (fut.activate != activate) { + return new GridFinishedFuture<>(new IgniteCheckedException("Failed to " + prettyStr(activate) + + ", because another state change operation is currently in progress: " + prettyStr(fut.activate))); + } + else + return fut; } - cacheData.putAll(localCacheData.caches()); - } + List<StoredCacheData> storedCfgs = null; - /** - * @param exchActions Requests. - * @param topVer Exchange topology version. - */ - public boolean changeGlobalState( - ExchangeActions exchActions, - AffinityTopologyVersion topVer - ) { - assert exchActions != null; - assert topVer != null; + if (activate && sharedCtx.database().persistenceEnabled()) { + try { + Map<String, StoredCacheData> cfgs = ctx.cache().context().pageStore().readCacheConfigurations(); + + if (!F.isEmpty(cfgs)) + storedCfgs = new ArrayList<>(cfgs.values()); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to read stored cache configurations: " + e, e); + + startedFut.onDone(e); - if (exchActions.newClusterState() != null) { - ChangeGlobalStateContext cgsCtx = lastCgsCtx; + return startedFut; + } + } - assert cgsCtx != null : topVer; + ChangeGlobalStateMessage msg = new ChangeGlobalStateMessage(startedFut.requestId, + ctx.localNodeId(), + storedCfgs, + activate); - cgsCtx.topologyVersion(topVer); + try { + ctx.discovery().sendCustomEvent(msg); - return true; + if (ctx.isStopping()) + startedFut.onDone(new IgniteCheckedException("Failed to execute " + prettyStr(activate) + " request, " + + "node is stopping.")); } + catch (IgniteCheckedException e) { + U.error(log, "Failed to send global state change request: " + activate, e); - return false; + startedFut.onDone(e); + } + + return startedFut; } /** - * Invoke from exchange future. + * @param activate New cluster state. + * @param resFut State change future. */ - public Exception onChangeGlobalState() { - GridChangeGlobalStateFuture f = cgsLocFut.get(); + private void sendCompute(boolean activate, final GridFutureAdapter<Void> resFut) { + AffinityTopologyVersion topVer = ctx.discovery().topologyVersionEx(); - ChangeGlobalStateContext cgsCtx = lastCgsCtx; + IgniteCompute comp = ((ClusterGroupAdapter)ctx.cluster().get().forServers()).compute(); - assert cgsCtx != null; + if (log.isInfoEnabled()) { + log.info("Sending " + prettyStr(activate) + " request from node [id=" + ctx.localNodeId() + + ", topVer=" + topVer + + ", client=" + ctx.clientNode() + + ", daemon" + ctx.isDaemon() + "]"); + } - if (f != null) - f.setRemaining(cgsCtx.topVer); + IgniteFuture<Void> fut = comp.runAsync(new ClientChangeGlobalStateComputeRequest(activate)); - return cgsCtx.activate ? onActivate(cgsCtx) : onDeActivate(cgsCtx); + fut.listen(new CI1<IgniteFuture>() { + @Override public void apply(IgniteFuture fut) { + try { + fut.get(); + + resFut.onDone(); + } + catch (Exception e) { + resFut.onDone(e); + } + } + }); } /** - * @param exs Exs. + * @param errs Errors. + * @param req State change request. */ - public void onFullResponseMessage(Map<UUID, Exception> exs) { - assert !F.isEmpty(exs); - - ChangeGlobalStateContext actx = lastCgsCtx; - - actx.setFail(); + public void onStateChangeError(Map<UUID, Exception> errs, StateChangeRequest req) { + assert !F.isEmpty(errs); - // Revert change if activation request fail. - if (actx.activate) { + // Revert caches start if activation request fail. + if (req.activate()) { try { cacheProc.onKernalStopCaches(true); @@ -674,22 +533,10 @@ public class GridClusterStateProcessor extends GridProcessorAdapter { sharedCtx.affinity().removeAllCacheInfo(); - ctx.discovery().cleanCachesAndGroups(); - - if (!ctx.clientNode()) { - sharedCtx.database().onDeActivate(ctx); - - if (sharedCtx.pageStore() != null) - sharedCtx.pageStore().onDeActivate(ctx); - - if (sharedCtx.wal() != null) - sharedCtx.wal().onDeActivate(ctx); - } + if (!ctx.clientNode()) + sharedCtx.deactivate(); } catch (Exception e) { - for (Map.Entry<UUID, Exception> entry : exs.entrySet()) - e.addSuppressed(entry.getValue()); - U.error(log, "Failed to revert activation request changes", e); } } @@ -697,110 +544,33 @@ public class GridClusterStateProcessor extends GridProcessorAdapter { //todo https://issues.apache.org/jira/browse/IGNITE-5480 } - globalState = actx.activate ? INACTIVE : ACTIVE; - - GridChangeGlobalStateFuture af = cgsLocFut.get(); + GridChangeGlobalStateFuture fut = changeStateFuture(req.initiatorNodeId(), req.requestId()); - if (af != null && af.requestId.equals(actx.requestId)) { + if (fut != null) { IgniteCheckedException e = new IgniteCheckedException( - "Fail " + prettyStr(actx.activate), + "Failed to " + prettyStr(req.activate()) + " cluster", null, false ); - for (Map.Entry<UUID, Exception> entry : exs.entrySet()) + for (Map.Entry<UUID, Exception> entry : errs.entrySet()) e.addSuppressed(entry.getValue()); - af.onDone(e); - } - } - - /** - * - */ - private Exception onActivate(ChangeGlobalStateContext cgsCtx) { - final boolean client = ctx.clientNode(); - - if (log.isInfoEnabled()) - log.info("Start activation process [nodeId=" + ctx.localNodeId() + ", client=" + client + - ", topVer=" + cgsCtx.topVer + "]"); - - try { - if (!client) - sharedCtx.database().lock(); - - IgnitePageStoreManager pageStore = sharedCtx.pageStore(); - - if (pageStore != null) - pageStore.onActivate(ctx); - - if (sharedCtx.wal() != null) - sharedCtx.wal().onActivate(ctx); - - sharedCtx.database().onActivate(ctx); - - sharedCtx.snapshot().onActivate(ctx); - - if (log.isInfoEnabled()) - log.info("Successfully activated persistence managers [nodeId=" - + ctx.localNodeId() + ", client=" + client + ", topVer=" + cgsCtx.topVer + "]"); - - return null; - } - catch (Exception e) { - U.error(log, "Failed to activate persistence managers [nodeId=" + ctx.localNodeId() + ", client=" + client + - ", topVer=" + cgsCtx.topVer + "]", e); - - if (!client) - sharedCtx.database().unLock(); - - return e; - } - } - - /** - * - */ - public Exception onDeActivate(ChangeGlobalStateContext cgsCtx) { - final boolean client = ctx.clientNode(); - - if (log.isInfoEnabled()) - log.info("Starting deactivation [id=" + ctx.localNodeId() + ", client=" + - client + ", topVer=" + cgsCtx.topVer + "]"); - - try { - ctx.dataStructures().onDeActivate(ctx); - - ctx.service().onDeActivate(ctx); - - if (log.isInfoEnabled()) - log.info("Successfully deactivated persistence processors [id=" + ctx.localNodeId() + ", client=" + - client + ", topVer=" + cgsCtx.topVer + "]"); - - return null; - } - catch (Exception e) { - U.error(log, "Failed to execute deactivation callback [nodeId=" + ctx.localNodeId() + ", client=" + client + - ", topVer=" + cgsCtx.topVer + "]", e); - - return e; + fut.onDone(e); } } /** - * + * @param req State change request. */ - private void onFinalActivate(final ChangeGlobalStateContext cgsCtx) { - IgniteInternalFuture<?> asyncActivateFut = ctx.closure().runLocalSafe(new Runnable() { + private void onFinalActivate(final StateChangeRequest req) { + ctx.closure().runLocalSafe(new Runnable() { @Override public void run() { boolean client = ctx.clientNode(); Exception e = null; try { - if (!ctx.config().isDaemon()) - ctx.cacheObjects().onUtilityCacheStarted(); - ctx.service().onUtilityCacheStarted(); ctx.service().onActivate(ctx); @@ -809,146 +579,114 @@ public class GridClusterStateProcessor extends GridProcessorAdapter { if (log.isInfoEnabled()) log.info("Successfully performed final activation steps [nodeId=" - + ctx.localNodeId() + ", client=" + client + ", topVer=" + cgsCtx.topVer + "]"); + + ctx.localNodeId() + ", client=" + client + ", topVer=" + req.topologyVersion() + "]"); } catch (Exception ex) { - e = ex; + e = new IgniteCheckedException("Failed to perform final activation steps", ex); U.error(log, "Failed to perform final activation steps [nodeId=" + ctx.localNodeId() + - ", client=" + client + ", topVer=" + lastCgsCtx.topVer + "]", ex); + ", client=" + client + ", topVer=" + req.topologyVersion() + "]", ex); } finally { - globalState = ACTIVE; - - sendChangeGlobalStateResponse(cgsCtx.requestId, cgsCtx.initiatingNodeId, e); + globalState.setTransitionResult(req.requestId(), true); - lastCgsCtx = null; + sendChangeGlobalStateResponse(req.requestId(), req.initiatorNodeId(), e); } } }); - - cgsCtx.setAsyncActivateFut(asyncActivateFut); } /** - * + * @param req State change request. */ - public void onFinalDeActivate(ChangeGlobalStateContext cgsCtx) { - final boolean client = ctx.clientNode(); - - if (log.isInfoEnabled()) - log.info("Successfully performed final deactivation steps [nodeId=" - + ctx.localNodeId() + ", client=" + client + ", topVer=" + cgsCtx.topVer + "]"); - - Exception ex = null; - - try { - sharedCtx.snapshot().onDeActivate(ctx); + private void onFinalDeActivate(final StateChangeRequest req) { + globalState.setTransitionResult(req.requestId(), false); - sharedCtx.database().onDeActivate(ctx); - - if (sharedCtx.pageStore() != null) - sharedCtx.pageStore().onDeActivate(ctx); - - if (sharedCtx.wal() != null) - sharedCtx.wal().onDeActivate(ctx); - - sharedCtx.affinity().removeAllCacheInfo(); - } - catch (Exception e) { - ex = e; - } - finally { - globalState = INACTIVE; - } - - sendChangeGlobalStateResponse(cgsCtx.requestId, cgsCtx.initiatingNodeId, ex); - - lastCgsCtx = null; + sendChangeGlobalStateResponse(req.requestId(), req.initiatorNodeId(), null); } /** - * + * @param req State change request. */ - public void onExchangeDone() { - ChangeGlobalStateContext cgsCtx = lastCgsCtx; - - assert cgsCtx != null; - - if (!cgsCtx.isFail()) { - if (cgsCtx.activate) - onFinalActivate(cgsCtx); - else - onFinalDeActivate(cgsCtx); - } + public void onStateChangeExchangeDone(StateChangeRequest req) { + if (req.activate()) + onFinalActivate(req); else - lastCgsCtx = null; + onFinalDeActivate(req); } /** + * @param reqId Request ID. * @param initNodeId Initialize node id. * @param ex Exception. */ - private void sendChangeGlobalStateResponse(UUID requestId, UUID initNodeId, Exception ex) { - assert requestId != null; + private void sendChangeGlobalStateResponse(UUID reqId, UUID initNodeId, Exception ex) { + assert reqId != null; assert initNodeId != null; - try { - GridChangeGlobalStateMessageResponse actResp = new GridChangeGlobalStateMessageResponse(requestId, ex); + GridChangeGlobalStateMessageResponse res = new GridChangeGlobalStateMessageResponse(reqId, ex); + try { if (log.isDebugEnabled()) log.debug("Sending global state change response [nodeId=" + ctx.localNodeId() + - ", topVer=" + ctx.discovery().topologyVersionEx() + ", response=" + actResp + "]"); + ", topVer=" + ctx.discovery().topologyVersionEx() + ", res=" + res + "]"); if (ctx.localNodeId().equals(initNodeId)) - processChangeGlobalStateResponse(ctx.localNodeId(), actResp); + processChangeGlobalStateResponse(ctx.localNodeId(), res); else - sharedCtx.io().send(initNodeId, actResp, SYSTEM_POOL); + sharedCtx.io().send(initNodeId, res, SYSTEM_POOL); + } + catch (ClusterTopologyCheckedException e) { + if (log.isDebugEnabled()) { + log.debug("Failed to send change global state response, node left [node=" + initNodeId + + ", res=" + res + ']'); + } } catch (IgniteCheckedException e) { - log.error("Fail send change global state response to " + initNodeId, e); + U.error(log, "Failed to send change global state response [node=" + initNodeId + ", res=" + res + ']', e); } } /** + * @param nodeId Node ID. * @param msg Message. */ private void processChangeGlobalStateResponse(final UUID nodeId, final GridChangeGlobalStateMessageResponse msg) { assert nodeId != null; assert msg != null; - if (log.isDebugEnabled()) + if (log.isDebugEnabled()) { log.debug("Received activation response [requestId=" + msg.getRequestId() + ", nodeId=" + nodeId + "]"); - - ClusterNode node = ctx.discovery().node(nodeId); - - if (node == null) { - U.warn(log, "Received activation response from unknown node (will ignore) [requestId=" + - msg.getRequestId() + ']'); - - return; } UUID requestId = msg.getRequestId(); - final GridChangeGlobalStateFuture fut = cgsLocFut.get(); - - if (fut != null && !fut.isDone() && requestId.equals(fut.requestId)) { - fut.initFut.listen(new CI1<IgniteInternalFuture<?>>() { - @Override public void apply(IgniteInternalFuture<?> f) { - fut.onResponse(nodeId, msg); - } - }); + final GridChangeGlobalStateFuture fut = stateChangeFut.get(); + + if (fut != null && requestId.equals(fut.requestId)) { + if (fut.initFut.isDone()) + fut.onResponse(nodeId, msg); + else { + fut.initFut.listen(new CI1<IgniteInternalFuture<?>>() { + @Override public void apply(IgniteInternalFuture<?> f) { + // initFut is completed from discovery thread, process response from other thread. + ctx.getSystemExecutorService().execute(new Runnable() { + @Override public void run() { + fut.onResponse(nodeId, msg); + } + }); + } + }); + } } } - - /** * @param activate Activate. + * @return Activate flag string. */ - private String prettyStr(boolean activate) { + private static String prettyStr(boolean activate) { return activate ? "activate" : "deactivate"; } @@ -993,7 +731,9 @@ public class GridClusterStateProcessor extends GridProcessorAdapter { private final IgniteLogger log; /** - * + * @param requestId State change request ID. + * @param activate New cluster state. + * @param ctx Context. */ GridChangeGlobalStateFuture(UUID requestId, boolean activate, GridKernalContext ctx) { this.requestId = requestId; @@ -1006,7 +746,7 @@ public class GridClusterStateProcessor extends GridProcessorAdapter { /** * @param event Event. */ - public void onDiscoveryEvent(DiscoveryEvent event) { + void onNodeLeft(DiscoveryEvent event) { assert event != null; if (isDone()) @@ -1024,29 +764,26 @@ public class GridClusterStateProcessor extends GridProcessorAdapter { } /** - * + * @param nodesIds Node IDs. + * @param topVer Current topology version. */ - public void setRemaining(AffinityTopologyVersion topVer) { - Collection<ClusterNode> nodes = ctx.discovery().nodes(topVer); - - List<UUID> ids = new ArrayList<>(nodes.size()); - - for (ClusterNode n : nodes) - ids.add(n.id()); - - if (log.isDebugEnabled()) - log.debug("Setup remaining node [id=" + ctx.localNodeId() + ", client=" + - ctx.clientNode() + ", topVer=" + ctx.discovery().topologyVersionEx() + - ", nodes=" + Arrays.toString(ids.toArray()) + "]"); + void setRemaining(Set<UUID> nodesIds, AffinityTopologyVersion topVer) { + if (log.isDebugEnabled()) { + log.debug("Setup remaining node [id=" + ctx.localNodeId() + + ", client=" + ctx.clientNode() + + ", topVer=" + topVer + + ", nodes=" + nodesIds + "]"); + } synchronized (mux) { - remaining.addAll(ids); + remaining.addAll(nodesIds); } initFut.onDone(); } /** + * @param nodeId Sender node ID. * @param msg Activation message response. */ public void onResponse(UUID nodeId, GridChangeGlobalStateMessageResponse msg) { @@ -1072,7 +809,7 @@ public class GridClusterStateProcessor extends GridProcessorAdapter { * */ private void onAllReceived() { - Throwable e = new Throwable(); + IgniteCheckedException e = new IgniteCheckedException(); boolean fail = false; @@ -1094,9 +831,13 @@ public class GridClusterStateProcessor extends GridProcessorAdapter { /** {@inheritDoc} */ @Override public boolean onDone(@Nullable Void res, @Nullable Throwable err) { - ctx.state().cgsLocFut.set(null); + if (super.onDone(res, err)) { + ctx.state().stateChangeFut.compareAndSet(this, null); + + return true; + } - return super.onDone(res, err); + return false; } /** {@inheritDoc} */ @@ -1107,110 +848,65 @@ public class GridClusterStateProcessor extends GridProcessorAdapter { /** * - * */ - private static class ChangeGlobalStateContext { - /** Request id. */ - private final UUID requestId; - - /** Initiating node id. */ - private final UUID initiatingNodeId; - - /** Batch requests. */ - private final DynamicCacheChangeBatch batch; + private static class ClientChangeGlobalStateComputeRequest implements IgniteRunnable { + /** */ + private static final long serialVersionUID = 0L; - /** Activate. */ + /** */ private final boolean activate; - /** Topology version. */ - private AffinityTopologyVersion topVer; - - /** Fail. */ - private boolean fail; - - /** Async activate future. */ - private IgniteInternalFuture<?> asyncActivateFut; + /** Ignite. */ + @IgniteInstanceResource + private Ignite ignite; /** - * + * @param activate New cluster state. */ - ChangeGlobalStateContext( - UUID requestId, - UUID initiatingNodeId, - DynamicCacheChangeBatch batch, - boolean activate - ) { - this.requestId = requestId; - this.batch = batch; + private ClientChangeGlobalStateComputeRequest(boolean activate) { this.activate = activate; - this.initiatingNodeId = initiatingNodeId; - } - - /** - * @param topVer Topology version. - */ - public void topologyVersion(AffinityTopologyVersion topVer) { - this.topVer = topVer; - } - - /** - * - */ - private void setFail() { - fail = true; - } - - /** - * - */ - private boolean isFail() { - return fail; - } - - /** - * - */ - public IgniteInternalFuture<?> getAsyncActivateFut() { - return asyncActivateFut; - } - - /** - * @param asyncActivateFut Async activate future. - */ - public void setAsyncActivateFut(IgniteInternalFuture<?> asyncActivateFut) { - this.asyncActivateFut = asyncActivateFut; } /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(ChangeGlobalStateContext.class, this); + @Override public void run() { + ignite.active(activate); } } /** * */ - private static class ClientChangeGlobalStateComputeRequest implements IgniteRunnable { + class TransitionOnJoinWaitFuture extends GridFutureAdapter<Boolean> { /** */ - private static final long serialVersionUID = 0L; - - /** Activation. */ - private final boolean activation; + private DiscoveryDataClusterState transitionState; - /** Ignite. */ - @IgniteInstanceResource - private Ignite ignite; + /** */ + private final Set<UUID> transitionNodes; /** - * + * @param state Current state. + * @param discoCache Discovery data cache. */ - private ClientChangeGlobalStateComputeRequest(boolean activation) { - this.activation = activation; + TransitionOnJoinWaitFuture(DiscoveryDataClusterState state, DiscoCache discoCache) { + assert state.transition() : state; + + transitionNodes = U.newHashSet(state.transitionNodes().size()); + + for (UUID nodeId : state.transitionNodes()) { + if (discoCache.node(nodeId) != null) + transitionNodes.add(nodeId); + } } /** {@inheritDoc} */ - @Override public void run() { - ignite.active(activation); + @Override public boolean onDone(@Nullable Boolean res, @Nullable Throwable err) { + if (super.onDone(res, err)) { + joinFut = null; + + return true; + } + + return false; } } }
http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/IgniteChangeGlobalStateSupport.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/IgniteChangeGlobalStateSupport.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/IgniteChangeGlobalStateSupport.java index 3dd9911..5d77f57 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/IgniteChangeGlobalStateSupport.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/IgniteChangeGlobalStateSupport.java @@ -36,7 +36,6 @@ public interface IgniteChangeGlobalStateSupport { * Called when cluster performing deactivation. * * @param kctx Kernal context. - * @throws IgniteCheckedException If failed. */ - public void onDeActivate(GridKernalContext kctx) throws IgniteCheckedException; + public void onDeActivate(GridKernalContext kctx); } http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java index 52cc9e9..4399fe2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java @@ -175,8 +175,8 @@ public final class DataStructuresProcessor extends GridProcessorAdapter implemen /** {@inheritDoc} */ @SuppressWarnings("unchecked") - @Override public void onKernalStart() throws IgniteCheckedException { - if (ctx.config().isDaemon() || !ctx.state().active()) + @Override public void onKernalStart(boolean active) throws IgniteCheckedException { + if (ctx.config().isDaemon() || !active) return; onKernalStart0(); @@ -278,7 +278,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter implemen } /** {@inheritDoc} */ - @Override public void onDeActivate(GridKernalContext ctx) throws IgniteCheckedException { + @Override public void onDeActivate(GridKernalContext ctx) { if (log.isDebugEnabled()) log.debug("DeActivate data structure processor [nodeId=" + ctx.localNodeId() + " topVer=" + ctx.discovery().topologyVersionEx() + " ]"); http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicLongImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicLongImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicLongImpl.java index c54f801..0bc0c63 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicLongImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicLongImpl.java @@ -368,7 +368,7 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ign } /** {@inheritDoc} */ - @Override public void onDeActivate(GridKernalContext kctx) throws IgniteCheckedException { + @Override public void onDeActivate(GridKernalContext kctx) { // No-op. } http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicReferenceImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicReferenceImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicReferenceImpl.java index 64b68e3..42f16f2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicReferenceImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicReferenceImpl.java @@ -299,7 +299,7 @@ public final class GridCacheAtomicReferenceImpl<T> implements GridCacheAtomicRef } /** {@inheritDoc} */ - @Override public void onDeActivate(GridKernalContext kctx) throws IgniteCheckedException { + @Override public void onDeActivate(GridKernalContext kctx) { // No-op. } http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java index 47fa49e..019de3c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java @@ -451,7 +451,7 @@ public final class GridCacheAtomicSequenceImpl implements GridCacheAtomicSequenc /** {@inheritDoc} */ @Override public void onDeActivate(GridKernalContext kctx) { - + // No-op. } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicStampedImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicStampedImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicStampedImpl.java index ac171a6..ed7a225 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicStampedImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicStampedImpl.java @@ -343,7 +343,7 @@ public final class GridCacheAtomicStampedImpl<T, S> implements GridCacheAtomicSt } /** {@inheritDoc} */ - @Override public void onDeActivate(GridKernalContext kctx) throws IgniteCheckedException { + @Override public void onDeActivate(GridKernalContext kctx) { // No-op. } http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java index 585cb20..7f331c3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java @@ -340,7 +340,7 @@ public final class GridCacheCountDownLatchImpl implements GridCacheCountDownLatc } /** {@inheritDoc} */ - @Override public void onDeActivate(GridKernalContext kctx) throws IgniteCheckedException { + @Override public void onDeActivate(GridKernalContext kctx) { // No-op. } http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheLockImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheLockImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheLockImpl.java index 8d3a770..b798670 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheLockImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheLockImpl.java @@ -1477,8 +1477,8 @@ public final class GridCacheLockImpl implements GridCacheLockEx, IgniteChangeGlo } /** {@inheritDoc} */ - @Override public void onDeActivate(GridKernalContext kctx) throws IgniteCheckedException { - + @Override public void onDeActivate(GridKernalContext kctx) { + // No-op. } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java index 2f6abb6..c567ac4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java @@ -37,7 +37,6 @@ import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteInterruptedException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.IgniteQueue; -import org.apache.ignite.cache.affinity.AffinityKeyMapped; import org.apache.ignite.internal.processors.cache.GridCacheAdapter; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.util.tostring.GridToStringExclude; http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java index c76aec4..4abefc9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java @@ -968,7 +968,7 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Ignit } /** {@inheritDoc} */ - @Override public void onDeActivate(GridKernalContext kctx) throws IgniteCheckedException { + @Override public void onDeActivate(GridKernalContext kctx) { // No-op. } http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java index e336474..c27770f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java @@ -35,7 +35,6 @@ import org.apache.ignite.IgniteCompute; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.IgniteSet; -import org.apache.ignite.cache.affinity.AffinityKeyMapped; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheIteratorConverter; http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java index 8712756..7eb61d1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java @@ -101,8 +101,6 @@ import static org.apache.ignite.events.EventType.EVT_IGFS_DIR_DELETED; import static org.apache.ignite.events.EventType.EVT_IGFS_FILE_DELETED; import static org.apache.ignite.events.EventType.EVT_IGFS_FILE_OPENED_READ; import static org.apache.ignite.events.EventType.EVT_IGFS_META_UPDATED; -import static org.apache.ignite.igfs.IgfsMode.DUAL_ASYNC; -import static org.apache.ignite.igfs.IgfsMode.DUAL_SYNC; import static org.apache.ignite.igfs.IgfsMode.PRIMARY; import static org.apache.ignite.igfs.IgfsMode.PROXY; http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsProcessor.java index 3c2f64d..244820f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsProcessor.java @@ -177,7 +177,7 @@ public class IgfsProcessor extends IgfsProcessorAdapter { } /** {@inheritDoc} */ - @Override public void onKernalStart() throws IgniteCheckedException { + @Override public void onKernalStart(boolean active) throws IgniteCheckedException { if (ctx.config().isDaemon()) return; http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java index 23ad63d..ce6c9fe 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java @@ -512,10 +512,8 @@ public class GridQueryProcessor extends GridProcessorAdapter { cacheData.queryEntities(cacheDesc.schema().entities()); - CacheGroupDescriptor grpDesc = ctx.cache().cacheDescriptors().get(cacheData.config().getName()).groupDescriptor(); - try { - ctx.cache().context().pageStore().storeCacheData(grpDesc, cacheData); + ctx.cache().context().pageStore().storeCacheData(cacheData); } catch (IgniteCheckedException e) { throw new IllegalStateException("Failed to persist cache data: " + cacheData.config().getName(), e); http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestProcessor.java index 716adf7..f528184 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestProcessor.java @@ -503,7 +503,7 @@ public class GridRestProcessor extends GridProcessorAdapter { } /** {@inheritDoc} */ - @Override public void onKernalStart() throws IgniteCheckedException { + @Override public void onKernalStart(boolean active) throws IgniteCheckedException { if (isRestEnabled()) { for (GridRestProtocol proto : protos) proto.onKernalStart(); http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cluster/GridChangeStateCommandHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cluster/GridChangeStateCommandHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cluster/GridChangeStateCommandHandler.java index 909b524..6236026 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cluster/GridChangeStateCommandHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cluster/GridChangeStateCommandHandler.java @@ -64,7 +64,7 @@ public class GridChangeStateCommandHandler extends GridRestCommandHandlerAdapter try { if (req.command().equals(CLUSTER_CURRENT_STATE)) { - Boolean currentState = ctx.state().active(); + Boolean currentState = ctx.state().publicApiActiveState(); res.setResponse(currentState); } http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java index 12be63b..2eeee1b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java @@ -211,8 +211,8 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite /** {@inheritDoc} */ @SuppressWarnings("unchecked") - @Override public void onKernalStart() throws IgniteCheckedException { - if (ctx.isDaemon() || !ctx.state().active()) + @Override public void onKernalStart(boolean active) throws IgniteCheckedException { + if (ctx.isDaemon() || !active) return; onKernalStart0(); @@ -363,7 +363,7 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite } /** {@inheritDoc} */ - @Override public void onDeActivate(GridKernalContext kctx) throws IgniteCheckedException { + @Override public void onDeActivate(GridKernalContext kctx) { if (log.isDebugEnabled()) log.debug("DeActivate service processor [nodeId=" + ctx.localNodeId() + " topVer=" + ctx.discovery().topologyVersionEx() + " ]"); http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java index 7ac7b64..d0b88d8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java @@ -153,7 +153,7 @@ public class GridTaskProcessor extends GridProcessorAdapter { } /** {@inheritDoc} */ - @Override public void onKernalStart() throws IgniteCheckedException { + @Override public void onKernalStart(boolean active) throws IgniteCheckedException { tasksMetaCache = ctx.security().enabled() && !ctx.isDaemon() ? ctx.cache().<GridTaskNameHashKey, String>utilityCache() : null; http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java index d5bacdb..5dbfe6e 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java @@ -582,7 +582,8 @@ class ClientImpl extends TcpDiscoveryImpl { * @param addr Address. * @return Socket, connect response and client acknowledge support flag. */ - @Nullable private T3<SocketStream, Integer, Boolean> sendJoinRequest(boolean recon, InetSocketAddress addr) { + @Nullable private T3<SocketStream, Integer, Boolean> sendJoinRequest(boolean recon, + InetSocketAddress addr) { assert addr != null; if (log.isDebugEnabled()) @@ -603,6 +604,8 @@ class ClientImpl extends TcpDiscoveryImpl { IgniteSpiOperationTimeoutHelper timeoutHelper = new IgniteSpiOperationTimeoutHelper(spi, true); + DiscoveryDataPacket discoveryData = null; + while (true) { boolean openSock = false; @@ -645,9 +648,10 @@ class ClientImpl extends TcpDiscoveryImpl { marshalCredentials(node); } - msg = new TcpDiscoveryJoinRequestMessage( - node, - spi.collectExchangeData(new DiscoveryDataPacket(getLocalNodeId()))); + if (discoveryData == null) + discoveryData = spi.collectExchangeData(new DiscoveryDataPacket(getLocalNodeId())); + + msg = new TcpDiscoveryJoinRequestMessage(node, discoveryData); } else msg = new TcpDiscoveryClientReconnectMessage(getLocalNodeId(), rmtNodeId, lastMsgId); http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index 03afff5..c2d9b7e 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -846,8 +846,10 @@ class ServerImpl extends TcpDiscoveryImpl { // Marshal credentials for backward compatibility and security. marshalCredentials(locNode, locCred); + DiscoveryDataPacket discoveryData = spi.collectExchangeData(new DiscoveryDataPacket(getLocalNodeId())); + while (true) { - if (!sendJoinRequestMessage()) { + if (!sendJoinRequestMessage(discoveryData)) { if (log.isDebugEnabled()) log.debug("Join request message has not been sent (local node is the first in the topology)."); @@ -973,13 +975,13 @@ class ServerImpl extends TcpDiscoveryImpl { * Address is provided by {@link org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder} and message is * sent to first node connection succeeded to. * + * @param discoveryData Discovery data. * @return {@code true} if send succeeded. * @throws IgniteSpiException If any error occurs. */ @SuppressWarnings({"BusyWait"}) - private boolean sendJoinRequestMessage() throws IgniteSpiException { - TcpDiscoveryAbstractMessage joinReq = new TcpDiscoveryJoinRequestMessage(locNode, - spi.collectExchangeData(new DiscoveryDataPacket(getLocalNodeId()))); + private boolean sendJoinRequestMessage(DiscoveryDataPacket discoveryData) throws IgniteSpiException { + TcpDiscoveryAbstractMessage joinReq = new TcpDiscoveryJoinRequestMessage(locNode, discoveryData); // Time when it has been detected, that addresses from IP finder do not respond. long noResStart = 0; http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java b/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java index 98d2553..ab61687 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java @@ -179,6 +179,16 @@ public class TestRecordingCommunicationSpi extends TcpCommunicationSpi { /** * @throws InterruptedException If interrupted. */ + public void waitForBlocked() throws InterruptedException { + synchronized (this) { + while (blockedMsgs.isEmpty()) + wait(); + } + } + + /** + * @throws InterruptedException If interrupted. + */ public void waitForRecorded() throws InterruptedException { synchronized (this) { while (recordedMsgs.isEmpty()) http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/test/java/org/apache/ignite/internal/managers/GridManagerLocalMessageListenerSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/GridManagerLocalMessageListenerSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/GridManagerLocalMessageListenerSelfTest.java index 5e85b62..b88eef9 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/managers/GridManagerLocalMessageListenerSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/GridManagerLocalMessageListenerSelfTest.java @@ -128,7 +128,7 @@ public class GridManagerLocalMessageListenerSelfTest extends GridCommonAbstractT mgr.start(); - mgr.onKernalStart(); + mgr.onKernalStart(true); assertTrue(mgr.enabled()); } @@ -143,7 +143,7 @@ public class GridManagerLocalMessageListenerSelfTest extends GridCommonAbstractT assertTrue(mgr.enabled()); - mgr.onKernalStart(); + mgr.onKernalStart(true); mgr.onKernalStop(false);
