http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index d52ad72..c879016 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -22,11 +22,10 @@ import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.UUID;
-import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -35,8 +34,6 @@ import java.util.concurrent.locks.ReadWriteLock;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.IgniteSystemProperties;
-import org.apache.ignite.cache.affinity.AffinityCentralizedFunction;
-import org.apache.ignite.cache.affinity.AffinityFunction;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.events.CacheEvent;
import org.apache.ignite.events.DiscoveryEvent;
@@ -49,30 +46,30 @@ import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.events.DiscoveryCustomEvent;
import org.apache.ignite.internal.managers.discovery.GridDiscoveryTopologySnapshot;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache;
+import org.apache.ignite.internal.processors.cache.CacheAffinityChangeMessage;
import org.apache.ignite.internal.processors.cache.DynamicCacheChangeRequest;
+import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridClientPartitionTopology;
-import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtAssignmentFetchFuture;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
-import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
import org.apache.ignite.internal.processors.timeout.GridTimeoutObjectAdapter;
-import org.apache.ignite.internal.util.GridConcurrentHashSet;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.LT;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.lang.IgniteRunnable;
import org.jetbrains.annotations.Nullable;
import org.jsr166.ConcurrentHashMap8;
@@ -108,38 +105,17 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
/** */
@GridToStringInclude
- private final Collection<UUID> rcvdIds = new GridConcurrentHashSet<>();
+ private final Set<UUID> remaining = new HashSet<>();
- /** Remote nodes. */
- private volatile Collection<ClusterNode> rmtNodes;
-
- /** Remote nodes. */
- @GridToStringInclude
- private volatile Collection<UUID> rmtIds;
+ /** */
+ private List<ClusterNode> srvNodes;
- /** Oldest node. */
- @GridToStringExclude
- private final AtomicReference<ClusterNode> oldestNode = new AtomicReference<>();
+ /** */
+ private ClusterNode crd;
/** ExchangeFuture id. */
private final GridDhtPartitionExchangeId exchId;
- /** Init flag. */
- @GridToStringInclude
- private final AtomicBoolean init = new AtomicBoolean(false);
-
- /** Ready for reply flag. */
- @GridToStringInclude
- private final AtomicBoolean ready = new AtomicBoolean(false);
-
- /** Replied flag. */
- @GridToStringInclude
- private final AtomicBoolean replied = new AtomicBoolean(false);
-
- /** Timeout object. */
- @GridToStringExclude
- private volatile GridTimeoutObject timeoutObj;
-
/** Cache context. */
private final GridCacheSharedContext<?, ?> cctx;
@@ -156,6 +132,12 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
/** */
private GridFutureAdapter<Boolean> initFut;
+ /** */
+ private final List<IgniteRunnable> discoEvts = new ArrayList<>();
+
+ /** */
+ private boolean init;
+
/** Topology snapshot. */
private AtomicReference<GridDiscoveryTopologySnapshot> topSnapshot = new AtomicReference<>();
@@ -166,10 +148,10 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
* Messages received on non-coordinator are stored in case if this node
* becomes coordinator.
*/
- private final Map<UUID, GridDhtPartitionsSingleMessage> singleMsgs = new ConcurrentHashMap8<>();
+ private final Map<ClusterNode, GridDhtPartitionsSingleMessage> singleMsgs = new ConcurrentHashMap8<>();
/** Messages received from new coordinator. */
- private final Map<UUID, GridDhtPartitionsFullMessage> fullMsgs = new ConcurrentHashMap8<>();
+ private final Map<ClusterNode, GridDhtPartitionsFullMessage> fullMsgs = new ConcurrentHashMap8<>();
/** */
@SuppressWarnings({"FieldCanBeLocal", "UnusedDeclaration"})
@@ -185,6 +167,9 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
/** Dynamic cache change requests. */
private Collection<DynamicCacheChangeRequest> reqs;
+ /** */
+ private CacheAffinityChangeMessage affChangeMsg;
+
/** Cache validation results. */
private volatile Map<Integer, Boolean> cacheValidRes;
@@ -197,6 +182,9 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
/** Init timestamp. Used to track the amount of time spent to complete the future. */
private long initTs;
+ /** */
+ private boolean centralizedAff;
+
/**
* Dummy future created to trigger reassignments if partition
* topology changed while preloading.
@@ -250,12 +238,14 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
* @param busyLock Busy lock.
* @param exchId Exchange ID.
* @param reqs Cache change requests.
+ * @param affChangeMsg Affinity change message.
*/
public GridDhtPartitionsExchangeFuture(
GridCacheSharedContext cctx,
ReadWriteLock busyLock,
GridDhtPartitionExchangeId exchId,
- Collection<DynamicCacheChangeRequest> reqs
+ Collection<DynamicCacheChangeRequest> reqs,
+ CacheAffinityChangeMessage affChangeMsg
) {
assert busyLock != null;
assert exchId != null;
@@ -268,6 +258,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
this.busyLock = busyLock;
this.exchId = exchId;
this.reqs = reqs;
+ this.affChangeMsg = affChangeMsg;
log = cctx.logger(getClass());
@@ -284,6 +275,13 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
this.reqs = reqs;
}
+ /**
+ * @param affChangeMsg Affinity change message.
+ */
+ public void affinityChangeMessage(CacheAffinityChangeMessage affChangeMsg) {
+ this.affChangeMsg = affChangeMsg;
+ }
+
/** {@inheritDoc} */
@Override public AffinityTopologyVersion topologyVersion() {
return exchId.topologyVersion();
@@ -342,7 +340,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
* @param cacheId Cache ID.
* @return {@code True} if non-client cache was added during this exchange.
*/
- private boolean cacheStarted(int cacheId) {
+ public boolean cacheStarted(int cacheId) {
if (!F.isEmpty(reqs)) {
for (DynamicCacheChangeRequest req : reqs) {
if (req.start() && !req.clientStartOnly()) {
@@ -356,87 +354,6 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
}
/**
- * @param cacheId Cache ID.
- * @return {@code True} if local client has been added.
- */
- public boolean isLocalClientAdded(int cacheId) {
- if (!F.isEmpty(reqs)) {
- for (DynamicCacheChangeRequest req : reqs) {
- if (req.start() && F.eq(req.initiatingNodeId(), cctx.localNodeId())) {
- if (CU.cacheId(req.cacheName()) == cacheId)
- return true;
- }
- }
- }
-
- return false;
- }
-
- /**
- * @param cacheCtx Cache context.
- * @throws IgniteCheckedException If failed.
- */
- private void initTopology(GridCacheContext cacheCtx) throws IgniteCheckedException {
- if (stopping(cacheCtx.cacheId()))
- return;
-
- if (canCalculateAffinity(cacheCtx)) {
- if (log.isDebugEnabled())
- log.debug("Will recalculate affinity [locNodeId=" +
cctx.localNodeId() + ", exchId=" + exchId + ']');
-
- cacheCtx.affinity().calculateAffinity(exchId.topologyVersion(), discoEvt);
- }
- else {
- if (log.isDebugEnabled())
- log.debug("Will request affinity from remote node [locNodeId="
+ cctx.localNodeId() + ", exchId=" +
- exchId + ']');
-
- // Fetch affinity assignment from remote node.
- GridDhtAssignmentFetchFuture fetchFut = new GridDhtAssignmentFetchFuture(cacheCtx,
- exchId.topologyVersion(),
- CU.affinityNodes(cacheCtx, exchId.topologyVersion()));
-
- fetchFut.init();
-
- List<List<ClusterNode>> affAssignment = fetchFut.get();
-
- if (log.isDebugEnabled())
- log.debug("Fetched affinity from remote node, initializing
affinity assignment [locNodeId=" +
- cctx.localNodeId() + ", topVer=" +
exchId.topologyVersion() + ']');
-
- if (affAssignment == null) {
- affAssignment = new ArrayList<>(cacheCtx.affinity().partitions());
-
- List<ClusterNode> empty = Collections.emptyList();
-
- for (int i = 0; i < cacheCtx.affinity().partitions(); i++)
- affAssignment.add(empty);
- }
-
- cacheCtx.affinity().initializeAffinity(exchId.topologyVersion(), affAssignment);
- }
- }
-
- /**
- * @param cacheCtx Cache context.
- * @return {@code True} if local node can calculate affinity on it's own for this partition map exchange.
- */
- private boolean canCalculateAffinity(GridCacheContext cacheCtx) {
- AffinityFunction affFunc = cacheCtx.config().getAffinity();
-
- // Do not request affinity from remote nodes if affinity function is not centralized.
- if (!U.hasAnnotation(affFunc, AffinityCentralizedFunction.class))
- return true;
-
- // If local node did not initiate exchange or local node is the only cache node in grid.
- Collection<ClusterNode> affNodes = CU.affinityNodes(cacheCtx, exchId.topologyVersion());
-
- return cacheStarted(cacheCtx.cacheId()) ||
- !exchId.nodeId().equals(cctx.localNodeId()) ||
- (affNodes.size() == 1 && affNodes.contains(cctx.localNode()));
- }
-
- /**
* @return {@code True}
*/
public boolean onAdded() {
@@ -500,427 +417,469 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
if (isDone())
return;
- if (init.compareAndSet(false, true)) {
- if (isDone())
- return;
+ initTs = U.currentTimeMillis();
- initTs = U.currentTimeMillis();
+ U.await(evtLatch);
- try {
- // Wait for event to occur to make sure that discovery
- // will return corresponding nodes.
- U.await(evtLatch);
+ assert discoEvt != null : this;
+ assert exchId.nodeId().equals(discoEvt.eventNode().id()) : this;
+ assert !dummy && !forcePreload : this;
- assert discoEvt != null : this;
- assert !dummy && !forcePreload : this;
+ try {
+ srvNodes = new ArrayList<>(cctx.discovery().serverNodes(topologyVersion()));
- ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx, exchId.topologyVersion());
+ remaining.addAll(F.nodeIds(F.view(srvNodes, F.remoteNodes(cctx.localNodeId()))));
- oldestNode.set(oldest);
+ crd = srvNodes.isEmpty() ? null : srvNodes.get(0);
- if (!F.isEmpty(reqs))
- blockGateways();
+ boolean crdNode = crd != null && crd.isLocal();
- startCaches();
+ skipPreload = cctx.kernalContext().clientNode();
- // True if client node joined or failed.
- boolean clientNodeEvt;
+ ExchangeType exchange;
- if (F.isEmpty(reqs)) {
- int type = discoEvt.type();
+ Collection<DynamicCacheDescriptor> receivedCaches;
- assert type == EVT_NODE_JOINED || type == EVT_NODE_LEFT || type == EVT_NODE_FAILED : discoEvt;
+ if (discoEvt.type() == EVT_DISCOVERY_CUSTOM_EVT) {
+ if (!F.isEmpty(reqs))
+ exchange = onCacheChangeRequest(crdNode);
+ else {
+ assert affChangeMsg != null : this;
- clientNodeEvt = CU.clientNode(discoEvt.eventNode());
+ exchange = onAffinityChangeRequest(crdNode);
}
- else {
- assert discoEvt.type() == EVT_DISCOVERY_CUSTOM_EVT : discoEvt;
+ }
+ else {
+ if (discoEvt.type() == EVT_NODE_JOINED) {
+ receivedCaches = cctx.cache().startReceivedCaches(topologyVersion());
- boolean clientOnlyCacheEvt = true;
+ if (!discoEvt.eventNode().isLocal())
+ cctx.affinity().initStartedCaches(crdNode, this, receivedCaches);
+ }
- for (DynamicCacheChangeRequest req : reqs) {
- if (req.clientStartOnly() || req.close())
- continue;
+ if (CU.clientNode(discoEvt.eventNode()))
+ exchange = onClientNodeEvent(crdNode);
+ else
+ exchange = onServerNodeEvent(crdNode);
+ }
- clientOnlyCacheEvt = false;
+ updateTopologies(crdNode);
- break;
- }
+ switch (exchange) {
+ case ALL: {
+ distributedExchange();
- clientNodeEvt = clientOnlyCacheEvt;
+ break;
}
- if (clientNodeEvt) {
- ClusterNode node = discoEvt.eventNode();
+ case CLIENT: {
+ initTopologies();
- // Client need to initialize affinity for local join event or for stated client caches.
- if (!node.isLocal() || clientCacheClose()) {
- for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
- if (cacheCtx.isLocal())
- continue;
+ clientOnlyExchange();
- GridDhtPartitionTopology top = cacheCtx.topology();
+ break;
+ }
- top.updateTopologyVersion(exchId, this, -1, stopping(cacheCtx.cacheId()));
+ case NONE: {
+ initTopologies();
- if (cacheCtx.affinity().affinityTopologyVersion() == AffinityTopologyVersion.NONE) {
- initTopology(cacheCtx);
+ onDone(topologyVersion());
- top.beforeExchange(this);
- }
- else
- cacheCtx.affinity().clientEventTopologyChange(discoEvt, exchId.topologyVersion());
+ break;
+ }
- if (!exchId.isJoined())
- cacheCtx.preloader().unwindUndeploys();
- }
+ default:
+ assert false;
+ }
+ }
+ catch (IgniteInterruptedCheckedException e) {
+ onDone(e);
- if (exchId.isLeft())
- cctx.mvcc().removeExplicitNodeLocks(exchId.nodeId(), exchId.topologyVersion());
+ throw e;
+ }
+ catch (Throwable e) {
+ U.error(log, "Failed to reinitialize local partitions (preloading
will be stopped): " + exchId, e);
- rmtIds = Collections.emptyList();
- rmtNodes = Collections.emptyList();
+ onDone(e);
- onDone(exchId.topologyVersion());
+ if (e instanceof Error)
+ throw (Error)e;
+ }
+ }
- skipPreload = cctx.kernalContext().clientNode();
+ /**
+ * @throws IgniteCheckedException If failed.
+ */
+ private void initTopologies() throws IgniteCheckedException {
+ if (crd != null) {
+ for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
+ if (cacheCtx.isLocal())
+ continue;
- return;
- }
- }
+ cacheCtx.topology().beforeExchange(this, !centralizedAff);
+ }
+ }
+ }
- clientOnlyExchange = clientNodeEvt || cctx.kernalContext().clientNode();
+ /**
+ * @param crd Coordinator flag.
+ * @throws IgniteCheckedException If failed.
+ */
+ private void updateTopologies(boolean crd) throws IgniteCheckedException {
+ for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
+ if (cacheCtx.isLocal())
+ continue;
- if (clientOnlyExchange) {
- skipPreload = cctx.kernalContext().clientNode();
+ GridClientPartitionTopology clientTop = cctx.exchange().clearClientTopology(cacheCtx.cacheId());
- for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
- if (cacheCtx.isLocal())
- continue;
+ long updSeq = clientTop == null ? -1 : clientTop.lastUpdateSequence();
- GridDhtPartitionTopology top = cacheCtx.topology();
+ GridDhtPartitionTopology top = cacheCtx.topology();
- top.updateTopologyVersion(exchId, this, -1, stopping(cacheCtx.cacheId()));
- }
+ if (crd) {
+ boolean updateTop = !cacheCtx.isLocal() &&
+ exchId.topologyVersion().equals(cacheCtx.startTopologyVersion());
- for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
- if (cacheCtx.isLocal())
- continue;
+ if (updateTop && clientTop != null)
+ cacheCtx.topology().update(exchId, clientTop.partitionMap(true), clientTop.updateCounters());
+ }
- initTopology(cacheCtx);
- }
+ top.updateTopologyVersion(exchId, this, updSeq, stopping(cacheCtx.cacheId()));
+ }
- if (oldest != null) {
- rmtNodes = new ConcurrentLinkedQueue<>(CU.aliveRemoteServerNodesWithCaches(cctx,
- exchId.topologyVersion()));
+ for (GridClientPartitionTopology top : cctx.exchange().clientTopologies())
+ top.updateTopologyVersion(exchId, this, -1, stopping(top.cacheId()));
+ }
- rmtIds = Collections.unmodifiableSet(new HashSet<>(F.nodeIds(rmtNodes)));
+ /**
+ * @param crd Coordinator flag.
+ * @throws IgniteCheckedException If failed.
+ * @return Exchange type.
+ */
+ private ExchangeType onCacheChangeRequest(boolean crd) throws IgniteCheckedException {
+ assert !F.isEmpty(reqs) : this;
- initFut.onDone(true);
+ boolean clientOnly = cctx.affinity().onCacheChangeRequest(this, crd, reqs);
- if (log.isDebugEnabled())
- log.debug("Initialized future: " + this);
+ if (clientOnly) {
+ boolean clientCacheStarted = false;
- if (cctx.localNode().equals(oldest)) {
- for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
- boolean updateTop = !cacheCtx.isLocal() &&
- exchId.topologyVersion().equals(cacheCtx.startTopologyVersion());
+ for (DynamicCacheChangeRequest req : reqs) {
+ if (req.start() && req.clientStartOnly() && req.initiatingNodeId().equals(cctx.localNodeId())) {
+ clientCacheStarted = true;
- if (updateTop) {
- for (GridClientPartitionTopology top : cctx.exchange().clientTopologies()) {
- if (top.cacheId() == cacheCtx.cacheId()) {
- cacheCtx.topology().update(exchId,
- top.partitionMap(true),
- top.updateCounters());
+ break;
+ }
+ }
- break;
- }
- }
+ if (clientCacheStarted)
+ return ExchangeType.CLIENT;
+ else
+ return ExchangeType.NONE;
+ }
+ else
+ return cctx.kernalContext().clientNode() ? ExchangeType.CLIENT : ExchangeType.ALL;
+ }
- }
- }
+ /**
+ * @param crd Coordinator flag.
+ * @throws IgniteCheckedException If failed.
+ * @return Exchange type.
+ */
+ private ExchangeType onAffinityChangeRequest(boolean crd) throws IgniteCheckedException {
+ assert affChangeMsg != null : this;
- onDone(exchId.topologyVersion());
- }
- else
- sendPartitions(oldest);
- }
- else {
- rmtIds = Collections.emptyList();
- rmtNodes = Collections.emptyList();
+ cctx.affinity().onChangeAffinityMessage(this, crd, affChangeMsg);
- onDone(exchId.topologyVersion());
- }
+ if (cctx.kernalContext().clientNode())
+ return ExchangeType.CLIENT;
- return;
- }
+ return ExchangeType.ALL;
+ }
- assert oldestNode.get() != null;
+ /**
+ * @param crd Coordinator flag.
+ * @throws IgniteCheckedException If failed.
+ * @return Exchange type.
+ */
+ private ExchangeType onClientNodeEvent(boolean crd) throws IgniteCheckedException {
+ assert CU.clientNode(discoEvt.eventNode()) : this;
- for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
- if (isCacheAdded(cacheCtx.cacheId(), exchId.topologyVersion())) {
- if (cacheCtx.discovery().cacheAffinityNodes(cacheCtx.name(), topologyVersion()).isEmpty())
- U.quietAndWarn(log, "No server nodes found for cache client: " + cacheCtx.namex());
- }
+ if (discoEvt.type() == EVT_NODE_LEFT || discoEvt.type() == EVT_NODE_FAILED) {
+ onLeft();
- cacheCtx.preloader().onExchangeFutureAdded();
- }
+ assert !discoEvt.eventNode().isLocal() : discoEvt;
+ }
+ else
+ assert discoEvt.type() == EVT_NODE_JOINED : discoEvt;
- List<String> cachesWithoutNodes = null;
-
- if (exchId.isLeft()) {
- for (String name : cctx.cache().cacheNames()) {
- if (cctx.discovery().cacheAffinityNodes(name, topologyVersion()).isEmpty()) {
- if (cachesWithoutNodes == null)
- cachesWithoutNodes = new ArrayList<>();
-
- cachesWithoutNodes.add(name);
-
- // Fire event even if there is no client cache started.
- if (cctx.gridEvents().isRecordable(EventType.EVT_CACHE_NODES_LEFT)) {
- Event evt = new CacheEvent(
- name,
- cctx.localNode(),
- cctx.localNode(),
- "All server nodes have left the cluster.",
- EventType.EVT_CACHE_NODES_LEFT,
- 0,
- false,
- null,
- null,
- null,
- null,
- false,
- null,
- false,
- null,
- null,
- null
- );
-
- cctx.gridEvents().record(evt);
- }
- }
- }
- }
+ cctx.affinity().onClientEvent(this, crd);
- if (cachesWithoutNodes != null) {
- StringBuilder sb =
- new StringBuilder("All server nodes for the following
caches have left the cluster: ");
+ if (discoEvt.eventNode().isLocal())
+ return ExchangeType.CLIENT;
+ else
+ return ExchangeType.NONE;
+ }
- for (int i = 0; i < cachesWithoutNodes.size(); i++) {
- String cache = cachesWithoutNodes.get(i);
+ /**
+ * @param crd Coordinator flag.
+ * @throws IgniteCheckedException If failed.
+ * @return Exchange type.
+ */
+ private ExchangeType onServerNodeEvent(boolean crd) throws IgniteCheckedException {
+ assert !CU.clientNode(discoEvt.eventNode()) : this;
- sb.append('\'').append(cache).append('\'');
+ if (discoEvt.type() == EVT_NODE_LEFT || discoEvt.type() == EVT_NODE_FAILED) {
+ onLeft();
- if (i != cachesWithoutNodes.size() - 1)
- sb.append(", ");
- }
+ warnNoAffinityNodes();
- U.quietAndWarn(log, sb.toString());
+ centralizedAff = cctx.affinity().onServerLeft(this);
+ }
+ else {
+ assert discoEvt.type() == EVT_NODE_JOINED : discoEvt;
- U.quietAndWarn(log, "Must have server nodes for caches to
operate.");
- }
+ cctx.affinity().onServerJoin(this, crd);
+ }
- assert discoEvt != null;
+ if (cctx.kernalContext().clientNode())
+ return ExchangeType.CLIENT;
+ else
+ return ExchangeType.ALL;
+ }
- assert exchId.nodeId().equals(discoEvt.eventNode().id());
+ /**
+ * @throws IgniteCheckedException If failed.
+ */
+ private void clientOnlyExchange() throws IgniteCheckedException {
+ clientOnlyExchange = true;
+ if (crd != null) {
+ if (crd.isLocal()) {
for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
- GridClientPartitionTopology clientTop =
cctx.exchange().clearClientTopology(
- cacheCtx.cacheId());
+ boolean updateTop = !cacheCtx.isLocal() &&
+ exchId.topologyVersion().equals(cacheCtx.startTopologyVersion());
- long updSeq = clientTop == null ? -1 :
clientTop.lastUpdateSequence();
+ if (updateTop) {
+ for (GridClientPartitionTopology top :
cctx.exchange().clientTopologies()) {
+ if (top.cacheId() == cacheCtx.cacheId()) {
+ cacheCtx.topology().update(exchId,
+ top.partitionMap(true),
+ top.updateCounters());
- // Update before waiting for locks.
- if (!cacheCtx.isLocal())
- cacheCtx.topology().updateTopologyVersion(exchId,
this, updSeq, stopping(cacheCtx.cacheId()));
+ break;
+ }
+ }
+ }
}
+ }
+ else {
+ if (!centralizedAff)
+ sendLocalPartitions(crd, exchId);
- // Grab all alive remote nodes with order of equal or less than last joined node.
- rmtNodes = new ConcurrentLinkedQueue<>(CU.aliveRemoteServerNodesWithCaches(cctx,
- exchId.topologyVersion()));
-
- rmtIds = Collections.unmodifiableSet(new
HashSet<>(F.nodeIds(rmtNodes)));
-
- for (Map.Entry<UUID, GridDhtPartitionsSingleMessage> m :
singleMsgs.entrySet())
- // If received any messages, process them.
- onReceive(m.getKey(), m.getValue());
-
- for (Map.Entry<UUID, GridDhtPartitionsFullMessage> m :
fullMsgs.entrySet())
- // If received any messages, process them.
- onReceive(m.getKey(), m.getValue());
-
- AffinityTopologyVersion topVer = exchId.topologyVersion();
+ initDone();
+ return;
+ }
+ }
+ else {
+ if (centralizedAff) { // Last server node failed.
for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
- if (cacheCtx.isLocal())
- continue;
+ GridAffinityAssignmentCache aff =
cacheCtx.affinity().affinityCache();
- // Must initialize topology after we get discovery event.
- initTopology(cacheCtx);
+ aff.initialize(topologyVersion(), aff.idealAssignment());
+ }
+ }
+ }
- cacheCtx.preloader().onTopologyChanged(exchId.topologyVersion());
+ onDone(topologyVersion());
+ }
- cacheCtx.preloader().updateLastExchangeFuture(this);
- }
+ /**
+ * @throws IgniteCheckedException If failed.
+ */
+ private void distributedExchange() throws IgniteCheckedException {
+ assert crd != null;
- IgniteInternalFuture<?> partReleaseFut =
cctx.partitionReleaseFuture(topVer);
+ assert !cctx.kernalContext().clientNode();
- // Assign to class variable so it will be included into toString() method.
- this.partReleaseFut = partReleaseFut;
+ for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
+ if (cacheCtx.isLocal())
+ continue;
- if (exchId.isLeft())
- cctx.mvcc().removeExplicitNodeLocks(exchId.nodeId(),
exchId.topologyVersion());
+ cacheCtx.preloader().onTopologyChanged(this);
+ }
- if (log.isDebugEnabled())
- log.debug("Before waiting for partition release future: "
+ this);
+ waitPartitionRelease();
- int dumpedObjects = 0;
+ boolean topChanged = discoEvt.type() != EVT_DISCOVERY_CUSTOM_EVT ||
affChangeMsg != null;
- while (true) {
- try {
- partReleaseFut.get(2 *
cctx.gridConfig().getNetworkTimeout(), TimeUnit.MILLISECONDS);
+ for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
+ if (cacheCtx.isLocal() || stopping(cacheCtx.cacheId()))
+ continue;
- break;
- }
- catch (IgniteFutureTimeoutCheckedException ignored) {
- // Print pending transactions and locks that might have led to hang.
- if (dumpedObjects < DUMP_PENDING_OBJECTS_THRESHOLD) {
- dumpPendingObjects();
+ if (topChanged) {
+ cacheCtx.continuousQueries().beforeExchange(exchId.topologyVersion());
- dumpedObjects++;
- }
- }
- }
+ // Partition release future is done so we can flush the write-behind store.
+ cacheCtx.store().forceFlush();
+ }
- if (log.isDebugEnabled())
- log.debug("After waiting for partition release future: " +
this);
+ cacheCtx.topology().beforeExchange(this, !centralizedAff);
+ }
- IgniteInternalFuture<?> locksFut =
cctx.mvcc().finishLocks(exchId.topologyVersion());
+ if (crd.isLocal()) {
+ if (remaining.isEmpty())
+ onAllReceived(false);
+ }
+ else
+ sendPartitions(crd);
- dumpedObjects = 0;
+ initDone();
+ }
- while (true) {
- try {
- locksFut.get(2 *
cctx.gridConfig().getNetworkTimeout(), TimeUnit.MILLISECONDS);
+ /**
+ * @throws IgniteCheckedException If failed.
+ */
+ private void waitPartitionRelease() throws IgniteCheckedException {
+ IgniteInternalFuture<?> partReleaseFut =
cctx.partitionReleaseFuture(topologyVersion());
- break;
- }
- catch (IgniteFutureTimeoutCheckedException ignored) {
- if (dumpedObjects < DUMP_PENDING_OBJECTS_THRESHOLD) {
- U.warn(log, "Failed to wait for locks release
future. " +
- "Dumping pending objects that might be the
cause: " + cctx.localNodeId());
+ // Assign to class variable so it will be included into toString() method.
+ this.partReleaseFut = partReleaseFut;
- U.warn(log, "Locked keys:");
+ if (exchId.isLeft())
+ cctx.mvcc().removeExplicitNodeLocks(exchId.nodeId(),
exchId.topologyVersion());
- for (IgniteTxKey key : cctx.mvcc().lockedKeys())
- U.warn(log, "Locked key: " + key);
+ if (log.isDebugEnabled())
+ log.debug("Before waiting for partition release future: " + this);
- for (IgniteTxKey key :
cctx.mvcc().nearLockedKeys())
- U.warn(log, "Locked near key: " + key);
+ int dumpedObjects = 0;
- Map<IgniteTxKey, Collection<GridCacheMvccCandidate>> locks =
- cctx.mvcc().unfinishedLocks(exchId.topologyVersion());
+ while (true) {
+ try {
+ partReleaseFut.get(2 * cctx.gridConfig().getNetworkTimeout(),
TimeUnit.MILLISECONDS);
- for (Map.Entry<IgniteTxKey, Collection<GridCacheMvccCandidate>> e : locks.entrySet())
- U.warn(log, "Awaited locked entry [key=" + e.getKey() + ", mvcc=" + e.getValue() + ']');
+ break;
+ }
+ catch (IgniteFutureTimeoutCheckedException ignored) {
+ // Print pending transactions and locks that might have led to hang.
+ if (dumpedObjects < DUMP_PENDING_OBJECTS_THRESHOLD) {
+ dumpPendingObjects();
- dumpedObjects++;
- }
- }
+ dumpedObjects++;
}
+ }
+ }
- boolean topChanged = discoEvt.type() !=
EVT_DISCOVERY_CUSTOM_EVT;
-
- for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
- if (cacheCtx.isLocal())
- continue;
+ if (log.isDebugEnabled())
+ log.debug("After waiting for partition release future: " + this);
- // Notify replication manager.
- GridCacheContext drCacheCtx = cacheCtx.isNear() ?
cacheCtx.near().dht().context() : cacheCtx;
+ IgniteInternalFuture<?> locksFut =
cctx.mvcc().finishLocks(exchId.topologyVersion());
- if (drCacheCtx.isDrEnabled())
- drCacheCtx.dr().beforeExchange(topVer,
exchId.isLeft());
+ dumpedObjects = 0;
- if (topChanged)
- cacheCtx.continuousQueries().beforeExchange(exchId.topologyVersion());
+ while (true) {
+ try {
+ locksFut.get(2 * cctx.gridConfig().getNetworkTimeout(),
TimeUnit.MILLISECONDS);
- // Partition release future is done so we can flush the write-behind store.
- cacheCtx.store().forceFlush();
+ break;
+ }
+ catch (IgniteFutureTimeoutCheckedException ignored) {
+ if (dumpedObjects < DUMP_PENDING_OBJECTS_THRESHOLD) {
+ U.warn(log, "Failed to wait for locks release future. " +
+ "Dumping pending objects that might be the cause: " +
cctx.localNodeId());
- if (!exchId.isJoined())
- // Process queued undeploys prior to sending/spreading map.
- cacheCtx.preloader().unwindUndeploys();
+ U.warn(log, "Locked keys:");
- GridDhtPartitionTopology top = cacheCtx.topology();
+ for (IgniteTxKey key : cctx.mvcc().lockedKeys())
+ U.warn(log, "Locked key: " + key);
- assert topVer.equals(top.topologyVersion()) :
- "Topology version is updated only in this class
instances inside single ExchangeWorker thread.";
+ for (IgniteTxKey key : cctx.mvcc().nearLockedKeys())
+ U.warn(log, "Locked near key: " + key);
- top.beforeExchange(this);
- }
+ Map<IgniteTxKey, Collection<GridCacheMvccCandidate>> locks
=
+ cctx.mvcc().unfinishedLocks(exchId.topologyVersion());
- for (GridClientPartitionTopology top :
cctx.exchange().clientTopologies()) {
- top.updateTopologyVersion(exchId, this, -1,
stopping(top.cacheId()));
+ for (Map.Entry<IgniteTxKey, Collection<GridCacheMvccCandidate>> e : locks.entrySet())
+ U.warn(log, "Awaited locked entry [key=" + e.getKey() + ", mvcc=" + e.getValue() + ']');
- top.beforeExchange(this);
+ dumpedObjects++;
}
}
- catch (IgniteInterruptedCheckedException e) {
- onDone(e);
-
- throw e;
- }
- catch (Throwable e) {
- U.error(log, "Failed to reinitialize local partitions
(preloading will be stopped): " + exchId, e);
-
- onDone(e);
+ }
+ }
- if (e instanceof Error)
- throw (Error)e;
+ /**
+ *
+ */
+ private void onLeft() {
+ for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
+ if (cacheCtx.isLocal())
+ continue;
- return;
- }
+ cacheCtx.preloader().unwindUndeploys();
+ }
- if (F.isEmpty(rmtIds)) {
- onDone(exchId.topologyVersion());
+ cctx.mvcc().removeExplicitNodeLocks(exchId.nodeId(),
exchId.topologyVersion());
+ }
- return;
+ /**
+ *
+ */
+ private void warnNoAffinityNodes() {
+ List<String> cachesWithoutNodes = null;
+
+ for (String name : cctx.cache().cacheNames()) {
+ if (cctx.discovery().cacheAffinityNodes(name,
topologyVersion()).isEmpty()) {
+ if (cachesWithoutNodes == null)
+ cachesWithoutNodes = new ArrayList<>();
+
+ cachesWithoutNodes.add(name);
+
+ // Fire event even if there is no client cache started.
+ if (cctx.gridEvents().isRecordable(EventType.EVT_CACHE_NODES_LEFT)) {
+ Event evt = new CacheEvent(
+ name,
+ cctx.localNode(),
+ cctx.localNode(),
+ "All server nodes have left the cluster.",
+ EventType.EVT_CACHE_NODES_LEFT,
+ 0,
+ false,
+ null,
+ null,
+ null,
+ null,
+ false,
+ null,
+ false,
+ null,
+ null,
+ null
+ );
+
+ cctx.gridEvents().record(evt);
+ }
}
+ }
- ready.set(true);
-
- initFut.onDone(true);
-
- if (log.isDebugEnabled())
- log.debug("Initialized future: " + this);
+ if (cachesWithoutNodes != null) {
+ StringBuilder sb =
+ new StringBuilder("All server nodes for the following caches
have left the cluster: ");
- ClusterNode oldest = oldestNode.get();
+ for (int i = 0; i < cachesWithoutNodes.size(); i++) {
+ String cache = cachesWithoutNodes.get(i);
- // If this node is not oldest.
- if (!oldest.id().equals(cctx.localNodeId()))
- sendPartitions(oldest);
- else {
- boolean allReceived = allReceived();
+ sb.append('\'').append(cache).append('\'');
- if (allReceived && replied.compareAndSet(false, true)) {
- if (spreadPartitions())
- onDone(exchId.topologyVersion());
- }
+ if (i != cachesWithoutNodes.size() - 1)
+ sb.append(", ");
}
- scheduleRecheck();
- }
- else
- assert false : "Skipped init future: " + this;
- }
+ U.quietAndWarn(log, sb.toString());
- /**
- * @return {@code True} if exchange initiated for client cache close.
- */
- private boolean clientCacheClose() {
- return reqs != null && reqs.size() == 1 &&
reqs.iterator().next().close();
+ U.quietAndWarn(log, "Must have server nodes for caches to
operate.");
+ }
}
/**
@@ -930,14 +889,14 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
U.warn(log, "Failed to wait for partition release future [topVer=" + topologyVersion() +
", node=" + cctx.localNodeId() + "]. Dumping pending objects that might be the cause: ");
- cctx.exchange().dumpPendingObjects();
+ cctx.exchange().dumpPendingObjects(topologyVersion());
}
/**
* @param cacheId Cache ID to check.
* @return {@code True} if cache is stopping by this exchange.
*/
- private boolean stopping(int cacheId) {
+ public boolean stopping(int cacheId) {
boolean stopping = false;
if (!F.isEmpty(reqs)) {
@@ -954,30 +913,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
}
/**
- * Starts dynamic caches.
- * @throws IgniteCheckedException If failed.
- */
- private void startCaches() throws IgniteCheckedException {
- cctx.cache().prepareCachesStart(F.view(reqs, new
IgnitePredicate<DynamicCacheChangeRequest>() {
- @Override public boolean apply(DynamicCacheChangeRequest req) {
- return req.start();
- }
- }), exchId.topologyVersion());
- }
-
- /**
- *
- */
- private void blockGateways() {
- for (DynamicCacheChangeRequest req : reqs) {
- if (req.stop() || req.close())
- cctx.cache().blockGateway(req);
- }
- }
-
- /**
- * @param node Node.
- * @param id ID.
+ * @param node Node.
+ * @param id ID.
* @throws IgniteCheckedException If failed.
*/
private void sendLocalPartitions(ClusterNode node, @Nullable GridDhtPartitionExchangeId id)
@@ -1002,32 +939,40 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
if (log.isDebugEnabled())
log.debug("Sending local partitions [nodeId=" + node.id() + ", exchId=" + exchId + ", msg=" + m + ']');
- cctx.io().send(node, m, SYSTEM_POOL);
+ try {
+ cctx.io().send(node, m, SYSTEM_POOL);
+ }
+ catch (ClusterTopologyCheckedException e) {
+ if (log.isDebugEnabled())
+ log.debug("Node left during partition exchange [nodeId=" +
node.id() + ", exchId=" + exchId + ']');
+ }
}
/**
- * @param nodes Nodes.
- * @param id ID.
- * @throws IgniteCheckedException If failed.
+ * @param nodes Target nodes.
+ * @return Message;
*/
- private void sendAllPartitions(Collection<? extends ClusterNode> nodes,
GridDhtPartitionExchangeId id)
- throws IgniteCheckedException {
- GridDhtPartitionsFullMessage m = new GridDhtPartitionsFullMessage(id,
- lastVer.get(),
- id.topologyVersion());
+ private GridDhtPartitionsFullMessage createPartitionsMessage(Collection<ClusterNode> nodes) {
+ GridCacheVersion last = lastVer.get();
+
+ GridDhtPartitionsFullMessage m = new GridDhtPartitionsFullMessage(exchangeId(),
+ last != null ? last : cctx.versions().last(),
+ topologyVersion());
boolean useOldApi = false;
- for (ClusterNode node : nodes) {
- if (node.version().compareTo(GridDhtPartitionMap2.SINCE) < 0)
- useOldApi = true;
+ if (nodes != null) {
+ for (ClusterNode node : nodes) {
+ if (node.version().compareTo(GridDhtPartitionMap2.SINCE) < 0)
+ useOldApi = true;
+ }
}
for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
if (!cacheCtx.isLocal()) {
AffinityTopologyVersion startTopVer =
cacheCtx.startTopologyVersion();
- boolean ready = startTopVer == null ||
startTopVer.compareTo(id.topologyVersion()) <= 0;
+ boolean ready = startTopVer == null ||
startTopVer.compareTo(topologyVersion()) <= 0;
if (ready) {
GridDhtPartitionFullMap locMap =
cacheCtx.topology().partitionMap(true);
@@ -1049,6 +994,16 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
m.addPartitionUpdateCounters(top.cacheId(), top.updateCounters());
}
+ return m;
+ }
+
+ /**
+ * @param nodes Nodes.
+ * @throws IgniteCheckedException If failed.
+ */
+ private void sendAllPartitions(Collection<ClusterNode> nodes) throws IgniteCheckedException {
+ GridDhtPartitionsFullMessage m = createPartitionsMessage(nodes);
+
if (log.isDebugEnabled())
log.debug("Sending full partition map [nodeIds=" +
F.viewReadOnly(nodes, F.node2id()) +
", exchId=" + exchId + ", msg=" + m + ']');
@@ -1069,65 +1024,65 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
", exchId=" + exchId + ']');
}
catch (IgniteCheckedException e) {
- scheduleRecheck();
-
U.error(log, "Failed to send local partitions to oldest node (will
retry after timeout) [oldestNodeId=" +
oldestNode.id() + ", exchId=" + exchId + ']', e);
}
}
- /**
- * @return {@code True} if succeeded.
- */
- private boolean spreadPartitions() {
- try {
- sendAllPartitions(rmtNodes, exchId);
+ /** {@inheritDoc} */
+ @Override public boolean onDone(@Nullable AffinityTopologyVersion res, @Nullable Throwable err) {
+ boolean realExchange = !dummy && !forcePreload;
- return true;
- }
- catch (IgniteCheckedException e) {
- scheduleRecheck();
+ if (err == null && realExchange) {
+ for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
+ if (cacheCtx.isLocal())
+ continue;
- if (!X.hasCause(e, InterruptedException.class))
- U.error(log, "Failed to send full partition map to nodes (will
retry after timeout) [nodes=" +
- F.nodeId8s(rmtNodes) + ", exchangeId=" + exchId + ']', e);
+ try {
+ if (centralizedAff)
+ cacheCtx.topology().initPartitions(this);
+ }
+ catch (IgniteInterruptedCheckedException e) {
+ U.error(log, "Failed to initialize partitions.", e);
+ }
- return false;
- }
- }
+ GridCacheContext drCacheCtx = cacheCtx.isNear() ?
cacheCtx.near().dht().context() : cacheCtx;
- /** {@inheritDoc} */
- @Override public boolean onDone(AffinityTopologyVersion res, Throwable
err) {
- Map<Integer, Boolean> m = null;
+ if (drCacheCtx.isDrEnabled()) {
+ try {
+ drCacheCtx.dr().onExchange(topologyVersion(),
exchId.isLeft());
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to notify DR: " + e, e);
+ }
+ }
+ }
- for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
- if (cacheCtx.config().getTopologyValidator() != null &&
!CU.isSystemCache(cacheCtx.name())) {
- if (m == null)
- m = new HashMap<>();
+ Map<Integer, Boolean> m = null;
+
+ for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
+ if (cacheCtx.config().getTopologyValidator() != null &&
!CU.isSystemCache(cacheCtx.name())) {
+ if (m == null)
+ m = new HashMap<>();
- m.put(cacheCtx.cacheId(),
cacheCtx.config().getTopologyValidator().validate(discoEvt.topologyNodes()));
+ m.put(cacheCtx.cacheId(),
cacheCtx.config().getTopologyValidator().validate(discoEvt.topologyNodes()));
+ }
}
- }
- cacheValidRes = m != null ? m : Collections.<Integer,
Boolean>emptyMap();
+ cacheValidRes = m != null ? m : Collections.<Integer,
Boolean>emptyMap();
+ }
cctx.cache().onExchangeDone(exchId.topologyVersion(), reqs, err);
cctx.exchange().onExchangeDone(this, err);
- if (super.onDone(res, err) && !dummy && !forcePreload) {
+ if (super.onDone(res, err) && realExchange) {
if (log.isDebugEnabled())
log.debug("Completed partition exchange [localNode=" +
cctx.localNodeId() + ", exchange= " + this +
"duration=" + duration() + ", durationFromInit=" +
(U.currentTimeMillis() - initTs) + ']');
initFut.onDone(err == null);
- GridTimeoutObject timeoutObj = this.timeoutObj;
-
- // Deschedule timeout object.
- if (timeoutObj != null)
- cctx.kernalContext().timeout().removeTimeoutObject(timeoutObj);
-
if (exchId.isLeft()) {
for (GridCacheContext cacheCtx : cctx.cacheContexts())
cacheCtx.config().getAffinity().removeNode(exchId.nodeId());
@@ -1170,56 +1125,47 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
topSnapshot.set(null);
singleMsgs.clear();
fullMsgs.clear();
- rcvdIds.clear();
- oldestNode.set(null);
+ crd = null;
partReleaseFut = null;
-
- Collection<ClusterNode> rmtNodes = this.rmtNodes;
-
- if (rmtNodes != null)
- rmtNodes.clear();
}
/**
- * @return {@code True} if all replies are received.
+ * @param ver Version.
*/
- private boolean allReceived() {
- Collection<UUID> rmtIds = this.rmtIds;
+ private void updateLastVersion(GridCacheVersion ver) {
+ assert ver != null;
- assert rmtIds != null : "Remote Ids can't be null: " + this;
+ while (true) {
+ GridCacheVersion old = lastVer.get();
- synchronized (rcvdIds) {
- return rcvdIds.containsAll(rmtIds);
+ if (old == null || Long.compare(old.order(), ver.order()) < 0) {
+ if (lastVer.compareAndSet(old, ver))
+ break;
+ }
+ else
+ break;
}
}
/**
- * @param nodeId Sender node id.
+ * @param node Sender node.
* @param msg Single partition info.
*/
- public void onReceive(final UUID nodeId, final GridDhtPartitionsSingleMessage msg) {
+ public void onReceive(final ClusterNode node, final GridDhtPartitionsSingleMessage msg) {
assert msg != null;
+ assert msg.exchangeId().equals(exchId) : msg;
+ assert msg.lastVersion() != null : msg;
- assert msg.exchangeId().equals(exchId);
-
- // Update last seen version.
- while (true) {
- GridCacheVersion old = lastVer.get();
-
- if (old == null || old.compareTo(msg.lastVersion()) < 0) {
- if (lastVer.compareAndSet(old, msg.lastVersion()))
- break;
- }
- else
- break;
- }
+ if (!msg.client())
+ updateLastVersion(msg.lastVersion());
if (isDone()) {
if (log.isDebugEnabled())
log.debug("Received message for finished future (will reply
only to sender) [msg=" + msg +
", fut=" + this + ']');
- sendAllPartitions(nodeId,
cctx.gridConfig().getNetworkSendRetryCount());
+ if (!centralizedAff)
+ sendAllPartitions(node.id(),
cctx.gridConfig().getNetworkSendRetryCount());
}
else {
initFut.listen(new CI1<IgniteInternalFuture<Boolean>>() {
@@ -1234,46 +1180,108 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
return;
}
- ClusterNode loc = cctx.localNode();
+ processMessage(node, msg);
+ }
+ });
+ }
+ }
- singleMsgs.put(nodeId, msg);
+ /**
+ * @param node Sender node.
+ * @param msg Message.
+ */
+ private void processMessage(ClusterNode node, GridDhtPartitionsSingleMessage msg) {
+ boolean allReceived = false;
- boolean match = true;
+ synchronized (mux) {
+ assert crd != null;
- // Check if oldest node has changed.
- if (!oldestNode.get().equals(loc)) {
- match = false;
+ if (crd.isLocal()) {
+ if (remaining.remove(node.id())) {
+ updatePartitionSingleMap(msg);
- synchronized (mux) {
- // Double check.
- if (oldestNode.get().equals(loc))
- match = true;
- }
- }
+ allReceived = remaining.isEmpty();
+ }
+ }
+ else
+ singleMsgs.put(node, msg);
+ }
+
+ if (allReceived)
+ onAllReceived(false);
+ }
- if (match) {
- boolean allReceived;
+ /**
+ * @param fut Affinity future.
+ */
+ private void onAffinityInitialized(IgniteInternalFuture<Map<Integer, Map<Integer, List<UUID>>>> fut) {
+ try {
+ assert fut.isDone();
- synchronized (rcvdIds) {
- if (rcvdIds.add(nodeId))
- updatePartitionSingleMap(msg);
+ Map<Integer, Map<Integer, List<UUID>>> assignmentChange =
fut.get();
- allReceived = allReceived();
- }
+ GridDhtPartitionsFullMessage m = createPartitionsMessage(null);
- // If got all replies, and initialization finished,
and reply has not been sent yet.
- if (allReceived && ready.get() &&
replied.compareAndSet(false, true)) {
- spreadPartitions();
+ CacheAffinityChangeMessage msg = new
CacheAffinityChangeMessage(exchId, m, assignmentChange);
- onDone(exchId.topologyVersion());
+ if (log.isDebugEnabled())
+ log.debug("Centralized affinity exchange, send affinity change
message: " + msg);
+
+ cctx.discovery().sendCustomEvent(msg);
+ }
+ catch (IgniteCheckedException e) {
+ onDone(e);
+ }
+ }
+
+ /**
+ * @param discoThread If {@code true} completes future from another thread (to do not block discovery thread).
+ */
+ private void onAllReceived(boolean discoThread) {
+ try {
+ assert crd.isLocal();
+
+ if
(!crd.equals(cctx.discovery().serverNodes(topologyVersion()).get(0))) {
+ for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
+ if (!cacheCtx.isLocal())
+
cacheCtx.topology().beforeExchange(GridDhtPartitionsExchangeFuture.this,
!centralizedAff);
+ }
+ }
+
+ updateLastVersion(cctx.versions().last());
+
+ cctx.versions().onExchange(lastVer.get().order());
+
+ if (centralizedAff) {
+ IgniteInternalFuture<Map<Integer, Map<Integer, List<UUID>>>>
fut = cctx.affinity().initAffinityOnNodeLeft(this);
+
+ if (!fut.isDone()) {
+ fut.listen(new
IgniteInClosure<IgniteInternalFuture<Map<Integer, Map<Integer, List<UUID>>>>>()
{
+ @Override public void
apply(IgniteInternalFuture<Map<Integer, Map<Integer, List<UUID>>>> fut) {
+ onAffinityInitialized(fut);
}
- else if (log.isDebugEnabled())
- log.debug("Exchange future full map is not sent
[allReceived=" + allReceived() +
- ", ready=" + ready + ", replied=" +
replied.get() + ", init=" + init.get() +
- ", fut=" +
GridDhtPartitionsExchangeFuture.this + ']');
- }
+ });
}
- });
+ else
+ onAffinityInitialized(fut);
+ }
+ else {
+ List<ClusterNode> nodes;
+
+ synchronized (mux) {
+ srvNodes.remove(cctx.localNode());
+
+ nodes = new ArrayList<>(srvNodes);
+ }
+
+ if (!nodes.isEmpty())
+ sendAllPartitions(nodes);
+
+ onDone(exchangeId().topologyVersion());
+ }
+ }
+ catch (IgniteCheckedException e) {
+ onDone(e);
}
}
@@ -1286,7 +1294,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
try {
if (n != null)
- sendAllPartitions(F.asList(n), exchId);
+ sendAllPartitions(F.asList(n));
}
catch (IgniteCheckedException e) {
if (e instanceof ClusterTopologyCheckedException ||
!cctx.discovery().alive(n)) {
@@ -1314,12 +1322,14 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
}
/**
- * @param nodeId Sender node ID.
+ * @param node Sender node.
* @param msg Full partition info.
*/
- public void onReceive(final UUID nodeId, final GridDhtPartitionsFullMessage msg) {
+ public void onReceive(final ClusterNode node, final GridDhtPartitionsFullMessage msg) {
assert msg != null;
+ final UUID nodeId = node.id();
+
if (isDone()) {
if (log.isDebugEnabled())
log.debug("Received message for finished future [msg=" + msg +
", fut=" + this + ']');
@@ -1330,8 +1340,6 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
if (log.isDebugEnabled())
log.debug("Received full partition map from node [nodeId=" +
nodeId + ", msg=" + msg + ']');
- assert exchId.topologyVersion().equals(msg.topologyVersion());
-
initFut.listen(new CI1<IgniteInternalFuture<Boolean>>() {
@Override public void apply(IgniteInternalFuture<Boolean> f) {
try {
@@ -1344,41 +1352,38 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
return;
}
- ClusterNode curOldest = oldestNode.get();
-
- if (!nodeId.equals(curOldest.id())) {
- if (log.isDebugEnabled())
- log.debug("Received full partition map from unexpected
node [oldest=" + curOldest.id() +
- ", unexpectedNodeId=" + nodeId + ']');
-
- ClusterNode snd = cctx.discovery().node(nodeId);
-
- if (snd == null) {
- if (log.isDebugEnabled())
- log.debug("Sender node left grid, will ignore
message from unexpected node [nodeId=" + nodeId +
- ", exchId=" + msg.exchangeId() + ']');
-
- return;
- }
+ processMessage(node, msg);
+ }
+ });
+ }
- // Will process message later if sender node becomes
oldest node.
- if (snd.order() > curOldest.order())
- fullMsgs.put(nodeId, msg);
+ /**
+ * @param node Sender node.
+ * @param msg Message.
+ */
+ private void processMessage(ClusterNode node, GridDhtPartitionsFullMessage msg) {
+ assert msg.exchangeId().equals(exchId) : msg;
+ assert msg.lastVersion() != null : msg;
- return;
- }
+ synchronized (mux) {
+ if (crd == null)
+ return;
- assert msg.exchangeId().equals(exchId);
+ if (!crd.equals(node)) {
+ if (log.isDebugEnabled())
+ log.debug("Received full partition map from unexpected
node [oldest=" + crd.id() +
+ ", nodeId=" + node.id() + ']');
- assert msg.lastVersion() != null;
+ if (node.order() > crd.order())
+ fullMsgs.put(node, msg);
- cctx.versions().onReceived(nodeId, msg.lastVersion());
+ return;
+ }
+ }
- updatePartitionFullMap(msg);
+ updatePartitionFullMap(msg);
- onDone(exchId.topologyVersion());
- }
- });
+ onDone(exchId.topologyVersion());
}
/**
@@ -1387,6 +1392,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
* @param msg Partitions full messages.
*/
private void updatePartitionFullMap(GridDhtPartitionsFullMessage msg) {
+ cctx.versions().onExchange(msg.lastVersion().order());
+
for (Map.Entry<Integer, GridDhtPartitionFullMap> entry :
msg.partitions().entrySet()) {
Integer cacheId = entry.getKey();
@@ -1423,251 +1430,183 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
}
/**
- * @param nodeId Left node id.
+ * Affinity change message callback, processed from the same thread as
{@link #onNodeLeft}.
+ *
+ * @param node Message sender node.
+ * @param msg Message.
*/
- public void onNodeLeft(final UUID nodeId) {
- if (isDone())
- return;
-
- if (!enterBusy())
- return;
-
- try {
- // Wait for initialization part of this future to complete.
- initFut.listen(new CI1<IgniteInternalFuture<Boolean>>() {
- @Override public void apply(IgniteInternalFuture<Boolean> f) {
- try {
- if (!f.get())
- return;
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Failed to initialize exchange future: "
+ this, e);
+ public void onAffinityChangeMessage(final ClusterNode node, final CacheAffinityChangeMessage msg) {
+ assert exchId.equals(msg.exchangeId()) : msg;
- return;
- }
-
- if (isDone())
- return;
-
- if (!enterBusy())
- return;
+ onDiscoveryEvent(new IgniteRunnable() {
+ @Override public void run() {
+ if (isDone() || !enterBusy())
+ return;
- try {
- // Pretend to have received message from this node.
- rcvdIds.add(nodeId);
-
- Collection<UUID> rmtIds =
GridDhtPartitionsExchangeFuture.this.rmtIds;
-
- assert rmtIds != null;
-
- ClusterNode oldest = oldestNode.get();
-
- if (oldest.id().equals(nodeId)) {
- if (log.isDebugEnabled())
- log.debug("Oldest node left or failed on
partition exchange " +
- "(will restart exchange process))
[oldestNodeId=" + oldest.id() +
- ", exchangeId=" + exchId + ']');
-
- boolean set = false;
-
- ClusterNode newOldest =
CU.oldestAliveCacheServerNode(cctx, exchId.topologyVersion());
-
- if (newOldest != null) {
- // If local node is now oldest.
- if (newOldest.id().equals(cctx.localNodeId()))
{
- synchronized (mux) {
- if (oldestNode.compareAndSet(oldest,
newOldest)) {
- // If local node is just joining.
- if
(exchId.nodeId().equals(cctx.localNodeId())) {
- try {
- for (GridCacheContext
cacheCtx : cctx.cacheContexts()) {
- if
(!cacheCtx.isLocal())
-
cacheCtx.topology().beforeExchange(
-
GridDhtPartitionsExchangeFuture.this);
- }
- }
- catch (IgniteCheckedException
e) {
- onDone(e);
-
- return;
- }
- }
-
- set = true;
- }
- }
- }
- else {
- synchronized (mux) {
- set = oldestNode.compareAndSet(oldest,
newOldest);
- }
-
- if (set && log.isDebugEnabled())
- log.debug("Reassigned oldest node
[this=" + cctx.localNodeId() +
- ", old=" + oldest.id() + ", new="
+ newOldest.id() + ']');
- }
- }
- else {
- ClusterTopologyCheckedException err = new
ClusterTopologyCheckedException("Failed to " +
- "wait for exchange future, all server
nodes left.");
+ try {
+ assert centralizedAff;
- onDone(err);
- }
+ if (crd.equals(node)) {
+
cctx.affinity().onExchangeChangeAffinityMessage(GridDhtPartitionsExchangeFuture.this,
+ crd.isLocal(),
+ msg);
- if (set) {
- // If received any messages, process them.
- for (Map.Entry<UUID,
GridDhtPartitionsSingleMessage> m : singleMsgs.entrySet())
- onReceive(m.getKey(), m.getValue());
+ if (!crd.isLocal()) {
+ GridDhtPartitionsFullMessage partsMsg =
msg.partitionsMessage();
- for (Map.Entry<UUID,
GridDhtPartitionsFullMessage> m : fullMsgs.entrySet())
- onReceive(m.getKey(), m.getValue());
+ assert partsMsg != null : msg;
+ assert partsMsg.lastVersion() != null : partsMsg;
- // Reassign oldest node and resend.
- recheck();
- }
+ updatePartitionFullMap(partsMsg);
}
- else if (rmtIds.contains(nodeId)) {
- if (log.isDebugEnabled())
- log.debug("Remote node left of failed during
partition exchange (will ignore) " +
- "[rmtNode=" + nodeId + ", exchangeId=" +
exchId + ']');
-
- assert rmtNodes != null;
-
- for (Iterator<ClusterNode> it =
rmtNodes.iterator(); it.hasNext(); ) {
- if (it.next().id().equals(nodeId))
- it.remove();
- }
- if (allReceived() && ready.get() &&
replied.compareAndSet(false, true))
- if (spreadPartitions())
- onDone(exchId.topologyVersion());
- }
+ onDone(topologyVersion());
}
- finally {
- leaveBusy();
+ else {
+ if (log.isDebugEnabled()) {
+ log.debug("Ignore affinity change message,
coordinator changed [node=" + node.id() +
+ ", crd=" + crd.id() +
+ ", msg=" + msg +
+ ']');
+ }
}
}
- });
- }
- finally {
- leaveBusy();
+ finally {
+ leaveBusy();
+ }
+ }
+ });
+ }
+
+ /**
+ * @param c Closure.
+ */
+ private void onDiscoveryEvent(IgniteRunnable c) {
+ synchronized (discoEvts) {
+ if (!init) {
+ discoEvts.add(c);
+
+ return;
+ }
+
+ assert discoEvts.isEmpty() : discoEvts;
}
+
+ c.run();
}
/**
*
*/
- private void recheck() {
- ClusterNode oldest = oldestNode.get();
+ private void initDone() {
+ while (!isDone()) {
+ List<IgniteRunnable> evts;
- // If this is the oldest node.
- if (oldest.id().equals(cctx.localNodeId())) {
- Collection<UUID> remaining = remaining();
+ synchronized (discoEvts) {
+ if (discoEvts.isEmpty()) {
+ init = true;
- if (!remaining.isEmpty()) {
- try {
- cctx.io().safeSend(cctx.discovery().nodes(remaining),
- new GridDhtPartitionsSingleRequest(exchId),
SYSTEM_POOL, null);
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Failed to request partitions from nodes
[exchangeId=" + exchId +
- ", nodes=" + remaining + ']', e);
+ break;
}
+
+ evts = new ArrayList<>(discoEvts);
+
+ discoEvts.clear();
}
- // Resend full partition map because last attempt failed.
- else {
- if (spreadPartitions())
- onDone(exchId.topologyVersion());
- }
+
+ for (IgniteRunnable c : evts)
+ c.run();
}
- else
- sendPartitions(oldest);
- // Schedule another send.
- scheduleRecheck();
+ initFut.onDone(true);
}
/**
+ * Node left callback, processed from the same thread as {@link #onAffinityChangeMessage}.
*
+ * @param node Left node.
*/
- private void scheduleRecheck() {
- if (!isDone()) {
- GridTimeoutObject old = timeoutObj;
-
- if (old != null)
- cctx.kernalContext().timeout().removeTimeoutObject(old);
-
- GridTimeoutObject timeoutObj = new GridTimeoutObjectAdapter(
- cctx.gridConfig().getNetworkTimeout() * Math.max(1,
cctx.gridConfig().getCacheConfiguration().length)) {
- @Override public void onTimeout() {
- cctx.kernalContext().closure().runLocalSafe(new Runnable()
{
- @Override public void run() {
- if (isDone())
- return;
+ public void onNodeLeft(final ClusterNode node) {
+ if (isDone() || !enterBusy())
+ return;
+
+ try {
+ onDiscoveryEvent(new IgniteRunnable() {
+ @Override public void run() {
+ if (isDone() || !enterBusy())
+ return;
- if (!enterBusy())
+ try {
+ boolean crdChanged = false;
+ boolean allReceived = false;
+
+ ClusterNode crd0;
+
+ synchronized (mux) {
+ if (!srvNodes.remove(node))
return;
- try {
- U.warn(log,
- "Retrying preload partition exchange due
to timeout [done=" + isDone() +
- ", dummy=" + dummy + ", exchId=" +
exchId + ", rcvdIds=" + id8s(rcvdIds) +
- ", rmtIds=" + id8s(rmtIds) + ",
remaining=" + id8s(remaining()) +
- ", init=" + init + ", initFut=" +
initFut.isDone() +
- ", ready=" + ready + ", replied=" +
replied + ", added=" + added +
- ", oldest=" +
U.id8(oldestNode.get().id()) + ", oldestOrder=" +
- oldestNode.get().order() + ",
evtLatch=" + evtLatch.getCount() +
- ", locNodeOrder=" +
cctx.localNode().order() +
- ", locNodeId=" + cctx.localNode().id()
+ ']',
- "Retrying preload partition exchange due
to timeout.");
-
- recheck();
- }
- finally {
- leaveBusy();
+ boolean rmvd = remaining.remove(node.id());
+
+ if (node.equals(crd)) {
+ crdChanged = true;
+
+ crd = srvNodes.size() > 0 ? srvNodes.get(0) :
null;
}
+
+ if (crd != null && crd.isLocal() && rmvd)
+ allReceived = remaining.isEmpty();
+
+ crd0 = crd;
}
- });
- }
- };
- this.timeoutObj = timeoutObj;
+ if (crd0 == null) {
+ assert cctx.kernalContext().clientNode() ||
cctx.localNode().isDaemon() : cctx.localNode();
- cctx.kernalContext().timeout().addTimeoutObject(timeoutObj);
- }
- }
+ List<ClusterNode> empty = Collections.emptyList();
- /**
- * @return Remaining node IDs.
- */
- Collection<UUID> remaining() {
- if (rmtIds == null)
- return Collections.emptyList();
+ for (GridCacheContext cacheCtx :
cctx.cacheContexts()) {
+ List<List<ClusterNode>> affAssignment = new
ArrayList<>(cacheCtx.affinity().partitions());
- return F.lose(rmtIds, true, rcvdIds);
- }
+ for (int i = 0; i <
cacheCtx.affinity().partitions(); i++)
+ affAssignment.add(empty);
- /**
- * Convenient utility method that returns collection of node ID8s for a
given
- * collection of node IDs. ID8 is a shorter string representation of node
ID,
- * mainly the first 8 characters.
- * <p>
- * Note that this method doesn't create a new collection but simply
iterates
- * over the input one.
- *
- * @param ids Collection of nodeIds.
- * @return Collection of node IDs for given collection of grid nodes.
- */
- private static Collection<String> id8s(@Nullable Collection<UUID> ids) {
- if (ids == null || ids.isEmpty())
- return Collections.emptyList();
+
cacheCtx.affinity().affinityCache().initialize(topologyVersion(),
affAssignment);
+ }
+
+ onDone(topologyVersion());
+
+ return;
+ }
+
+ if (crd0.isLocal()) {
+ if (allReceived) {
+ onAllReceived(true);
- Collection<String> res = new ArrayList<>(ids.size());
+ return;
+ }
- for (UUID id : ids)
- res.add(U.id8(id));
+ for (Map.Entry<ClusterNode,
GridDhtPartitionsSingleMessage> m : singleMsgs.entrySet())
+ processMessage(m.getKey(), m.getValue());
+ }
+ else {
+ if (crdChanged) {
+ sendPartitions(crd0);
- return res;
+ for (Map.Entry<ClusterNode,
GridDhtPartitionsFullMessage> m : fullMsgs.entrySet())
+ processMessage(m.getKey(), m.getValue());
+ }
+ }
+ }
+ finally {
+ leaveBusy();
+ }
+ }
+ });
+ }
+ finally {
+ leaveBusy();
+ }
}
/** {@inheritDoc} */
@@ -1690,15 +1629,33 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
return exchId.hashCode();
}
+ /**
+ *
+ */
+ enum ExchangeType {
+ /** */
+ CLIENT,
+ /** */
+ ALL,
+ /** */
+ NONE
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
- ClusterNode oldestNode = this.oldestNode.get();
+ ClusterNode oldestNode;
+ Set<UUID> remaining;
+
+ synchronized (mux) {
+ oldestNode = this.crd;
+ remaining = new HashSet<>(this.remaining);
+ }
return S.toString(GridDhtPartitionsExchangeFuture.class, this,
"oldest", oldestNode == null ? "null" : oldestNode.id(),
"oldestOrder", oldestNode == null ? "null" : oldestNode.order(),
"evtLatch", evtLatch == null ? "null" : evtLatch.getCount(),
- "remaining", remaining(),
+ "remaining", remaining,
"super", super.toString());
}
}