http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index a1926ee..cea758a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -36,6 +36,7 @@ import java.util.concurrent.locks.ReadWriteLock; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.configuration.NearCacheConfiguration; import org.apache.ignite.events.CacheEvent; import org.apache.ignite.events.DiscoveryEvent; import org.apache.ignite.events.Event; @@ -57,15 +58,14 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache; import org.apache.ignite.internal.processors.cache.CacheAffinityChangeMessage; import org.apache.ignite.internal.processors.cache.CacheGroupContext; -import org.apache.ignite.internal.processors.cache.CacheGroupDescriptor; import org.apache.ignite.internal.processors.cache.CachePartitionExchangeWorkerTask; -import org.apache.ignite.internal.processors.cache.ClusterState; import org.apache.ignite.internal.processors.cache.DynamicCacheChangeBatch; import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor; import 
org.apache.ignite.internal.processors.cache.ExchangeActions; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; +import org.apache.ignite.internal.processors.cache.StateChangeRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFutureAdapter; import org.apache.ignite.internal.processors.cache.distributed.dht.GridClientPartitionTopology; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition; @@ -73,7 +73,8 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartit import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; -import org.apache.ignite.internal.processors.cluster.GridClusterStateProcessor; +import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMessage; +import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateMessage; import org.apache.ignite.internal.processors.timeout.GridTimeoutObjectAdapter; import org.apache.ignite.internal.util.future.GridCompoundFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; @@ -214,9 +215,6 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte /** Change global state exceptions. */ private final Map<UUID, Exception> changeGlobalStateExceptions = new ConcurrentHashMap8<>(); - /** This exchange for change global state. 
*/ - private boolean exchangeOnChangeGlobalState; - /** */ private ConcurrentMap<UUID, GridDhtPartitionsSingleMessage> msgs = new ConcurrentHashMap8<>(); @@ -463,10 +461,24 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte } /** - * + * @return {@code True} if cluster state change exchange. + */ + private boolean stateChangeExchange() { + return exchActions != null && exchActions.stateChangeRequest() != null; + } + + /** + * @return {@code True} if activate cluster exchange. */ - public ClusterState newClusterState() { - return exchActions != null ? exchActions.newClusterState() : null; + public boolean activateCluster() { + return exchActions != null && exchActions.activate(); + } + + /** + * @return {@code True} if deactivate cluster exchange. + */ + boolean deactivateCluster() { + return exchActions != null && exchActions.deactivate(); } /** @@ -519,6 +531,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte if (isDone()) return; + assert !cctx.kernalContext().isDaemon(); + initTs = U.currentTimeMillis(); U.await(evtLatch); @@ -557,7 +571,12 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte if (discoEvt.type() == EVT_DISCOVERY_CUSTOM_EVT) { DiscoveryCustomMessage msg = ((DiscoveryCustomEvent)discoEvt).customMessage(); - if (msg instanceof DynamicCacheChangeBatch) { + if (msg instanceof ChangeGlobalStateMessage) { + assert exchActions != null && !exchActions.empty(); + + exchange = onClusterStateChangeRequest(crdNode); + } + else if (msg instanceof DynamicCacheChangeBatch) { assert exchActions != null && !exchActions.empty(); exchange = onCacheChangeRequest(crdNode); @@ -582,8 +601,26 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte cctx.affinity().initStartedCaches(crdNode, this, receivedCaches); } - else - cctx.cache().startCachesOnLocalJoin(topVer); + else { + cctx.activate(); + + List<T2<DynamicCacheDescriptor, 
NearCacheConfiguration>> caches = + cctx.cache().cachesToStartOnLocalJoin(); + + if (cctx.database().persistenceEnabled() && + !cctx.kernalContext().clientNode()) { + List<DynamicCacheDescriptor> startDescs = new ArrayList<>(); + + if (caches != null) { + for (T2<DynamicCacheDescriptor, NearCacheConfiguration> c : caches) + startDescs.add(c.get1()); + } + + cctx.database().readCheckpointAndRestoreMemory(startDescs); + } + + cctx.cache().startCachesOnLocalJoin(caches, topVer); + } } exchange = CU.clientNode(discoEvt.eventNode()) ? @@ -710,21 +747,94 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte * @return Exchange type. * @throws IgniteCheckedException If failed. */ - private ExchangeType onCacheChangeRequest(boolean crd) throws IgniteCheckedException { + private ExchangeType onClusterStateChangeRequest(boolean crd) throws IgniteCheckedException { assert exchActions != null && !exchActions.empty() : this; - GridClusterStateProcessor stateProc = cctx.kernalContext().state(); + StateChangeRequest req = exchActions.stateChangeRequest(); + + assert req != null : exchActions; + + if (req.activate()) { + if (log.isInfoEnabled()) { + log.info("Start activation process [nodeId=" + cctx.localNodeId() + + ", client=" + cctx.kernalContext().clientNode() + + ", topVer=" + topologyVersion() + "]"); + } + + try { + cctx.activate(); + + if (cctx.database().persistenceEnabled() && !cctx.kernalContext().clientNode()) { + List<DynamicCacheDescriptor> startDescs = new ArrayList<>(); + + for (ExchangeActions.ActionData startReq : exchActions.cacheStartRequests()) + startDescs.add(startReq.descriptor()); + + cctx.database().readCheckpointAndRestoreMemory(startDescs); + } + + cctx.affinity().onCacheChangeRequest(this, crd, exchActions); - if (exchangeOnChangeGlobalState = stateProc.changeGlobalState(exchActions, topologyVersion())) { - changeGlobalStateE = stateProc.onChangeGlobalState(); + if (log.isInfoEnabled()) { + log.info("Successfully activated 
caches [nodeId=" + cctx.localNodeId() + + ", client=" + cctx.kernalContext().clientNode() + + ", topVer=" + topologyVersion() + "]"); + } + } + catch (Exception e) { + U.error(log, "Failed to activate node components [nodeId=" + cctx.localNodeId() + + ", client=" + cctx.kernalContext().clientNode() + + ", topVer=" + topologyVersion() + "]", e); - if (changeGlobalStateE != null) { - if (crd) - changeGlobalStateExceptions.put(cctx.localNodeId(), changeGlobalStateE); + changeGlobalStateE = e; - return cctx.kernalContext().clientNode() ? ExchangeType.CLIENT : ExchangeType.ALL; + if (crd) { + synchronized (this) { + changeGlobalStateExceptions.put(cctx.localNodeId(), e); + } + } } } + else { + if (log.isInfoEnabled()) { + log.info("Start deactivation process [nodeId=" + cctx.localNodeId() + + ", client=" + cctx.kernalContext().clientNode() + + ", topVer=" + topologyVersion() + "]"); + } + + try { + cctx.kernalContext().dataStructures().onDeActivate(cctx.kernalContext()); + + cctx.kernalContext().service().onDeActivate(cctx.kernalContext()); + + cctx.affinity().onCacheChangeRequest(this, crd, exchActions); + + if (log.isInfoEnabled()) { + log.info("Successfully deactivated data structures, services and caches [" + + "nodeId=" + cctx.localNodeId() + + ", client=" + cctx.kernalContext().clientNode() + + ", topVer=" + topologyVersion() + "]"); + } + } + catch (Exception e) { + U.error(log, "Failed to deactivate node components [nodeId=" + cctx.localNodeId() + + ", client=" + cctx.kernalContext().clientNode() + + ", topVer=" + topologyVersion() + "]", e); + + changeGlobalStateE = e; + } + } + + return cctx.kernalContext().clientNode() ? ExchangeType.CLIENT : ExchangeType.ALL; + } + + /** + * @param crd Coordinator flag. + * @return Exchange type. + * @throws IgniteCheckedException If failed. 
+ */ + private ExchangeType onCacheChangeRequest(boolean crd) throws IgniteCheckedException { + assert exchActions != null && !exchActions.empty() : this; assert !exchActions.clientOnlyExchange() : exchActions; @@ -1133,8 +1243,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte if (partHistReserved0 != null) m.partitionHistoryCounters(partHistReserved0); - if (exchangeOnChangeGlobalState && changeGlobalStateE != null) - m.setException(changeGlobalStateE); + if (stateChangeExchange() && changeGlobalStateE != null) + m.setError(changeGlobalStateE); if (log.isDebugEnabled()) log.debug("Sending local partitions [nodeId=" + node.id() + ", exchId=" + exchId + ", msg=" + m + ']'); @@ -1160,8 +1270,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte partHistSuppliers, partsToReload); - if (exchangeOnChangeGlobalState && !F.isEmpty(changeGlobalStateExceptions)) - m.setExceptionsMap(changeGlobalStateExceptions); + if (stateChangeExchange() && !F.isEmpty(changeGlobalStateExceptions)) + m.setErrorsMap(changeGlobalStateExceptions); return m; } @@ -1175,9 +1285,10 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte assert !nodes.contains(cctx.localNode()); - if (log.isDebugEnabled()) + if (log.isDebugEnabled()) { log.debug("Sending full partition map [nodeIds=" + F.viewReadOnly(nodes, F.node2id()) + ", exchId=" + exchId + ", msg=" + m + ']'); + } for (ClusterNode node : nodes) { try { @@ -1291,8 +1402,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte if (exchActions != null && err == null) exchActions.completeRequestFutures(cctx); - if (exchangeOnChangeGlobalState && err == null) - cctx.kernalContext().state().onExchangeDone(); + if (stateChangeExchange() && err == null) + cctx.kernalContext().state().onStateChangeExchangeDone(exchActions.stateChangeRequest()); Map<T2<Integer, Integer>, Long> localReserved = 
partHistSuppliers.getReservations(cctx.localNodeId()); @@ -1368,6 +1479,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte crd = null; partReleaseFut = null; changeGlobalStateE = null; + exchActions = null; } /** @@ -1444,8 +1556,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte pendingSingleUpdates++; - if (exchangeOnChangeGlobalState && msg.getException() != null) - changeGlobalStateExceptions.put(node.id(), msg.getException()); + if (stateChangeExchange() && msg.getError() != null) + changeGlobalStateExceptions.put(node.id(), msg.getError()); allReceived = remaining.isEmpty(); } @@ -1457,7 +1569,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte if (updateSingleMap) { try { // Do not update partition map, in case cluster transitioning to inactive state. - if (!exchangeOnChangeGlobalState || exchActions.newClusterState() != ClusterState.INACTIVE) + if (!deactivateCluster()) updatePartitionSingleMap(node, msg); } finally { @@ -1735,18 +1847,16 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte } } - if (discoEvt.type() == EVT_NODE_JOINED) { - if (cctx.kernalContext().state().active()) - assignPartitionsStates(); - } + if (discoEvt.type() == EVT_NODE_JOINED) + assignPartitionsStates(); else if (discoEvt.type() == EVT_DISCOVERY_CUSTOM_EVT) { assert discoEvt instanceof DiscoveryCustomEvent; + if (activateCluster()) + assignPartitionsStates(); + if (((DiscoveryCustomEvent)discoEvt).customMessage() instanceof DynamicCacheChangeBatch) { if (exchActions != null) { - if (exchActions.newClusterState() == ClusterState.ACTIVE) - assignPartitionsStates(); - Set<String> caches = exchActions.cachesToResetLostPartitions(); if (!F.isEmpty(caches)) @@ -1783,13 +1893,34 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte nodes = new ArrayList<>(srvNodes); } + IgniteCheckedException err = null; + + if 
(stateChangeExchange()) { + StateChangeRequest req = exchActions.stateChangeRequest(); + + assert req != null : exchActions; + + boolean stateChangeErr = false; + + if (!F.isEmpty(changeGlobalStateExceptions)) { + stateChangeErr = true; + + err = new IgniteCheckedException("Cluster state change failed."); + + cctx.kernalContext().state().onStateChangeError(changeGlobalStateExceptions, req); + } + + boolean active = !stateChangeErr && req.activate(); + + ChangeGlobalStateFinishMessage msg = new ChangeGlobalStateFinishMessage(req.requestId(), active); + + cctx.discovery().sendCustomEvent(msg); + } + if (!nodes.isEmpty()) sendAllPartitions(nodes); - if (exchangeOnChangeGlobalState && !F.isEmpty(changeGlobalStateExceptions)) - cctx.kernalContext().state().onFullResponseMessage(changeGlobalStateExceptions); - - onDone(exchangeId().topologyVersion()); + onDone(exchangeId().topologyVersion(), err); } } catch (IgniteCheckedException e) { @@ -1898,7 +2029,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte * @param msg Message. 
*/ private void processMessage(ClusterNode node, GridDhtPartitionsFullMessage msg) { - assert msg.exchangeId().equals(exchId) : msg; + assert exchId.equals(msg.exchangeId()) : msg; assert msg.lastVersion() != null : msg; synchronized (this) { @@ -1919,10 +2050,15 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte updatePartitionFullMap(msg); - if (exchangeOnChangeGlobalState && !F.isEmpty(msg.getExceptionsMap())) - cctx.kernalContext().state().onFullResponseMessage(msg.getExceptionsMap()); + IgniteCheckedException err = null; - onDone(exchId.topologyVersion()); + if (stateChangeExchange() && !F.isEmpty(msg.getErrorsMap())) { + err = new IgniteCheckedException("Cluster state change failed"); + + cctx.kernalContext().state().onStateChangeError(msg.getErrorsMap(), exchActions.stateChangeRequest()); + } + + onDone(exchId.topologyVersion(), err); } /** @@ -2143,7 +2279,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte } if (crd0.isLocal()) { - if (exchangeOnChangeGlobalState && changeGlobalStateE != null) + if (stateChangeExchange() && changeGlobalStateE != null) changeGlobalStateExceptions.put(crd0.id(), changeGlobalStateE); if (allReceived) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java index 0beb935..75609b8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java @@ -90,10 +90,10 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa /** Exceptions. */ @GridToStringInclude @GridDirectTransient - private Map<UUID, Exception> exs; + private Map<UUID, Exception> errs; /** */ - private byte[] exsBytes; + private byte[] errsBytes; /** */ @GridDirectTransient @@ -224,17 +224,17 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa } /** - * + * @return Errors map. */ - public Map<UUID, Exception> getExceptionsMap() { - return exs; + @Nullable Map<UUID, Exception> getErrorsMap() { + return errs; } /** - * @param exs Exs. + * @param errs Errors map. 
*/ - public void setExceptionsMap(Map<UUID, Exception> exs) { - this.exs = new HashMap<>(exs); + void setErrorsMap(Map<UUID, Exception> errs) { + this.errs = new HashMap<>(errs); } /** {@inheritDoc} */ @@ -245,14 +245,14 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa (partCntrs != null && partCntrsBytes == null) || (partHistSuppliers != null && partHistSuppliersBytes == null) || (partsToReload != null && partsToReloadBytes == null) || - (exs != null && exsBytes == null); + (errs != null && errsBytes == null); if (marshal) { byte[] partsBytes0 = null; byte[] partCntrsBytes0 = null; byte[] partHistSuppliersBytes0 = null; byte[] partsToReloadBytes0 = null; - byte[] exsBytes0 = null; + byte[] errsBytes0 = null; if (parts != null && partsBytes == null) partsBytes0 = U.marshal(ctx, parts); @@ -266,8 +266,8 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa if (partsToReload != null && partsToReloadBytes == null) partsToReloadBytes0 = U.marshal(ctx, partsToReload); - if (exs != null && exsBytes == null) - exsBytes0 = U.marshal(ctx, exs); + if (errs != null && errsBytes == null) + errsBytes0 = U.marshal(ctx, errs); if (compress) { assert !compressed(); @@ -277,13 +277,13 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa byte[] partCntrsBytesZip = U.zip(partCntrsBytes0); byte[] partHistSuppliersBytesZip = U.zip(partHistSuppliersBytes0); byte[] partsToReloadBytesZip = U.zip(partsToReloadBytes0); - byte[] exsBytesZip = U.zip(exsBytes0); + byte[] exsBytesZip = U.zip(errsBytes0); partsBytes0 = partsBytesZip; partCntrsBytes0 = partCntrsBytesZip; partHistSuppliersBytes0 = partHistSuppliersBytesZip; partsToReloadBytes0 = partsToReloadBytesZip; - exsBytes0 = exsBytesZip; + errsBytes0 = exsBytesZip; compressed(true); } @@ -296,7 +296,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa partCntrsBytes = partCntrsBytes0; partHistSuppliersBytes = 
partHistSuppliersBytes0; partsToReloadBytes = partsToReloadBytes0; - exsBytes = exsBytes0; + errsBytes = errsBytes0; } } @@ -379,15 +379,15 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa if (partCntrs == null) partCntrs = new IgniteDhtPartitionCountersMap(); - if (exsBytes != null && exs == null) { + if (errsBytes != null && errs == null) { if (compressed()) - exs = U.unmarshalZip(ctx.marshaller(), exsBytes, U.resolveClassLoader(ldr, ctx.gridConfig())); + errs = U.unmarshalZip(ctx.marshaller(), errsBytes, U.resolveClassLoader(ldr, ctx.gridConfig())); else - exs = U.unmarshal(ctx, exsBytes, U.resolveClassLoader(ldr, ctx.gridConfig())); + errs = U.unmarshal(ctx, errsBytes, U.resolveClassLoader(ldr, ctx.gridConfig())); } - if (exs == null) - exs = new HashMap<>(); + if (errs == null) + errs = new HashMap<>(); } /** {@inheritDoc} */ @@ -412,7 +412,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa writer.incrementState(); case 6: - if (!writer.writeByteArray("exsBytes", exsBytes)) + if (!writer.writeByteArray("errsBytes", errsBytes)) return false; writer.incrementState(); @@ -472,7 +472,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa reader.incrementState(); case 6: - exsBytes = reader.readByteArray("exsBytes"); + errsBytes = reader.readByteArray("errsBytes"); if (!reader.isLastRead()) return false; http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java index 1e5ea14..b4d25c4 
100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java @@ -76,10 +76,10 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes /** Exception. */ @GridToStringInclude @GridDirectTransient - private Exception ex; + private Exception err; /** */ - private byte[] exBytes; + private byte[] errBytes; /** */ private boolean client; @@ -220,15 +220,15 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes /** * @param ex Exception. */ - public void setException(Exception ex) { - this.ex = ex; + public void setError(Exception ex) { + this.err = ex; } /** - * + * @return Not null exception if exchange processing failed. */ - public Exception getException() { - return ex; + @Nullable public Exception getError() { + return err; } /** {@inheritDoc} @@ -239,13 +239,13 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes boolean marshal = (parts != null && partsBytes == null) || (partCntrs != null && partCntrsBytes == null) || (partHistCntrs != null && partHistCntrsBytes == null) || - (ex != null && exBytes == null); + (err != null && errBytes == null); if (marshal) { byte[] partsBytes0 = null; byte[] partCntrsBytes0 = null; byte[] partHistCntrsBytes0 = null; - byte[] exBytes0 = null; + byte[] errBytes0 = null; if (parts != null && partsBytes == null) partsBytes0 = U.marshal(ctx, parts); @@ -256,8 +256,8 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes if (partHistCntrs != null && partHistCntrsBytes == null) partHistCntrsBytes0 = U.marshal(ctx, partHistCntrs); - if (ex != null && exBytes == null) - exBytes0 = U.marshal(ctx, ex); + if (err != null && errBytes == null) + errBytes0 = U.marshal(ctx, err); if (compress) { assert 
!compressed(); @@ -266,12 +266,12 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes byte[] partsBytesZip = U.zip(partsBytes0); byte[] partCntrsBytesZip = U.zip(partCntrsBytes0); byte[] partHistCntrsBytesZip = U.zip(partHistCntrsBytes0); - byte[] exBytesZip = U.zip(exBytes0); + byte[] exBytesZip = U.zip(errBytes0); partsBytes0 = partsBytesZip; partCntrsBytes0 = partCntrsBytesZip; partHistCntrsBytes0 = partHistCntrsBytesZip; - exBytes0 = exBytesZip; + errBytes0 = exBytesZip; compressed(true); } @@ -283,7 +283,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes partsBytes = partsBytes0; partCntrsBytes = partCntrsBytes0; partHistCntrsBytes = partHistCntrsBytes0; - exBytes = exBytes0; + errBytes = errBytes0; } } @@ -312,11 +312,11 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes partHistCntrs = U.unmarshal(ctx, partHistCntrsBytes, U.resolveClassLoader(ldr, ctx.gridConfig())); } - if (exBytes != null && ex == null) { + if (errBytes != null && err == null) { if (compressed()) - ex = U.unmarshalZip(ctx.marshaller(), exBytes, U.resolveClassLoader(ldr, ctx.gridConfig())); + err = U.unmarshalZip(ctx.marshaller(), errBytes, U.resolveClassLoader(ldr, ctx.gridConfig())); else - ex = U.unmarshal(ctx, exBytes, U.resolveClassLoader(ldr, ctx.gridConfig())); + err = U.unmarshal(ctx, errBytes, U.resolveClassLoader(ldr, ctx.gridConfig())); } if (dupPartsData != null) { @@ -368,7 +368,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes writer.incrementState(); case 7: - if (!writer.writeByteArray("exBytes", exBytes)) + if (!writer.writeByteArray("errBytes", errBytes)) return false; writer.incrementState(); @@ -424,7 +424,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes reader.incrementState(); case 7: - exBytes = reader.readByteArray("exBytes"); + errBytes = reader.readByteArray("errBytes"); if 
(!reader.isLastRead()) return false; http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java index 66b5987..2b18c24 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java @@ -183,7 +183,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { // No assignments for disabled preloader. GridDhtPartitionTopology top = grp.topology(); - if (!grp.rebalanceEnabled() || !grp.shared().kernalContext().state().active()) + if (!grp.rebalanceEnabled()) return new GridDhtPreloaderAssignments(exchFut, top.topologyVersion()); int partCnt = grp.affinity().partitions(); http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java index 062ff6c..a49812e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java @@ -45,8 +45,6 @@ import 
org.apache.ignite.internal.processors.cache.distributed.dht.CacheDistribu import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtFuture; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException; -import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition; -import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalEx; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.GridLeanMap; http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java index 41e6d70..29c7aad 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java @@ -26,7 +26,6 @@ import java.util.UUID; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxPrepareRequest; -import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry; import 
org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.typedef.F; http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java index b3ab1cd..c700ef4 100755 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java @@ -65,7 +65,6 @@ import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.IgniteSystemProperties; import org.apache.ignite.PersistenceMetrics; -import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.DataPageEvictionMode; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.configuration.MemoryConfiguration; @@ -100,11 +99,13 @@ import org.apache.ignite.internal.pagemem.wal.record.delta.PartitionDestroyRecor import org.apache.ignite.internal.pagemem.wal.record.delta.PartitionMetaStateRecord; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheGroupContext; -import org.apache.ignite.internal.processors.cache.ClusterState; import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import 
org.apache.ignite.internal.processors.cache.StoredCacheData; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture; import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager; import org.apache.ignite.internal.processors.cache.persistence.pagemem.CheckpointMetricsTracker; import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryEx; @@ -113,9 +114,6 @@ import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO; import org.apache.ignite.internal.processors.cache.persistence.tree.io.PagePartitionMetaIO; import org.apache.ignite.internal.processors.cache.persistence.wal.FileWALPointer; import org.apache.ignite.internal.processors.cache.persistence.wal.crc.PureJavaCrc32; -import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition; -import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState; -import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture; import org.apache.ignite.internal.processors.port.GridPortRecord; import org.apache.ignite.internal.util.GridConcurrentHashSet; import org.apache.ignite.internal.util.GridMultiCollectionWrapper; @@ -352,9 +350,9 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan /** {@inheritDoc} */ @Override protected void start0() throws IgniteCheckedException { - snapshotMgr = cctx.snapshot(); + super.start0(); - assert !cctx.kernalContext().state().active() : "Cluster with persistent must starting as inactive."; + snapshotMgr = cctx.snapshot(); if (!cctx.kernalContext().clientNode()) { IgnitePageStoreManager store = cctx.pageStore(); @@ -371,15 +369,13 @@ public class 
GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan fileLockHolder = new FileLockHolder(storeMgr.workDir().getPath(), cctx.kernalContext(), log); persStoreMetrics.wal(cctx.wal()); - - registrateMetricsMBean(); } } /** - * + * @throws IgniteCheckedException If failed. */ - @Override public void initDataBase() throws IgniteCheckedException { + private void initDataBase() throws IgniteCheckedException { Long cpBufSize = persistenceCfg.getCheckpointingPageBufferSize(); if (persistenceCfg.getCheckpointingThreads() > 1) @@ -432,8 +428,6 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan } checkpointPageBufSize = cpBufSize; - - super.start0(); } /** {@inheritDoc} */ @@ -442,58 +436,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan } /** {@inheritDoc} */ - @Override protected void onKernalStart0(boolean reconnect) throws IgniteCheckedException { - if (reconnect || cctx.kernalContext().clientNode() || !cctx.kernalContext().state().active()) - return; - - initDataBase(); - - initCachesAndRestoreMemory(); - } - - /** - * - */ - private void initCachesAndRestoreMemory() throws IgniteCheckedException { - Collection<String> cacheNames = new HashSet<>(); - - // TODO IGNITE-5075 group descriptors. 
- for (CacheConfiguration ccfg : cctx.kernalContext().config().getCacheConfiguration()) { - if (CU.isSystemCache(ccfg.getName())) { - storeMgr.initializeForCache( - cctx.cache().cacheDescriptors().get(ccfg.getName()).groupDescriptor(), - new StoredCacheData(ccfg) - ); - - cacheNames.add(ccfg.getName()); - } - } - - for (CacheConfiguration ccfg : cctx.kernalContext().config().getCacheConfiguration()) - if (!CU.isSystemCache(ccfg.getName())) { - DynamicCacheDescriptor cacheDesc = cctx.cache().cacheDescriptors().get(ccfg.getName()); - - if (cacheDesc != null) - storeMgr.initializeForCache( - cacheDesc.groupDescriptor(), - new StoredCacheData(ccfg) - ); - - cacheNames.add(ccfg.getName()); - } - - for (StoredCacheData cacheData : cctx.pageStore().readCacheConfigurations().values()) { - if (!cacheNames.contains(cacheData.config().getName())) - storeMgr.initializeForCache( - cctx.cache().cacheDescriptors().get( - cacheData.config().getName()).groupDescriptor(), cacheData); - } - - readCheckpointAndRestoreMemory(); - } - - /** {@inheritDoc} */ - @Override public void onActivate(GridKernalContext kctx) throws IgniteCheckedException { + @Override public void onActivate(GridKernalContext ctx) throws IgniteCheckedException { if (log.isDebugEnabled()) log.debug("Activate database manager [id=" + cctx.localNodeId() + " topVer=" + cctx.discovery().topologyVersionEx() + " ]"); @@ -504,16 +447,13 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan initDataBase(); registrateMetricsMBean(); - - initCachesAndRestoreMemory(); } - if (log.isDebugEnabled()) - log.debug("Restore state after activation [nodeId=" + cctx.localNodeId() + " ]"); + super.onActivate(ctx); } /** {@inheritDoc} */ - @Override public void onDeActivate(GridKernalContext kctx) throws IgniteCheckedException { + @Override public void onDeActivate(GridKernalContext kctx) { if (log.isDebugEnabled()) log.debug("DeActivate database manager [id=" + cctx.localNodeId() + " topVer=" + 
cctx.discovery().topologyVersionEx() + " ]"); @@ -530,7 +470,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan } /** - * + * @throws IgniteCheckedException If failed. */ private void registrateMetricsMBean() throws IgniteCheckedException { try { @@ -564,13 +504,18 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan } } - /** - * - */ - private void readCheckpointAndRestoreMemory() throws IgniteCheckedException { + /** {@inheritDoc} */ + @Override public void readCheckpointAndRestoreMemory(List<DynamicCacheDescriptor> cachesToStart) throws IgniteCheckedException { checkpointReadLock(); try { + if (!F.isEmpty(cachesToStart)) { + for (DynamicCacheDescriptor desc : cachesToStart) { + if (CU.affinityNode(cctx.localNode(), desc.cacheConfiguration().getNodeFilter())) + storeMgr.initializeForCache(desc.groupDescriptor(), new StoredCacheData(desc.cacheConfiguration())); + } + } + CheckpointStatus status = readCheckpointStatus(); // First, bring memory to the last consistent checkpoint state if needed. @@ -774,13 +719,10 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan boolean isSrvNode = !cctx.kernalContext().clientNode(); - boolean clusterStatusActive = cctx.kernalContext().state().active(); - - boolean clusterInTransitionStateToActive = fut.newClusterState() == ClusterState.ACTIVE; + boolean clusterInTransitionStateToActive = fut.activateCluster(); // Before local node join event. 
- if (clusterInTransitionStateToActive || - (joinEvt && locNode && isSrvNode && clusterStatusActive)) + if (clusterInTransitionStateToActive || (joinEvt && locNode && isSrvNode)) restoreState(); if (cctx.kernalContext().query().moduleEnabled()) { @@ -1579,9 +1521,10 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan Map<T2<Integer, Integer>, T2<Integer, Long>> partStates ) throws IgniteCheckedException { for (CacheGroupContext grp : cctx.cache().cacheGroups()) { - if (grp.isLocal()) + if (grp.isLocal() || !grp.affinityNode()) { // Local cache has no partitions and its states. continue; + } int grpId = grp.groupId(); http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java index 4e322b9..7a38b61 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java @@ -135,10 +135,7 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple /** {@inheritDoc} */ @Override protected CacheDataStore createCacheDataStore0(final int p) throws IgniteCheckedException { - GridCacheDatabaseSharedManager dbMgr = (GridCacheDatabaseSharedManager)ctx.database(); - - boolean exists = ctx.pageStore() != null - && ctx.pageStore().exists(grp.groupId(), p); + boolean exists = ctx.pageStore() != null && ctx.pageStore().exists(grp.groupId(), p); return new GridCacheDataStore(p, exists); } 
http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java index f04c278..c5f174c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java @@ -22,6 +22,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Set; import javax.management.JMException; @@ -41,8 +42,10 @@ import org.apache.ignite.internal.mem.unsafe.UnsafeMemoryProvider; import org.apache.ignite.internal.pagemem.PageMemory; import org.apache.ignite.internal.pagemem.impl.PageMemoryNoStoreImpl; import org.apache.ignite.internal.processors.cache.CacheGroupContext; +import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor; import org.apache.ignite.internal.processors.cache.GridCacheMapEntry; import org.apache.ignite.internal.processors.cache.GridCacheSharedManagerAdapter; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture; import org.apache.ignite.internal.processors.cache.persistence.evict.FairFifoPageEvictionTracker; import org.apache.ignite.internal.processors.cache.persistence.evict.NoOpPageEvictionTracker; import org.apache.ignite.internal.processors.cache.persistence.evict.PageEvictionTracker; @@ -51,7 +54,6 @@ import 
org.apache.ignite.internal.processors.cache.persistence.evict.RandomLruPa import org.apache.ignite.internal.processors.cache.persistence.freelist.FreeList; import org.apache.ignite.internal.processors.cache.persistence.freelist.FreeListImpl; import org.apache.ignite.internal.processors.cache.persistence.tree.reuse.ReuseList; -import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture; import org.apache.ignite.internal.processors.cluster.IgniteChangeGlobalStateSupport; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.LT; @@ -100,13 +102,6 @@ public class IgniteCacheDatabaseSharedManager extends GridCacheSharedManagerAdap if (cctx.kernalContext().clientNode() && cctx.kernalContext().config().getMemoryConfiguration() == null) return; - init(); - } - - /** - * @throws IgniteCheckedException If failed. - */ - public void init() throws IgniteCheckedException { MemoryConfiguration memCfg = cctx.kernalContext().config().getMemoryConfiguration(); assert memCfg != null; @@ -114,14 +109,6 @@ public class IgniteCacheDatabaseSharedManager extends GridCacheSharedManagerAdap validateConfiguration(memCfg); pageSize = memCfg.getPageSize(); - - initPageMemoryPolicies(memCfg); - - registerMetricsMBeans(); - - startMemoryPolicies(); - - initPageMemoryDataStructures(memCfg); } /** @@ -149,12 +136,12 @@ public class IgniteCacheDatabaseSharedManager extends GridCacheSharedManagerAdap ) { try { U.registerMBean( - cfg.getMBeanServer(), - cfg.getIgniteInstanceName(), - "MemoryMetrics", - memPlcCfg.getName(), - new MemoryMetricsMXBeanImpl(memMetrics, memPlcCfg), - MemoryMetricsMXBean.class); + cfg.getMBeanServer(), + cfg.getIgniteInstanceName(), + "MemoryMetrics", + memPlcCfg.getName(), + new MemoryMetricsMXBeanImpl(memMetrics, memPlcCfg), + MemoryMetricsMXBean.class); } catch (JMException e) { U.error(log, "Failed to register MBean for MemoryMetrics with name: '" + memMetrics.getName() 
+ "'", e); @@ -163,6 +150,7 @@ public class IgniteCacheDatabaseSharedManager extends GridCacheSharedManagerAdap /** * @param dbCfg Database config. + * @throws IgniteCheckedException If failed. */ protected void initPageMemoryDataStructures(MemoryConfiguration dbCfg) throws IgniteCheckedException { freeListMap = U.newHashMap(memPlcMap.size()); @@ -554,13 +542,6 @@ public class IgniteCacheDatabaseSharedManager extends GridCacheSharedManagerAdap } /** - * @throws IgniteCheckedException If failed. - */ - public void initDataBase() throws IgniteCheckedException { - // No-op. - } - - /** * @return collection of all configured {@link MemoryPolicy policies}. */ public Collection<MemoryPolicy> memoryPolicies() { @@ -592,6 +573,14 @@ public class IgniteCacheDatabaseSharedManager extends GridCacheSharedManagerAdap } /** + * @param cachesToStart Started caches. + * @throws IgniteCheckedException If failed. + */ + public void readCheckpointAndRestoreMemory(List<DynamicCacheDescriptor> cachesToStart) throws IgniteCheckedException { + // No-op. + } + + /** * @param memPlcName Name of {@link MemoryPolicy} to obtain {@link MemoryMetrics} for. * @return {@link MemoryMetrics} snapshot for specified {@link MemoryPolicy} or {@code null} if * no {@link MemoryPolicy} is configured for specified name. 
@@ -947,11 +936,24 @@ public class IgniteCacheDatabaseSharedManager extends GridCacheSharedManagerAdap /** {@inheritDoc} */ @Override public void onActivate(GridKernalContext kctx) throws IgniteCheckedException { - start0(); + if (cctx.kernalContext().clientNode() && cctx.kernalContext().config().getMemoryConfiguration() == null) + return; + + MemoryConfiguration memCfg = cctx.kernalContext().config().getMemoryConfiguration(); + + assert memCfg != null; + + initPageMemoryPolicies(memCfg); + + registerMetricsMBeans(); + + startMemoryPolicies(); + + initPageMemoryDataStructures(memCfg); } /** {@inheritDoc} */ - @Override public void onDeActivate(GridKernalContext kctx) throws IgniteCheckedException { + @Override public void onDeActivate(GridKernalContext kctx) { stop0(false); } http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheSnapshotManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheSnapshotManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheSnapshotManager.java index ad804cb..cce6f55 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheSnapshotManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheSnapshotManager.java @@ -124,7 +124,7 @@ public class IgniteCacheSnapshotManager extends GridCacheSharedManagerAdapter im FullPageId fullId, PageMemory pageMem ) throws IgniteCheckedException { - + // No-op. } /** @@ -135,14 +135,16 @@ public class IgniteCacheSnapshotManager extends GridCacheSharedManagerAdapter im ByteBuffer pageBuf, Integer tag ) throws IgniteCheckedException { - + // No-op. 
} + /** {@inheritDoc} */ @Override public void onActivate(GridKernalContext kctx) throws IgniteCheckedException { - + // No-op. } - @Override public void onDeActivate(GridKernalContext kctx) throws IgniteCheckedException { - + /** {@inheritDoc} */ + @Override public void onDeActivate(GridKernalContext kctx) { + // No-op. } } http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java index f908512..28bf6e4 100755 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java @@ -162,7 +162,7 @@ public class FilePageStoreManager extends GridCacheSharedManagerAdapter implemen } /** {@inheritDoc} */ - @Override public void onDeActivate(GridKernalContext kctx) throws IgniteCheckedException { + @Override public void onDeActivate(GridKernalContext kctx) { if (log.isDebugEnabled()) log.debug("DeActivate page store manager [id=" + cctx.localNodeId() + " topVer=" + cctx.discovery().topologyVersionEx() + " ]"); @@ -208,18 +208,17 @@ public class FilePageStoreManager extends GridCacheSharedManagerAdapter implemen /** {@inheritDoc} */ @Override public void storeCacheData( - CacheGroupDescriptor grpDesc, StoredCacheData cacheData ) throws IgniteCheckedException { - File cacheWorkDir = cacheWorkDirectory(grpDesc, cacheData.config()); + File cacheWorkDir = cacheWorkDirectory(cacheData.config()); File file; checkAndInitCacheWorkDir(cacheWorkDir); assert cacheWorkDir.exists() : "Work 
directory does not exist: " + cacheWorkDir; - if (grpDesc.sharedGroup()) + if (cacheData.config().getGroupName() != null) file = new File(cacheWorkDir, cacheData.config().getName() + CACHE_DATA_FILENAME); else file = new File(cacheWorkDir, CACHE_DATA_FILENAME); @@ -333,14 +332,13 @@ public class FilePageStoreManager extends GridCacheSharedManagerAdapter implemen } /** - * @param grpDesc Cache group descriptor. * @param ccfg Cache configuration. * @return Cache work directory. */ - private File cacheWorkDirectory(CacheGroupDescriptor grpDesc, CacheConfiguration ccfg) { + private File cacheWorkDirectory(CacheConfiguration ccfg) { String dirName; - if (grpDesc.sharedGroup()) + if (ccfg.getGroupName() != null) dirName = CACHE_GRP_DIR_PREFIX + ccfg.getGroupName(); else dirName = CACHE_DIR_PREFIX + ccfg.getName(); @@ -357,7 +355,7 @@ public class FilePageStoreManager extends GridCacheSharedManagerAdapter implemen private CacheStoreHolder initForCache(CacheGroupDescriptor grpDesc, CacheConfiguration ccfg) throws IgniteCheckedException { assert !grpDesc.sharedGroup() || ccfg.getGroupName() != null : ccfg.getName(); - File cacheWorkDir = cacheWorkDirectory(grpDesc, ccfg); + File cacheWorkDir = cacheWorkDirectory(ccfg); boolean dirExisted = checkAndInitCacheWorkDir(cacheWorkDir); http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java index f877a14..8993112 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java +++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java @@ -288,14 +288,6 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl } } - /** {@inheritDoc} */ - @Override protected void onKernalStart0(boolean reconnect) throws IgniteCheckedException { - super.onKernalStart0(reconnect); - - if (!cctx.kernalContext().clientNode() && cctx.kernalContext().state().active()) - archiver.start(); - } - /** * @return Consistent ID. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java index 63228a0..7f859a2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java @@ -560,7 +560,7 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage final Object topic = topic(cctx.nodeId(), req.id()); - cctx.io().addOrderedCacheHandler(topic, resHnd); + cctx.io().addOrderedCacheHandler(cctx.shared(), topic, resHnd); fut.listen(new CI1<IgniteInternalFuture<?>>() { @Override public void apply(IgniteInternalFuture<?> fut) { @@ -744,7 +744,7 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage final Object topic = topic(cctx.nodeId(), req.id()); - cctx.io().addOrderedCacheHandler(topic, resHnd); + cctx.io().addOrderedCacheHandler(cctx.shared(), topic, resHnd); fut.listen(new CI1<IgniteInternalFuture<?>>() { @Override public 
void apply(IgniteInternalFuture<?> fut) { http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java index 62ead23..8ff2f5a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java @@ -34,7 +34,6 @@ import javax.cache.integration.CacheWriterException; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.cache.store.CacheStore; -import org.apache.ignite.cache.store.CacheStore; import org.apache.ignite.cache.store.CacheStoreSession; import org.apache.ignite.cache.store.CacheStoreSessionListener; import org.apache.ignite.cache.store.jdbc.CacheJdbcPojoStore; http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java index bc2e49a..269925d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java @@ -88,12 +88,6 @@ public class 
GridCacheVersionManager extends GridCacheSharedManagerAdapter { } /** {@inheritDoc} */ - @Override protected void onKernalStart0(boolean reconnect) throws IgniteCheckedException { - for (ClusterNode n : cctx.discovery().remoteNodes()) - onReceived(n.id(), n.metrics().getLastDataVersion()); - } - - /** {@inheritDoc} */ @Override protected void stop0(boolean cancel) { cctx.gridEvents().removeLocalEventListener(discoLsnr, EVT_NODE_METRICS_UPDATED); } http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java index 1dd47ed..dad6728 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java @@ -44,11 +44,6 @@ public interface IgniteCacheObjectProcessor extends GridProcessor { public void onContinuousProcessorStarted(GridKernalContext ctx) throws IgniteCheckedException; /** - * @throws IgniteCheckedException If failed. - */ - public void onUtilityCacheStarted() throws IgniteCheckedException; - - /** * @param typeName Type name. * @return Type ID. 
*/ http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java index 67e14dc..70711e5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java @@ -307,11 +307,6 @@ public class IgniteCacheObjectProcessorImpl extends GridProcessorAdapter impleme } /** {@inheritDoc} */ - @Override public void onUtilityCacheStarted() throws IgniteCheckedException { - // No-op. - } - - /** {@inheritDoc} */ @Override public int typeId(String typeName) { return 0; } http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ChangeGlobalStateFinishMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ChangeGlobalStateFinishMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ChangeGlobalStateFinishMessage.java new file mode 100644 index 0000000..0771198 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ChangeGlobalStateFinishMessage.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cluster; + +import java.util.UUID; +import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.lang.IgniteUuid; +import org.jetbrains.annotations.Nullable; + +/** + * + */ +public class ChangeGlobalStateFinishMessage implements DiscoveryCustomMessage { + /** */ + private static final long serialVersionUID = 0L; + + /** Custom message ID. */ + private IgniteUuid id = IgniteUuid.randomUuid(); + + /** State change request ID. */ + private UUID reqId; + + /** New cluster state. */ + private boolean clusterActive; + + /** + * @param reqId State change request ID. + * @param clusterActive New cluster state. + */ + public ChangeGlobalStateFinishMessage(UUID reqId, boolean clusterActive) { + assert reqId != null; + + this.reqId = reqId; + this.clusterActive = clusterActive; + } + + /** + * @return State change request ID. + */ + public UUID requestId() { + return reqId; + } + + /** + * @return New cluster state. 
+ */ + public boolean clusterActive() { + return clusterActive; + } + + /** {@inheritDoc} */ + @Override public IgniteUuid id() { + return id; + } + + /** {@inheritDoc} */ + @Nullable @Override public DiscoveryCustomMessage ackMessage() { + return null; + } + + /** {@inheritDoc} */ + @Override public boolean isMutable() { + return false; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(ChangeGlobalStateFinishMessage.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ChangeGlobalStateMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ChangeGlobalStateMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ChangeGlobalStateMessage.java new file mode 100644 index 0000000..6579399 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ChangeGlobalStateMessage.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cluster; + +import java.util.List; +import java.util.UUID; +import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; +import org.apache.ignite.internal.processors.cache.ExchangeActions; +import org.apache.ignite.internal.processors.cache.StoredCacheData; +import org.apache.ignite.internal.util.tostring.GridToStringExclude; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.lang.IgniteUuid; +import org.jetbrains.annotations.Nullable; + +/** + * Message represent request for change cluster global state. + */ +public class ChangeGlobalStateMessage implements DiscoveryCustomMessage { + /** */ + private static final long serialVersionUID = 0L; + + /** Custom message ID. */ + private IgniteUuid id = IgniteUuid.randomUuid(); + + /** Request ID */ + private UUID reqId; + + /** Initiator node ID. */ + private UUID initiatingNodeId; + + /** If true activate else deactivate. */ + private boolean activate; + + /** Configurations read from persistent store. */ + private List<StoredCacheData> storedCfgs; + + /** */ + @GridToStringExclude + private transient ExchangeActions exchangeActions; + + /** + * @param reqId State change request ID. + * @param initiatingNodeId Node initiated state change. + * @param storedCfgs Configurations read from persistent store. + * @param activate New cluster state. + */ + public ChangeGlobalStateMessage( + UUID reqId, + UUID initiatingNodeId, + @Nullable List<StoredCacheData> storedCfgs, + boolean activate + ) { + assert reqId != null; + assert initiatingNodeId != null; + + this.reqId = reqId; + this.initiatingNodeId = initiatingNodeId; + this.storedCfgs = storedCfgs; + this.activate = activate; + } + + /** + * @return Configurations read from persistent store.. + */ + @Nullable public List<StoredCacheData> storedCacheConfigurations() { + return storedCfgs; + } + + /** + * @return Cache updates to be executed on exchange. 
If {@code null} exchange is not needed. + */ + @Nullable public ExchangeActions exchangeActions() { + return exchangeActions; + } + + /** + * @param exchangeActions Cache updates to be executed on exchange. + */ + void exchangeActions(ExchangeActions exchangeActions) { + assert exchangeActions != null && !exchangeActions.empty() : exchangeActions; + + this.exchangeActions = exchangeActions; + } + + /** {@inheritDoc} */ + @Override public IgniteUuid id() { + return id; + } + + /** {@inheritDoc} */ + @Nullable @Override public DiscoveryCustomMessage ackMessage() { + return null; + } + + /** {@inheritDoc} */ + @Override public boolean isMutable() { + return false; + } + + /** + * @return Node initiated state change. + */ + public UUID initiatorNodeId() { + return initiatingNodeId; + } + + /** + * @return New cluster state. + */ + public boolean activate() { + return activate; + } + + /** + * @return State change request ID. + */ + public UUID requestId() { + return reqId; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(ChangeGlobalStateMessage.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java index ad95a78..dc503fb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java @@ -270,7 +270,6 @@ public class ClusterProcessor extends GridProcessorAdapter { dataBag.addNodeSpecificData(CLUSTER_PROC.ordinal(), getDiscoveryData()); } - /** * @return Discovery data. 
*/ @@ -314,7 +313,7 @@ public class ClusterProcessor extends GridProcessorAdapter { } /** {@inheritDoc} */ - @Override public void onKernalStart() throws IgniteCheckedException { + @Override public void onKernalStart(boolean active) throws IgniteCheckedException { if (notifyEnabled.get()) { try { verChecker = new GridUpdateNotifier(ctx.igniteInstanceName(), http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/DiscoveryDataClusterState.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/DiscoveryDataClusterState.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/DiscoveryDataClusterState.java new file mode 100644 index 0000000..71bf90b --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/DiscoveryDataClusterState.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cluster; + +import java.io.Serializable; +import java.util.Set; +import java.util.UUID; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.util.tostring.GridToStringExclude; +import org.apache.ignite.internal.util.tostring.GridToStringInclude; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.jetbrains.annotations.Nullable; + +/** + * Discovery data related to cluster state. + */ +public class DiscoveryDataClusterState implements Serializable { + /** Serial version UID. */ + private static final long serialVersionUID = 0L; + + /** Current cluster state (or new state in case when transition is in progress). */ + private final boolean active; + + /** State change request ID, {@code null} if no state change is in progress. */ + private final UUID transitionReqId; + + /** Topology version for state change exchange. */ + @GridToStringInclude + private final AffinityTopologyVersion transitionTopVer; + + /** Nodes participating in state change exchange. */ + @GridToStringExclude + private final Set<UUID> transitionNodes; + + /** Local flag for state transition result (global state is updated asynchronously by custom message). */ + private transient volatile Boolean transitionRes; + + /** + * Creates a state with no transition in progress (request ID, topology version and nodes are all {@code null}). + * + * @param active Current status. + * @return State instance. + */ + static DiscoveryDataClusterState createState(boolean active) { + return new DiscoveryDataClusterState(active, null, null, null); + } + + /** + * Creates a state describing a state change in progress. The request ID and topology version are asserted + * to be non-null and the node set is asserted to be non-empty. + * + * @param active New status. + * @param transitionReqId State change request ID. + * @param transitionTopVer State change topology version. + * @param transitionNodes Nodes participating in state change exchange. + * @return State instance. 
+ */ + static DiscoveryDataClusterState createTransitionState(boolean active, + UUID transitionReqId, + AffinityTopologyVersion transitionTopVer, + Set<UUID> transitionNodes) { + assert transitionReqId != null; + assert transitionTopVer != null; + assert !F.isEmpty(transitionNodes) : transitionNodes; + + return new DiscoveryDataClusterState(active, transitionReqId, transitionTopVer, transitionNodes); + } + + /** + * Private constructor, instances are created by {@link #createState} and {@link #createTransitionState}. + * + * @param active New state. + * @param transitionReqId State change request ID. + * @param transitionTopVer State change topology version. + * @param transitionNodes Nodes participating in state change exchange. + */ + private DiscoveryDataClusterState(boolean active, + @Nullable UUID transitionReqId, + @Nullable AffinityTopologyVersion transitionTopVer, + @Nullable Set<UUID> transitionNodes) { + this.active = active; + this.transitionReqId = transitionReqId; + this.transitionTopVer = transitionTopVer; + this.transitionNodes = transitionNodes; + } + + /** + * @return Local flag for state transition result (global state is updated asynchronously by custom message), + * {@code null} if no result has been set. + */ + @Nullable public Boolean transitionResult() { + return transitionRes; + } + + /** + * Discovery cluster state is changed asynchronously by a discovery message; this method changes the local status + * for public API calls. The result is recorded only if {@code reqId} equals this state's transition request ID, + * otherwise the call does nothing. The result is asserted not to have been set before. + * + * @param reqId Request ID (must not be {@code null}). + * @param active New cluster state. + */ + public void setTransitionResult(UUID reqId, boolean active) { + if (reqId.equals(transitionReqId)) { + assert transitionRes == null : this; + + transitionRes = active; + } + } + + /** + * @return State change request ID ({@code null} if no state change is in progress). + */ + public UUID transitionRequestId() { + return transitionReqId; + } + + /** + * @return {@code True} if state change is in progress. + */ + public boolean transition() { + return transitionReqId != null; + } + + /** + * @return State change exchange version. 
+ */ + public AffinityTopologyVersion transitionTopologyVersion() { + return transitionTopVer; + } + + /** + * @return Current cluster state (or new state in case when transition is in progress). + */ + public boolean active() { + return active; + } + + /** + * @return Nodes participating in state change exchange ({@code null} for a state created by {@link #createState}). + */ + public Set<UUID> transitionNodes() { + return transitionNodes; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(DiscoveryDataClusterState.class, this); + } +}
