http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterState.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterState.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterState.java deleted file mode 100644 index 1e1ef71..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterState.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.cache; - -/** - * - */ -public enum ClusterState { - /** - * Cache is inactive. No operations are allowed, no partition assignments or rebalancing is performed. - */ - INACTIVE, - - /** - * Cache is active and operations. There are no lost partitions. - */ - ACTIVE, - - /** - * Cache is inactive. But process of it activation in progress. - */ - TRANSITION -}
http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java index 6d5eaf3..2fd8780 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java @@ -83,15 +83,15 @@ public class DynamicCacheChangeRequest implements Serializable { /** */ private UUID rcvdFrom; - /** Cache state. Set to non-null when global state is changed. */ - private ClusterState state; - /** Reset lost partitions flag. */ private boolean resetLostPartitions; /** Dynamic schema. */ private QuerySchema schema; + /** */ + private transient boolean locallyConfigured; + /** * @param reqId Unique request ID. * @param cacheName Cache stop name. @@ -100,7 +100,6 @@ public class DynamicCacheChangeRequest implements Serializable { public DynamicCacheChangeRequest(UUID reqId, String cacheName, UUID initiatingNodeId) { assert reqId != null; assert cacheName != null; - assert initiatingNodeId != null; this.reqId = reqId; this.cacheName = cacheName; @@ -108,21 +107,6 @@ public class DynamicCacheChangeRequest implements Serializable { } /** - * @param reqId Unique request ID. - * @param state New cluster state. - * @param initiatingNodeId Initiating node ID. - */ - public DynamicCacheChangeRequest(UUID reqId, ClusterState state, UUID initiatingNodeId) { - assert reqId != null; - assert state != null; - assert initiatingNodeId != null; - - this.reqId = reqId; - this.state = state; - this.initiatingNodeId = initiatingNodeId; - } - - /** * @param ctx Context. 
* @param cacheName Cache name. * @return Request to reset lost partitions. @@ -183,20 +167,6 @@ public class DynamicCacheChangeRequest implements Serializable { } /** - * @return State. - */ - public ClusterState state() { - return state; - } - - /** - * @return {@code True} if global caches state is changes. - */ - public boolean globalStateChange() { - return state != null; - } - - /** * @param template {@code True} if this is request for adding template configuration. */ public void template(boolean template) { @@ -253,7 +223,7 @@ public class DynamicCacheChangeRequest implements Serializable { } /** - * + * @return Destroy flag. */ public boolean destroy(){ return destroy; @@ -420,6 +390,20 @@ public class DynamicCacheChangeRequest implements Serializable { this.schema = schema != null ? schema.copy() : null; } + /** + * @return Locally configured flag. + */ + public boolean locallyConfigured() { + return locallyConfigured; + } + + /** + * @param locallyConfigured Locally configured flag. 
+ */ + public void locallyConfigured(boolean locallyConfigured) { + this.locallyConfigured = locallyConfigured; + } + /** {@inheritDoc} */ @Override public String toString() { return "DynamicCacheChangeRequest [cacheName=" + cacheName() + http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java index 9caf9aa..e9ece5a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java @@ -50,7 +50,7 @@ public class ExchangeActions { private Map<String, ActionData> cachesToResetLostParts; /** */ - private ClusterState newState; + private StateChangeRequest stateChangeReq; /** * @param grpId Group ID. @@ -89,7 +89,7 @@ public class ExchangeActions { /** * @return New caches start requests. */ - Collection<ActionData> cacheStartRequests() { + public Collection<ActionData> cacheStartRequests() { return cachesToStart != null ? cachesToStart.values() : Collections.<ActionData>emptyList(); } @@ -184,19 +184,31 @@ public class ExchangeActions { } /** - * @param state New cluster state. + * @param stateChange Cluster state change request. */ - void newClusterState(ClusterState state) { - assert state != null; + public void stateChangeRequest(StateChangeRequest stateChange) { + this.stateChangeReq = stateChange; + } + + /** + * @return {@code True} if has deactivate request. + */ + public boolean deactivate() { + return stateChangeReq != null && !stateChangeReq.activate(); + } - newState = state; + /** + * @return {@code True} if has activate request. 
+ */ + public boolean activate() { + return stateChangeReq != null && stateChangeReq.activate(); } /** - * @return New cluster state if state change was requested. + * @return Cluster state change request. */ - @Nullable public ClusterState newClusterState() { - return newState; + @Nullable public StateChangeRequest stateChangeRequest() { + return stateChangeReq; } /** @@ -328,13 +340,14 @@ public class ExchangeActions { F.isEmpty(cachesToStop) && F.isEmpty(cacheGrpsToStart) && F.isEmpty(cacheGrpsToStop) && - F.isEmpty(cachesToResetLostParts); + F.isEmpty(cachesToResetLostParts) && + stateChangeReq == null; } /** * */ - static class ActionData { + public static class ActionData { /** */ private final DynamicCacheChangeRequest req; @@ -429,6 +442,6 @@ public class ExchangeActions { ", startGrps=" + startGrps + ", stopGrps=" + stopGrps + ", resetParts=" + (cachesToResetLostParts != null ? cachesToResetLostParts.keySet() : null) + - ", newState=" + newState + ']'; + ", stateChangeRequest=" + stateChangeReq + ']'; } } http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java index a967305..a9692f8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java @@ -21,7 +21,6 @@ import java.util.Collection; import java.util.UUID; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.events.CacheEvent; -import org.apache.ignite.events.CacheRebalancingEvent; import 
org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import org.apache.ignite.internal.util.typedef.F; @@ -32,7 +31,6 @@ import org.apache.ignite.lang.IgniteUuid; import org.jetbrains.annotations.Nullable; import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_READ; -import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_PART_UNLOADED; import static org.apache.ignite.events.EventType.EVT_CACHE_STARTED; import static org.apache.ignite.events.EventType.EVT_CACHE_STOPPED; http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java index 8ba10a2..7735f74 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java @@ -18,7 +18,6 @@ package org.apache.ignite.internal.processors.cache; import java.util.Collection; -import java.util.Set; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.cache.eviction.EvictionFilter; import org.apache.ignite.cache.eviction.EvictionPolicy; http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java index 2de3808..f9d1114 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java @@ -1405,30 +1405,33 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { } /** + * @param cctx Context. * @param topic Topic. * @param c Handler. */ - public void addOrderedCacheHandler(Object topic, IgniteBiInClosure<UUID, ? extends GridCacheIdMessage> c) { - addOrderedHandler(false, topic, c); + public void addOrderedCacheHandler(GridCacheSharedContext cctx, Object topic, IgniteBiInClosure<UUID, ? extends GridCacheIdMessage> c) { + addOrderedHandler(cctx, false, topic, c); } /** + * @param cctx Context. * @param topic Topic. * @param c Handler. */ - public void addOrderedCacheGroupHandler(Object topic, IgniteBiInClosure<UUID, ? extends GridCacheGroupIdMessage> c) { - addOrderedHandler(true, topic, c); + public void addOrderedCacheGroupHandler(GridCacheSharedContext cctx, Object topic, IgniteBiInClosure<UUID, ? extends GridCacheGroupIdMessage> c) { + addOrderedHandler(cctx, true, topic, c); } /** * Adds ordered message handler. * + * @param cctx Context. * @param cacheGrp {@code True} if cache group message, {@code false} if cache message. * @param topic Topic. * @param c Handler. */ @SuppressWarnings({"unchecked"}) - private void addOrderedHandler(boolean cacheGrp, Object topic, IgniteBiInClosure<UUID, ? extends GridCacheMessage> c) { + private void addOrderedHandler(GridCacheSharedContext cctx, boolean cacheGrp, Object topic, IgniteBiInClosure<UUID, ? extends GridCacheMessage> c) { MessageHandlers msgHandlers = cacheGrp ? 
grpHandlers : cacheHandlers; IgniteLogger log0 = log; http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java index 24433de..a6907b9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java @@ -274,10 +274,11 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter { pendingExplicit = GridConcurrentFactory.newMap(); } - /** {@inheritDoc} */ - @Override protected void onKernalStart0(boolean reconnect) throws IgniteCheckedException { - if (!reconnect) - cctx.gridEvents().addLocalEventListener(discoLsnr, EVT_NODE_FAILED, EVT_NODE_LEFT); + /** + * Cache futures listener must be registered after communication listener. 
+ */ + public void registerEventListener() { + cctx.gridEvents().addLocalEventListener(discoLsnr, EVT_NODE_FAILED, EVT_NODE_LEFT); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index 93310e3..22345d2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -59,6 +59,7 @@ import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.events.DiscoveryCustomEvent; import org.apache.ignite.internal.managers.discovery.DiscoCache; import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; +import org.apache.ignite.internal.managers.discovery.DiscoveryLocalJoinData; import org.apache.ignite.internal.managers.eventstorage.DiscoveryEventListener; import org.apache.ignite.internal.pagemem.snapshot.StartSnapshotOperationAckDiscoveryMessage; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; @@ -81,6 +82,8 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.Ign import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; +import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMessage; +import 
org.apache.ignite.internal.processors.cluster.ChangeGlobalStateMessage; import org.apache.ignite.internal.processors.query.schema.SchemaNodeLeaveExchangeWorkerTask; import org.apache.ignite.internal.processors.timeout.GridTimeoutObject; import org.apache.ignite.internal.util.GridListSet; @@ -192,6 +195,9 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana /** */ private DateFormat dateFormat = new SimpleDateFormat("HH:mm:ss.SSS"); + /** Events received while cluster state transition was in progress. */ + private final List<PendingDiscoveryEvent> pendingEvts = new ArrayList<>(); + /** Discovery listener. */ private final DiscoveryEventListener discoLsnr = new DiscoveryEventListener() { @Override public void onEvent(DiscoveryEvent evt, DiscoCache cache) { @@ -199,109 +205,53 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana return; try { - ClusterNode loc = cctx.localNode(); - - assert evt.type() == EVT_NODE_JOINED || evt.type() == EVT_NODE_LEFT || evt.type() == EVT_NODE_FAILED || - evt.type() == EVT_DISCOVERY_CUSTOM_EVT; - - final ClusterNode n = evt.eventNode(); - - GridDhtPartitionExchangeId exchId = null; - GridDhtPartitionsExchangeFuture exchFut = null; - - if (evt.type() != EVT_DISCOVERY_CUSTOM_EVT) { - assert !loc.id().equals(n.id()); - - if (evt.type() == EVT_NODE_LEFT || evt.type() == EVT_NODE_FAILED) { - assert cctx.discovery().node(n.id()) == null; - - // Avoid race b/w initial future add and discovery event. 
- GridDhtPartitionsExchangeFuture initFut = null; - - if (readyTopVer.get().equals(AffinityTopologyVersion.NONE)) { - initFut = exchangeFuture(initialExchangeId(), null, null, null, null); - - initFut.onNodeLeft(n); - } - - for (GridDhtPartitionsExchangeFuture f : exchFuts.values()) { - if (f != initFut) - f.onNodeLeft(n); - } - } + if (evt.type() == EVT_DISCOVERY_CUSTOM_EVT && + (((DiscoveryCustomEvent)evt).customMessage() instanceof ChangeGlobalStateMessage)) { + ChangeGlobalStateMessage stateChangeMsg = + (ChangeGlobalStateMessage)((DiscoveryCustomEvent)evt).customMessage(); - assert evt.type() != EVT_NODE_JOINED || n.order() > loc.order() : - "Node joined with smaller-than-local " + - "order [newOrder=" + n.order() + ", locOrder=" + loc.order() + ']'; + if (stateChangeMsg.exchangeActions() == null) + return; - exchId = exchangeId(n.id(), - affinityTopologyVersion(evt), - evt.type()); + onDiscoveryEvent(evt, cache); - exchFut = exchangeFuture(exchId, evt, cache,null, null); + return; } - else { - DiscoveryCustomMessage customMsg = ((DiscoveryCustomEvent)evt).customMessage(); + if (evt.type() == EVT_DISCOVERY_CUSTOM_EVT && + (((DiscoveryCustomEvent)evt).customMessage() instanceof ChangeGlobalStateFinishMessage)) { + ChangeGlobalStateFinishMessage stateFinishMsg = + (ChangeGlobalStateFinishMessage)((DiscoveryCustomEvent)evt).customMessage(); - if (customMsg instanceof DynamicCacheChangeBatch) { - DynamicCacheChangeBatch batch = (DynamicCacheChangeBatch)customMsg; - - ExchangeActions exchActions = batch.exchangeActions(); - - if (exchActions != null) { - exchId = exchangeId(n.id(), affinityTopologyVersion(evt), evt.type()); + if (stateFinishMsg.clusterActive()) { + for (PendingDiscoveryEvent pendingEvt : pendingEvts) { + if (log.isDebugEnabled()) + log.debug("Process pending event: " + pendingEvt.event()); - exchFut = exchangeFuture(exchId, evt, cache, exchActions, null); + onDiscoveryEvent(pendingEvt.event(), pendingEvt.discoCache()); } } - else if (customMsg 
instanceof CacheAffinityChangeMessage) { - CacheAffinityChangeMessage msg = (CacheAffinityChangeMessage)customMsg; - - if (msg.exchangeId() == null) { - if (msg.exchangeNeeded()) { - exchId = exchangeId(n.id(), affinityTopologyVersion(evt), evt.type()); - - exchFut = exchangeFuture(exchId, evt, cache, null, msg); - } - } - else if (msg.exchangeId().topologyVersion().topologyVersion() >= cctx.discovery().localJoinEvent().topologyVersion()) - exchangeFuture(msg.exchangeId(), null, null, null, null) - .onAffinityChangeMessage(evt.eventNode(), msg); + else { + for (PendingDiscoveryEvent pendingEvt : pendingEvts) + processEventInactive(pendingEvt.event(), pendingEvt.discoCache()); } - else if (customMsg instanceof StartSnapshotOperationAckDiscoveryMessage - && ((StartSnapshotOperationAckDiscoveryMessage)customMsg).needExchange()) { - exchId = exchangeId(n.id(), affinityTopologyVersion(evt), evt.type()); - exchFut = exchangeFuture(exchId, evt, null, null, null); - } - else { - // Process event as custom discovery task if needed. - CachePartitionExchangeWorkerTask task = - cctx.cache().exchangeTaskForCustomDiscoveryMessage(customMsg); + pendingEvts.clear(); - if (task != null) - exchWorker.addCustomTask(task); - } + return; } - if (exchId != null) { + if (cache.state().transition()) { if (log.isDebugEnabled()) - log.debug("Discovery event (will start exchange): " + exchId); - - // Event callback - without this callback future will never complete. - exchFut.onEvent(exchId, evt, cache); + log.debug("Add pending event: " + evt); - // Start exchange process. - addFuture(exchFut); - } - else { - if (log.isDebugEnabled()) - log.debug("Do not start exchange for discovery event: " + evt); + pendingEvts.add(new PendingDiscoveryEvent(evt, cache)); } + else if (cache.state().active()) + onDiscoveryEvent(evt, cache); + else + processEventInactive(evt, cache); - // Notify indexing engine about node leave so that we can re-map coordinator accordingly. 
- if (evt.type() == EVT_NODE_LEFT || evt.type() == EVT_NODE_FAILED) - exchWorker.addCustomTask(new SchemaNodeLeaveExchangeWorkerTask(evt.eventNode())); + notifyNodeFail(evt); } finally { leaveBusy(); @@ -309,6 +259,29 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana } }; + /** + * @param evt Event. + */ + private void notifyNodeFail(DiscoveryEvent evt) { + if (evt.type() == EVT_NODE_LEFT || evt.type() == EVT_NODE_FAILED) { + final ClusterNode n = evt.eventNode(); + + assert cctx.discovery().node(n.id()) == null; + + for (GridDhtPartitionsExchangeFuture f : exchFuts.values()) + f.onNodeLeft(n); + } + } + + /** + * @param evt Event. + * @param cache Discovery data cache. + */ + private void processEventInactive(DiscoveryEvent evt, DiscoCache cache) { + if (log.isDebugEnabled()) + log.debug("Ignore event, cluster is inactive: " + evt); + } + /** {@inheritDoc} */ @Override protected void start0() throws IgniteCheckedException { super.start0(); @@ -338,12 +311,158 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana processSinglePartitionRequest(node, msg); } }); + + if (!cctx.kernalContext().clientNode()) { + for (int cnt = 0; cnt < cctx.gridConfig().getRebalanceThreadPoolSize(); cnt++) { + final int idx = cnt; + + cctx.io().addOrderedCacheGroupHandler(cctx, rebalanceTopic(cnt), new CI2<UUID, GridCacheGroupIdMessage>() { + @Override public void apply(final UUID id, final GridCacheGroupIdMessage m) { + if (!enterBusy()) + return; + + try { + CacheGroupContext grp = cctx.cache().cacheGroup(m.groupId()); + + if (grp != null) { + if (m instanceof GridDhtPartitionSupplyMessage) { + grp.preloader().handleSupplyMessage(idx, id, (GridDhtPartitionSupplyMessage) m); + + return; + } + else if (m instanceof GridDhtPartitionDemandMessage) { + grp.preloader().handleDemandMessage(idx, id, (GridDhtPartitionDemandMessage) m); + + return; + } + } + + U.error(log, "Unsupported message type: " + m.getClass().getName()); + } 
+ finally { + leaveBusy(); + } + } + }); + } + } + } + + /** + * Callback for local join event (needed since regular event for local join is not generated). + * + * @param evt Event. + * @param cache Cache. + */ + public void onLocalJoin(DiscoveryEvent evt, DiscoCache cache) { + discoLsnr.onEvent(evt, cache); + } + + /** + * @param evt Event. + * @param cache Discovery data cache. + */ + private void onDiscoveryEvent(DiscoveryEvent evt, DiscoCache cache) { + ClusterNode loc = cctx.localNode(); + + assert evt.type() == EVT_NODE_JOINED || evt.type() == EVT_NODE_LEFT || evt.type() == EVT_NODE_FAILED || + evt.type() == EVT_DISCOVERY_CUSTOM_EVT; + + final ClusterNode n = evt.eventNode(); + + GridDhtPartitionExchangeId exchId = null; + GridDhtPartitionsExchangeFuture exchFut = null; + + if (evt.type() != EVT_DISCOVERY_CUSTOM_EVT) { + assert evt.type() != EVT_NODE_JOINED || n.isLocal() || n.order() > loc.order() : + "Node joined with smaller-than-local " + + "order [newOrder=" + n.order() + ", locOrder=" + loc.order() + ']'; + + exchId = exchangeId(n.id(), affinityTopologyVersion(evt), evt.type()); + + exchFut = exchangeFuture(exchId, evt, cache,null, null); + } + else { + DiscoveryCustomMessage customMsg = ((DiscoveryCustomEvent)evt).customMessage(); + + if (customMsg instanceof ChangeGlobalStateMessage) { + ChangeGlobalStateMessage stateChangeMsg = (ChangeGlobalStateMessage)customMsg; + + ExchangeActions exchActions = stateChangeMsg.exchangeActions(); + + if (exchActions != null) { + exchId = exchangeId(n.id(), affinityTopologyVersion(evt), evt.type()); + + exchFut = exchangeFuture(exchId, evt, cache, exchActions, null); + } + } + else if (customMsg instanceof DynamicCacheChangeBatch) { + DynamicCacheChangeBatch batch = (DynamicCacheChangeBatch)customMsg; + + ExchangeActions exchActions = batch.exchangeActions(); + + if (exchActions != null) { + exchId = exchangeId(n.id(), affinityTopologyVersion(evt), evt.type()); + + exchFut = exchangeFuture(exchId, evt, cache, 
exchActions, null); + } + } + else if (customMsg instanceof CacheAffinityChangeMessage) { + CacheAffinityChangeMessage msg = (CacheAffinityChangeMessage)customMsg; + + if (msg.exchangeId() == null) { + if (msg.exchangeNeeded()) { + exchId = exchangeId(n.id(), affinityTopologyVersion(evt), evt.type()); + + exchFut = exchangeFuture(exchId, evt, cache, null, msg); + } + } + else if (msg.exchangeId().topologyVersion().topologyVersion() >= cctx.discovery().localJoinEvent().topologyVersion()) + exchangeFuture(msg.exchangeId(), null, null, null, null) + .onAffinityChangeMessage(evt.eventNode(), msg); + } + else if (customMsg instanceof StartSnapshotOperationAckDiscoveryMessage + && ((StartSnapshotOperationAckDiscoveryMessage)customMsg).needExchange()) { + exchId = exchangeId(n.id(), affinityTopologyVersion(evt), evt.type()); + + exchFut = exchangeFuture(exchId, evt, null, null, null); + } + else { + // Process event as custom discovery task if needed. + CachePartitionExchangeWorkerTask task = + cctx.cache().exchangeTaskForCustomDiscoveryMessage(customMsg); + + if (task != null) + exchWorker.addCustomTask(task); + } + } + + if (exchId != null) { + if (log.isDebugEnabled()) + log.debug("Discovery event (will start exchange): " + exchId); + + // Event callback - without this callback future will never complete. + exchFut.onEvent(exchId, evt, cache); + + // Start exchange process. + addFuture(exchFut); + } + else { + if (log.isDebugEnabled()) + log.debug("Do not start exchange for discovery event: " + evt); + } + + notifyNodeFail(evt); + + // Notify indexing engine about node leave so that we can re-map coordinator accordingly. + if (evt.type() == EVT_NODE_LEFT || evt.type() == EVT_NODE_FAILED) + exchWorker.addCustomTask(new SchemaNodeLeaveExchangeWorkerTask(evt.eventNode())); } /** * @param task Task to run in exchange worker thread. 
*/ - public void addCustomTask(CachePartitionExchangeWorkerTask task) { + void addCustomTask(CachePartitionExchangeWorkerTask task) { assert task != null; exchWorker.addCustomTask(task); @@ -371,9 +490,14 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana return exchangeId(cctx.localNode().id(), startTopVer, EVT_NODE_JOINED); } - /** {@inheritDoc} */ - @Override protected void onKernalStart0(boolean reconnect) throws IgniteCheckedException { - super.onKernalStart0(reconnect); + /** + * @param active Cluster state. + * @param reconnect Reconnect flag. + * @throws IgniteCheckedException If failed. + */ + public void onKernalStart(boolean active, boolean reconnect) throws IgniteCheckedException { + for (ClusterNode n : cctx.discovery().remoteNodes()) + cctx.versions().onReceived(n.id(), n.metrics().getLastDataVersion()); ClusterNode loc = cctx.localNode(); @@ -381,79 +505,49 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana assert startTime > 0; - // Generate dummy discovery event for local node joining. 
- T2<DiscoveryEvent, DiscoCache> locJoin = cctx.discovery().localJoin(); - - DiscoveryEvent discoEvt = locJoin.get1(); - DiscoCache discoCache = locJoin.get2(); - - GridDhtPartitionExchangeId exchId = initialExchangeId(); + DiscoveryLocalJoinData locJoin = cctx.discovery().localJoin(); - GridDhtPartitionsExchangeFuture fut = exchangeFuture(exchId, discoEvt, discoCache, null, null); + GridDhtPartitionsExchangeFuture fut = null; if (reconnect) reconnectExchangeFut = new GridFutureAdapter<>(); - exchWorker.addFirstExchangeFuture(fut); - - if (!cctx.kernalContext().clientNode()) { - for (int cnt = 0; cnt < cctx.gridConfig().getRebalanceThreadPoolSize(); cnt++) { - final int idx = cnt; - - cctx.io().addOrderedCacheGroupHandler(rebalanceTopic(cnt), new CI2<UUID, GridCacheGroupIdMessage>() { - @Override public void apply(final UUID id, final GridCacheGroupIdMessage m) { - if (!enterBusy()) - return; - - try { - CacheGroupContext grp = cctx.cache().cacheGroup(m.groupId()); - - if (grp != null) { - if (m instanceof GridDhtPartitionSupplyMessage) { - grp.preloader().handleSupplyMessage(idx, id, (GridDhtPartitionSupplyMessage) m); - - return; - } - else if (m instanceof GridDhtPartitionDemandMessage) { - grp.preloader().handleDemandMessage(idx, id, (GridDhtPartitionDemandMessage) m); + if (active) { + DiscoveryEvent discoEvt = locJoin.event(); + DiscoCache discoCache = locJoin.discoCache(); - return; - } - } + GridDhtPartitionExchangeId exchId = initialExchangeId(); - U.error(log, "Unsupported message type: " + m.getClass().getName()); - } - finally { - leaveBusy(); - } - } - }); - } + fut = exchangeFuture(exchId, discoEvt, discoCache, null, null); } + else if (reconnect) + reconnectExchangeFut.onDone(); new IgniteThread(cctx.igniteInstanceName(), "exchange-worker", exchWorker).start(); if (reconnect) { - fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() { - @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> fut) { - try { - fut.get(); 
+ if (fut != null) { + fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() { + @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> fut) { + try { + fut.get(); - for (CacheGroupContext grp : cctx.cache().cacheGroups()) - grp.preloader().onInitialExchangeComplete(null); + for (CacheGroupContext grp : cctx.cache().cacheGroups()) + grp.preloader().onInitialExchangeComplete(null); - reconnectExchangeFut.onDone(); - } - catch (IgniteCheckedException e) { - for (CacheGroupContext grp : cctx.cache().cacheGroups()) - grp.preloader().onInitialExchangeComplete(e); + reconnectExchangeFut.onDone(); + } + catch (IgniteCheckedException e) { + for (CacheGroupContext grp : cctx.cache().cacheGroups()) + grp.preloader().onInitialExchangeComplete(e); - reconnectExchangeFut.onDone(e); + reconnectExchangeFut.onDone(e); + } } - } - }); + }); + } } - else { + else if (fut != null) { if (log.isDebugEnabled()) log.debug("Beginning to wait on local exchange future: " + fut); @@ -489,10 +583,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana } } - AffinityTopologyVersion nodeStartVer = new AffinityTopologyVersion(discoEvt.topologyVersion(), 0); - for (CacheGroupContext grp : cctx.cache().cacheGroups()) { - if (nodeStartVer.equals(grp.localStartVersion())) + if (locJoin.joinTopologyVersion().equals(grp.localStartVersion())) grp.preloader().onInitialExchangeComplete(null); } @@ -1669,28 +1761,6 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana } /** - * @param deque Deque to poll from. - * @param time Time to wait. - * @param w Worker. - * @return Polled item. - * @throws InterruptedException If interrupted. - */ - @Nullable private <T> T poll(BlockingQueue<T> deque, long time, GridWorker w) throws InterruptedException { - assert w != null; - - // There is currently a case where {@code interrupted} - // flag on a thread gets flipped during stop which causes the pool to hang. 
This check - // will always make sure that interrupted flag gets reset before going into wait conditions. - // The true fix should actually make sure that interrupted flag does not get reset or that - // interrupted exception gets propagated. Until we find a real fix, this method should - // always work to make sure that there is no hanging during stop. - if (w.isCancelled()) - Thread.currentThread().interrupt(); - - return deque.poll(time, MILLISECONDS); - } - - /** * Exchange future thread. All exchanges happen only by one thread and next * exchange will not start until previous one completes. */ @@ -1710,15 +1780,6 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana } /** - * Add first exchange future. - * - * @param exchFut Exchange future. - */ - void addFirstExchangeFuture(GridDhtPartitionsExchangeFuture exchFut) { - futQ.addFirst(exchFut); - } - - /** * @param exchFut Exchange future. */ void addExchangeFuture(GridDhtPartitionsExchangeFuture exchFut) { @@ -1946,7 +2007,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana } } - if (!exchFut.skipPreload() && cctx.kernalContext().state().active()) { + if (!exchFut.skipPreload() ) { assignsMap = new HashMap<>(); for (CacheGroupContext grp : cctx.cache().cacheGroups()) { http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index 0f859eb..624dec0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -107,6 +107,9 @@ import 
org.apache.ignite.internal.processors.cache.transactions.IgniteTransactio import org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager; import org.apache.ignite.internal.processors.cache.version.GridCacheVersionManager; import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor; +import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMessage; +import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateMessage; +import org.apache.ignite.internal.processors.cluster.DiscoveryDataClusterState; import org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor; import org.apache.ignite.internal.processors.plugin.CachePluginManager; import org.apache.ignite.internal.processors.query.QuerySchema; @@ -692,36 +695,27 @@ public class GridCacheProcessor extends GridProcessorAdapter { for (GridCacheSharedManager mgr : sharedCtx.managers()) mgr.start(sharedCtx); - if (ctx.config().isDaemon()) { - ctx.state().cacheProcessorStarted(new CacheJoinNodeDiscoveryData( - IgniteUuid.randomUuid(), - Collections.<String, CacheInfo>emptyMap(), - Collections.<String, CacheInfo>emptyMap(), - false - )); - - return; - } - - Map<String, CacheInfo> caches = new HashMap<>(); + if (!ctx.isDaemon()) { + Map<String, CacheInfo> caches = new HashMap<>(); - Map<String, CacheInfo> templates = new HashMap<>(); + Map<String, CacheInfo> templates = new HashMap<>(); - addCacheOnJoinFromConfig(caches, templates); + addCacheOnJoinFromConfig(caches, templates); - CacheJoinNodeDiscoveryData discoData = new CacheJoinNodeDiscoveryData( - IgniteUuid.randomUuid(), - caches, - templates, - startAllCachesOnClientStart() - ); + CacheJoinNodeDiscoveryData discoData = new CacheJoinNodeDiscoveryData( + IgniteUuid.randomUuid(), + caches, + templates, + startAllCachesOnClientStart() + ); - cachesInfo.onStart(discoData); + cachesInfo.onStart(discoData); - if (log.isDebugEnabled()) - log.debug("Started cache processor."); + if 
(log.isDebugEnabled()) + log.debug("Started cache processor."); + } - ctx.state().cacheProcessorStarted(discoData); + ctx.state().cacheProcessorStarted(); } /** @@ -830,51 +824,38 @@ public class GridCacheProcessor extends GridProcessorAdapter { /** {@inheritDoc} */ @SuppressWarnings("unchecked") - @Override public void onKernalStart() throws IgniteCheckedException { - boolean active = ctx.state().active(); + @Override public void onKernalStart(boolean active) throws IgniteCheckedException { + if (ctx.isDaemon()) + return; try { - boolean checkConsistency = - !ctx.config().isDaemon() && !getBoolean(IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK); + boolean checkConsistency = !getBoolean(IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK); if (checkConsistency) checkConsistency(); - if (active && cachesInfo.onJoinCacheException() != null) - throw new IgniteCheckedException(cachesInfo.onJoinCacheException()); - cachesInfo.onKernalStart(checkConsistency); - if (active && !ctx.clientNode() && !ctx.isDaemon()) - sharedCtx.database().lock(); - - // Must start database before start first cache. - sharedCtx.database().onKernalStart(false); - ctx.query().onCacheKernalStart(); - // In shared context, we start exchange manager and wait until processed local join - // event, all caches which we get on join will be start. - for (GridCacheSharedManager<?, ?> mgr : sharedCtx.managers()) { - if (sharedCtx.database() != mgr) - mgr.onKernalStart(false); - } + sharedCtx.mvcc().registerEventListener(); + + sharedCtx.exchange().onKernalStart(active, false); } finally { cacheStartedLatch.countDown(); } + if (!ctx.clientNode()) + addRemovedItemsCleanupTask(Long.getLong(IGNITE_CACHE_REMOVED_ENTRIES_TTL, 10_000)); + // Escape if cluster inactive. 
if (!active) return; - if (!ctx.config().isDaemon()) - ctx.cacheObjects().onUtilityCacheStarted(); - ctx.service().onUtilityCacheStarted(); - final AffinityTopologyVersion startTopVer = - new AffinityTopologyVersion(ctx.discovery().localJoinEvent().topologyVersion(), 0); + final AffinityTopologyVersion startTopVer = ctx.discovery().localJoin().joinTopologyVersion(); final List<IgniteInternalFuture> syncFuts = new ArrayList<>(caches.size()); @@ -894,15 +875,8 @@ public class GridCacheProcessor extends GridProcessorAdapter { } }); - // Avoid iterator creation. - //noinspection ForLoopReplaceableByForEach for (int i = 0, size = syncFuts.size(); i < size; i++) syncFuts.get(i).get(); - - assert ctx.config().isDaemon() || caches.containsKey(CU.UTILITY_CACHE_NAME) : "Utility cache should be started"; - - if (!ctx.clientNode() && !ctx.isDaemon()) - addRemovedItemsCleanupTask(Long.getLong(IGNITE_CACHE_REMOVED_ENTRIES_TTL, 10_000)); } /** @@ -969,8 +943,6 @@ public class GridCacheProcessor extends GridProcessorAdapter { for (CacheGroupContext grp : cacheGrps.values()) stopCacheGroup(grp.groupId()); - - cachesInfo.clearCaches(); } /** @@ -1097,7 +1069,11 @@ public class GridCacheProcessor extends GridProcessorAdapter { @Override public IgniteInternalFuture<?> onReconnected(boolean clusterRestarted) throws IgniteCheckedException { List<GridCacheAdapter> reconnected = new ArrayList<>(caches.size()); - ClusterCachesReconnectResult reconnectRes = cachesInfo.onReconnected(); + DiscoveryDataClusterState state = ctx.state().clusterState(); + + boolean active = state.active() && !state.transition(); + + ClusterCachesReconnectResult reconnectRes = cachesInfo.onReconnected(active, state.transition()); final List<GridCacheAdapter> stoppedCaches = new ArrayList<>(); @@ -1135,7 +1111,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { grp.onReconnected(); } - sharedCtx.onReconnected(); + sharedCtx.onReconnected(active); for (GridCacheAdapter cache : reconnected) 
cache.context().gate().reconnected(false); @@ -1750,17 +1726,26 @@ public class GridCacheProcessor extends GridProcessorAdapter { } /** + * @return Caches to be started when this node starts. + */ + public List<T2<DynamicCacheDescriptor, NearCacheConfiguration>> cachesToStartOnLocalJoin() { + return cachesInfo.cachesToStartOnLocalJoin(); + } + + /** + * @param caches Caches to start. * @param exchTopVer Current exchange version. * @throws IgniteCheckedException If failed. */ - public void startCachesOnLocalJoin(AffinityTopologyVersion exchTopVer) throws IgniteCheckedException { - List<T2<DynamicCacheDescriptor, NearCacheConfiguration>> caches = cachesInfo.cachesToStartOnLocalJoin(); - + public void startCachesOnLocalJoin(List<T2<DynamicCacheDescriptor, NearCacheConfiguration>> caches, + AffinityTopologyVersion exchTopVer) + throws IgniteCheckedException { if (!F.isEmpty(caches)) { for (T2<DynamicCacheDescriptor, NearCacheConfiguration> t : caches) { DynamicCacheDescriptor desc = t.get1(); prepareCacheStart( + desc.cacheConfiguration(), desc, t.get2(), exchTopVer @@ -1787,6 +1772,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { if (CU.affinityNode(ctx.discovery().localNode(), filter)) { prepareCacheStart( + desc.cacheConfiguration(), desc, null, exchTopVer @@ -1799,17 +1785,18 @@ public class GridCacheProcessor extends GridProcessorAdapter { } /** + * @param startCfg Cache configuration to use. * @param desc Cache descriptor. * @param reqNearCfg Near configuration if specified for client cache start request. * @param exchTopVer Current exchange version. * @throws IgniteCheckedException If failed. 
*/ - public void prepareCacheStart( + void prepareCacheStart( + CacheConfiguration startCfg, DynamicCacheDescriptor desc, @Nullable NearCacheConfiguration reqNearCfg, AffinityTopologyVersion exchTopVer ) throws IgniteCheckedException { - CacheConfiguration startCfg = desc.cacheConfiguration(); assert !caches.containsKey(startCfg.getName()) : startCfg.getName(); CacheConfiguration ccfg = new CacheConfiguration(startCfg); @@ -2003,7 +1990,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { sharedCtx.removeCacheContext(ctx); - onKernalStop(cache, destroy); + onKernalStop(cache, true); stopCache(cache, true, destroy); @@ -2017,9 +2004,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { * @param startTopVer Cache start version. * @param err Cache start error if any. */ - void initCacheProxies( - AffinityTopologyVersion startTopVer, @Nullable - Throwable err) { + void initCacheProxies(AffinityTopologyVersion startTopVer, @Nullable Throwable err) { for (GridCacheAdapter<?, ?> cache : caches.values()) { GridCacheContext<?, ?> cacheCtx = cache.context(); @@ -2122,7 +2107,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { if (exchActions == null) return; - if (exchActions.systemCachesStarting() && exchActions.newClusterState() == null) + if (exchActions.systemCachesStarting() && exchActions.stateChangeRequest() == null) ctx.dataStructures().restoreStructuresState(ctx); if (err == null) { @@ -2143,9 +2128,6 @@ public class GridCacheProcessor extends GridProcessorAdapter { try { prepareCacheStop(action.request().cacheName(), action.request().destroy()); - - if (exchActions.newClusterState() == null) - ctx.state().onCacheStop(action.request()); } finally { sharedCtx.database().checkpointReadUnlock(); @@ -2166,6 +2148,9 @@ public class GridCacheProcessor extends GridProcessorAdapter { if (!sharedCtx.kernalContext().clientNode()) sharedCtx.database().onCacheGroupsStopped(stoppedGroups); + + if (exchActions.deactivate()) + 
sharedCtx.deactivate(); } } @@ -2204,10 +2189,11 @@ public class GridCacheProcessor extends GridProcessorAdapter { /** * @param req Request to complete future for. + * @param success Future result. * @param err Error if any. */ void completeCacheStartFuture(DynamicCacheChangeRequest req, boolean success, @Nullable Throwable err) { - if (req.initiatingNodeId().equals(ctx.localNodeId())) { + if (ctx.localNodeId().equals(req.initiatingNodeId())) { DynamicCacheStartFuture fut = (DynamicCacheStartFuture)pendingFuts.get(req.requestId()); if (fut != null) @@ -2304,30 +2290,35 @@ public class GridCacheProcessor extends GridProcessorAdapter { /** {@inheritDoc} */ @Override public void collectGridNodeData(DiscoveryDataBag dataBag) { - if (ctx.state().active()) - cachesInfo.collectGridNodeData(dataBag); - else - ctx.state().collectGridNodeData0(dataBag); + cachesInfo.collectGridNodeData(dataBag); } /** {@inheritDoc} */ @Override public void onJoiningNodeDataReceived(JoiningNodeDiscoveryData data) { - if (ctx.state().active()) - cachesInfo.onJoiningNodeDataReceived(data); - - ctx.state().onJoiningNodeDataReceived0(data); + cachesInfo.onJoiningNodeDataReceived(data); } /** {@inheritDoc} */ @Override public void onGridDataReceived(GridDiscoveryData data) { - if (ctx.state().active()) { - if (!cachesInfo.disconnectedState()) - cachesInfo.addJoinInfo(); + cachesInfo.onGridDataReceived(data); + } - cachesInfo.onGridDataReceived(data); - } + /** + * @param msg Message. + */ + public void onStateChangeFinish(ChangeGlobalStateFinishMessage msg) { + cachesInfo.onStateChangeFinish(msg); + } - ctx.state().onGridDataReceived0(data); + /** + * @param msg Message. + * @param topVer Current topology version. + * @throws IgniteCheckedException If configuration validation failed. + * @return Exchange actions. 
+ */ + public ExchangeActions onStateChangeRequest(ChangeGlobalStateMessage msg, AffinityTopologyVersion topVer) + throws IgniteCheckedException { + return cachesInfo.onStateChangeRequest(msg, topVer); } /** @@ -2929,13 +2920,19 @@ public class GridCacheProcessor extends GridProcessorAdapter { /** * @param type Event type. + * @param customMsg Custom message instance. * @param node Event node. * @param topVer Topology version. + * @param state Cluster state. */ - public void onDiscoveryEvent(int type, ClusterNode node, AffinityTopologyVersion topVer) { + public void onDiscoveryEvent(int type, + @Nullable DiscoveryCustomMessage customMsg, + ClusterNode node, + AffinityTopologyVersion topVer, + DiscoveryDataClusterState state) { cachesInfo.onDiscoveryEvent(type, node, topVer); - sharedCtx.affinity().onDiscoveryEvent(type, node, topVer); + sharedCtx.affinity().onDiscoveryEvent(type, customMsg, node, topVer, state); } /** @@ -3214,7 +3211,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { proxy = new IgniteCacheProxy(cacheAdapter.context(), cacheAdapter, null, false); } - assert proxy != null; + assert proxy != null : name; return proxy.internalProxy(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java index 75d03d7..9adca8d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.processors.cache; +import java.util.ArrayList; import java.util.Collection; 
import java.util.LinkedList; import java.util.List; @@ -45,7 +46,6 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopolo import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager; import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheSnapshotManager; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology; -import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal; import org.apache.ignite.internal.processors.cache.jta.CacheJtaManagerAdapter; import org.apache.ignite.internal.processors.cache.store.CacheStoreManager; @@ -54,6 +54,7 @@ import org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager; import org.apache.ignite.internal.processors.cache.transactions.TransactionMetricsAdapter; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.processors.cache.version.GridCacheVersionManager; +import org.apache.ignite.internal.processors.cluster.IgniteChangeGlobalStateSupport; import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor; import org.apache.ignite.internal.util.GridIntList; import org.apache.ignite.internal.util.future.GridCompoundFuture; @@ -156,6 +157,9 @@ public class GridCacheSharedContext<K, V> { /** Concurrent DHT atomic updates counters. */ private AtomicIntegerArray dhtAtomicUpdCnt; + /** */ + private final List<IgniteChangeGlobalStateSupport> stateAwareMgrs; + /** * @param kernalCtx Context. * @param txMgr Transaction manager. 
@@ -207,6 +211,49 @@ public class GridCacheSharedContext<K, V> { txFinishMsgLog = kernalCtx.log(CU.TX_MSG_FINISH_LOG_CATEGORY); txLockMsgLog = kernalCtx.log(CU.TX_MSG_LOCK_LOG_CATEGORY); txRecoveryMsgLog = kernalCtx.log(CU.TX_MSG_RECOVERY_LOG_CATEGORY); + + stateAwareMgrs = new ArrayList<>(); + + if (pageStoreMgr != null) + stateAwareMgrs.add(pageStoreMgr); + + if (walMgr != null) + stateAwareMgrs.add(walMgr); + + stateAwareMgrs.add(dbMgr); + + stateAwareMgrs.add(snpMgr); + } + + /** + * @throws IgniteCheckedException If failed. + */ + public void activate() throws IgniteCheckedException { + if (!kernalCtx.clientNode()) + dbMgr.lock(); + + boolean success = false; + + try { + for (IgniteChangeGlobalStateSupport mgr : stateAwareMgrs) + mgr.onActivate(kernalCtx); + + success = true; + } + finally { + if (!success) { + if (!kernalCtx.clientNode()) + dbMgr.unLock(); + } + } + } + + /** + * + */ + public void deactivate() { + for (int i = stateAwareMgrs.size() - 1; i >= 0; i--) + stateAwareMgrs.get(i).onDeActivate(kernalCtx); } /** @@ -272,12 +319,15 @@ public class GridCacheSharedContext<K, V> { if (restartOnDisconnect(mgr)) mgr.stop(true); } + + deactivate(); } /** + * @param active Active flag. * @throws IgniteCheckedException If failed. 
*/ - void onReconnected() throws IgniteCheckedException { + void onReconnected(boolean active) throws IgniteCheckedException { List<GridCacheSharedManager<K, V>> mgrs = new LinkedList<>(); setManagers(mgrs, txMgr, @@ -303,8 +353,10 @@ public class GridCacheSharedContext<K, V> { kernalCtx.query().onCacheReconnect(); - for (GridCacheSharedManager<?, ?> mgr : mgrs) - mgr.onKernalStart(true); + if (!active) + affinity().removeAllCacheInfo(); + + exchMgr.onKernalStart(active, true); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedManager.java index e0e4090..bc1bbb9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedManager.java @@ -40,12 +40,6 @@ public interface GridCacheSharedManager<K, V> { public void stop(boolean cancel); /** - * @param reconnect {@code True} if manager restarted after client reconnect. - * @throws IgniteCheckedException If failed. - */ - public void onKernalStart(boolean reconnect) throws IgniteCheckedException; - - /** * @param cancel Cancel flag. 
*/ public void onKernalStop(boolean cancel); http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedManagerAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedManagerAdapter.java index f6f79e4..90ae670 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedManagerAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedManagerAdapter.java @@ -112,14 +112,6 @@ public class GridCacheSharedManagerAdapter<K, V> implements GridCacheSharedManag } /** {@inheritDoc} */ - @Override public final void onKernalStart(boolean reconnect) throws IgniteCheckedException { - onKernalStart0(reconnect); - - if (!reconnect && log != null && log.isDebugEnabled()) - log.debug(kernalStartInfo()); - } - - /** {@inheritDoc} */ @Override public final void onKernalStop(boolean cancel) { if (!starting.get()) // Ignoring attempt to stop manager that has never been started. @@ -132,14 +124,6 @@ public class GridCacheSharedManagerAdapter<K, V> implements GridCacheSharedManag } /** - * @param reconnect {@code True} if manager restarted after client reconnect. - * @throws IgniteCheckedException If failed. - */ - protected void onKernalStart0(boolean reconnect) throws IgniteCheckedException { - // No-op. - } - - /** * @param cancel Cancel flag. 
*/ protected void onKernalStop0(boolean cancel) { http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PendingDiscoveryEvent.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PendingDiscoveryEvent.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PendingDiscoveryEvent.java new file mode 100644 index 0000000..b4274f7 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PendingDiscoveryEvent.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.events.DiscoveryEvent; +import org.apache.ignite.internal.managers.discovery.DiscoCache; +import org.apache.ignite.internal.util.typedef.internal.S; + +/** + * + */ +public class PendingDiscoveryEvent { + /** */ + private final DiscoveryEvent evt; + + /** */ + private final DiscoCache cache; + + /** + * @param evt Event. + * @param cache Discovery data cache. 
+ */ + public PendingDiscoveryEvent(DiscoveryEvent evt, DiscoCache cache) { + this.evt = evt; + this.cache = cache; + } + + /** + * @return Event. + */ + public DiscoveryEvent event() { + return evt; + } + + /** + * @return Discovery data cache. + */ + public DiscoCache discoCache() { + return cache; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(PendingDiscoveryEvent.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/StateChangeRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/StateChangeRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/StateChangeRequest.java new file mode 100644 index 0000000..2d35e81 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/StateChangeRequest.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache; + +import java.util.UUID; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateMessage; +import org.apache.ignite.internal.util.typedef.internal.S; + +/** + * + */ +public class StateChangeRequest { + /** */ + private final ChangeGlobalStateMessage msg; + + /** */ + private final AffinityTopologyVersion topVer; + + /** + * @param msg Message. + * @param topVer State change topology version. + */ + public StateChangeRequest(ChangeGlobalStateMessage msg, + AffinityTopologyVersion topVer) { + this.msg = msg; + this.topVer = topVer; + } + + /** + * @return State change exchange version. + */ + public AffinityTopologyVersion topologyVersion() { + return topVer; + } + + /** + * @return State change request ID. + */ + public UUID requestId() { + return msg.requestId(); + } + + /** + * @return New state. + */ + public boolean activate() { + return msg.activate(); + } + + /** + * @return Node initiated state change process. 
+ */ + public UUID initiatorNodeId() { + return msg.initiatorNodeId(); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(StateChangeRequest.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java index c2c71ea..0065e41 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java @@ -258,8 +258,8 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm } /** {@inheritDoc} */ - @Override public void onKernalStart() throws IgniteCheckedException { - super.onKernalStart(); + @Override public void onKernalStart(boolean active) throws IgniteCheckedException { + super.onKernalStart(active); discoveryStarted = true; } http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java index 1c97de2..d6b45b3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java +++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java @@ -29,7 +29,6 @@ import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.processors.cache.GridCacheCompoundIdentityFuture; -import org.apache.ignite.internal.processors.cache.GridCacheFuture; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import org.apache.ignite.internal.util.GridLeanMap; http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java index 38d0108..960b91a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java @@ -50,7 +50,6 @@ import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo; import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException; import org.apache.ignite.internal.processors.cache.GridCacheMapEntry; -import org.apache.ignite.internal.processors.cache.GridCacheMapEntryFactory; import org.apache.ignite.internal.processors.cache.GridCachePreloader; import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy; import 
org.apache.ignite.internal.processors.cache.KeyCacheObject; http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java index 123d26b..0ea48e3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java @@ -27,7 +27,6 @@ import java.util.UUID; import java.util.concurrent.atomic.AtomicReference; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; -import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheObject; http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java index 6392d0a..439bb9d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java @@ -19,13 +19,11 
@@ package org.apache.ignite.internal.processors.cache.distributed.dht; import java.util.Collection; import java.util.Collections; -import java.util.HashSet; import java.util.Map; import java.util.UUID; import java.util.concurrent.atomic.AtomicReference; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; -import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.NodeStoppingException; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java index c205c3b..57ce323 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java @@ -41,7 +41,6 @@ import org.apache.ignite.internal.managers.discovery.DiscoCache; import org.apache.ignite.internal.processors.affinity.AffinityAssignment; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheGroupContext; -import org.apache.ignite.internal.processors.cache.ClusterState; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId; import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap; @@ -424,11 +423,8 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { throws IgniteCheckedException { DiscoveryEvent discoEvt = exchFut.discoveryEvent(); - ClusterState newState = exchFut.newClusterState(); - - treatAllPartAsLoc = (newState != null && newState == ClusterState.ACTIVE) - || (ctx.kernalContext().state().active() - && discoEvt.type() == EventType.EVT_NODE_JOINED + treatAllPartAsLoc = exchFut.activateCluster() + || (discoEvt.type() == EventType.EVT_NODE_JOINED && discoEvt.eventNode().isLocal() && !ctx.kernalContext().clientNode() ); @@ -611,7 +607,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { if (locPart != null) { GridDhtPartitionState state = locPart.state(); - if (state == MOVING && ctx.kernalContext().state().active()) { + if (state == MOVING) { locPart.rent(false); updateSeq = updateLocal(p, locPart.state(), updateSeq); @@ -1773,9 +1769,6 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { * @return Checks if any of the local partitions need to be evicted. 
*/ private boolean checkEvictions(long updateSeq, List<List<ClusterNode>> aff) { - if (!ctx.kernalContext().state().active()) - return false; - boolean changed = false; UUID locId = ctx.localNodeId(); http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTopologyFutureAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTopologyFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTopologyFutureAdapter.java index e70f383..d04870a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTopologyFutureAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTopologyFutureAdapter.java @@ -80,7 +80,7 @@ public abstract class GridDhtTopologyFutureAdapter extends GridFutureAdapter<Aff if (err != null) return err; - if (!cctx.shared().kernalContext().state().active()) + if (!cctx.shared().kernalContext().state().publicApiActiveState()) return new CacheInvalidStateException( "Failed to perform cache operation (cluster is not activated): " + cctx.name()); http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java index cfecb1c..d66afca 100644 --- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java @@ -18,16 +18,13 @@ package org.apache.ignite.internal.processors.cache.distributed.dht; import java.util.Collection; -import java.util.Collections; import java.util.List; -import java.util.Map; import java.util.UUID; import java.util.concurrent.atomic.AtomicReference; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.IgniteDiagnosticAware; -import org.apache.ignite.internal.IgniteDiagnosticMessage; import org.apache.ignite.internal.IgniteDiagnosticPrepareContext; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java index 1bd8ec5..6fe96a4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java @@ -46,7 +46,6 @@ import org.apache.ignite.internal.processors.cache.GridCacheReturn; import org.apache.ignite.internal.processors.cache.KeyCacheObject; import 
org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.tostring.GridToStringInclude; -import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java index 763b43b..fe216a0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java @@ -23,7 +23,6 @@ import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.UUID; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java index 
774f0ce..e7e95b2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java @@ -161,6 +161,8 @@ public class GridDhtPartitionDemander { lastExchangeFut = null; lastTimeoutObj.set(null); + + syncFut.onDone(); } /**
