GG-11860 Implement snapshot status on platform level - refactoring RESTORE
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a62cc454 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a62cc454 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a62cc454 Branch: refs/heads/ignite-gg-8.0.3.ea6-clients-test Commit: a62cc454466f921e2edd26bb0a6e0646bdb3a7f1 Parents: ef35f4f Author: EdShangGG <[email protected]> Authored: Thu Mar 2 16:33:58 2017 +0300 Committer: EdShangGG <[email protected]> Committed: Thu Mar 2 16:34:27 2017 +0300 ---------------------------------------------------------------------- .../pagemem/snapshot/SnapshotOperation.java | 3 +- .../GridDhtPartitionsExchangeFuture.java | 119 +++++++++++++++++-- 2 files changed, 109 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/a62cc454/modules/core/src/main/java/org/apache/ignite/internal/pagemem/snapshot/SnapshotOperation.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/snapshot/SnapshotOperation.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/snapshot/SnapshotOperation.java index 3f84b97..f3b5eee 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/snapshot/SnapshotOperation.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/snapshot/SnapshotOperation.java @@ -105,7 +105,8 @@ public class SnapshotOperation implements Serializable { * @param op Op. 
*/ public static Collection<File> getOptionalPathsParameter(SnapshotOperation op) { - assert op.type() == SnapshotOperationType.CHECK || op.extraParameter() instanceof Collection; + assert (op.type() == SnapshotOperationType.CHECK || op.type() == SnapshotOperationType.RESTORE) + && (op.extraParameter() == null || op.extraParameter() instanceof Collection); return (Collection<File>)op.extraParameter(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/a62cc454/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index 987ba54..4c179e6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -48,6 +48,8 @@ import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.events.DiscoveryCustomEvent; import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; import org.apache.ignite.internal.managers.discovery.GridDiscoveryTopologySnapshot; +import org.apache.ignite.internal.pagemem.snapshot.SnapshotOperation; +import org.apache.ignite.internal.pagemem.snapshot.SnapshotOperationType; import org.apache.ignite.internal.pagemem.snapshot.StartSnapshotOperationAckDiscoveryMessage; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache; @@ -59,6 +61,7 
@@ import org.apache.ignite.internal.processors.cache.DynamicCacheChangeRequest; import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate; +import org.apache.ignite.internal.processors.cache.GridCacheProcessor; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.distributed.dht.GridClientPartitionTopology; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition; @@ -81,6 +84,7 @@ import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.lang.IgniteRunnable; +import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import org.jsr166.ConcurrentHashMap8; @@ -510,10 +514,36 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT exchange = onCacheChangeRequest(crdNode); } - else if (msg instanceof StartSnapshotOperationAckDiscoveryMessage) + else if (msg instanceof StartSnapshotOperationAckDiscoveryMessage) { exchange = CU.clientNode(discoEvt.eventNode()) ? 
onClientNodeEvent(crdNode) : onServerNodeEvent(crdNode); + + StartSnapshotOperationAckDiscoveryMessage snapshotOperationMsg = (StartSnapshotOperationAckDiscoveryMessage)msg; + + if (!cctx.localNode().isDaemon()) { + SnapshotOperation op = snapshotOperationMsg.snapshotOperation(); + + if (op.type() == SnapshotOperationType.RESTORE) { + if (reqs != null) + reqs = new ArrayList<>(reqs); + else + reqs = new ArrayList<>(); + + List<DynamicCacheChangeRequest> destroyRequests = getStopCacheRequests( + cctx.cache(), op.cacheNames(), cctx.localNodeId()); + + reqs.addAll(destroyRequests); + + if (!reqs.isEmpty()) { //Emulate destroy cache request + if (op.type() == SnapshotOperationType.RESTORE) + cctx.cache().onCustomEvent(new DynamicCacheChangeBatch(reqs), topVer); + + onCacheChangeRequest(crdNode); + } + } + } + } else { assert affChangeMsg != null : this; @@ -578,6 +608,36 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT } /** + * @param cache Cache. + * @param cacheNames Cache names. + * @param locNodeId Local node id. + */ + @NotNull public static List<DynamicCacheChangeRequest> getStopCacheRequests(GridCacheProcessor cache, + Set<String> cacheNames, UUID locNodeId) { + List<DynamicCacheChangeRequest> destroyRequests = new ArrayList<>(); + + for (String cacheName : cacheNames) { + DynamicCacheDescriptor desc = cache.cacheDescriptor(CU.cacheId(cacheName)); + + if (desc == null) + continue; + + DynamicCacheChangeRequest t = new DynamicCacheChangeRequest(UUID.randomUUID(), cacheName, locNodeId); + + t.stop(true); + t.destroy(true); + + t.deploymentId(desc.deploymentId()); + + t.restart(true); + + destroyRequests.add(t); + } + + return destroyRequests; + } + + /** * @throws IgniteCheckedException If failed. 
*/ private void initTopologies() throws IgniteCheckedException { @@ -806,19 +866,15 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT cctx.database().beforeExchange(this); - // If a backup request, synchronously wait for backup start. - if (discoEvt.type() == EVT_DISCOVERY_CUSTOM_EVT) { - DiscoveryCustomMessage customMsg = ((DiscoveryCustomEvent)discoEvt).customMessage(); - - if (customMsg instanceof StartSnapshotOperationAckDiscoveryMessage) { - StartSnapshotOperationAckDiscoveryMessage snapshotOperationMsg = (StartSnapshotOperationAckDiscoveryMessage)customMsg; + StartSnapshotOperationAckDiscoveryMessage snapshotOperationMsg = getSnapshotOperationMessage(); - if (!cctx.localNode().isClient() && !cctx.localNode().isDaemon()) { - IgniteInternalFuture fut = cctx.database().startLocalSnapshotOperation(snapshotOperationMsg); + // If it's a snapshot operation request, synchronously wait for backup start. + if (snapshotOperationMsg != null) { + if (!cctx.localNode().isClient() && !cctx.localNode().isDaemon()) { + SnapshotOperation op = snapshotOperationMsg.snapshotOperation(); - if (fut != null) - fut.get(); - } + if (op.type() != SnapshotOperationType.RESTORE) + startLocalSnasphotOperation(snapshotOperationMsg); } } @@ -833,6 +889,17 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT } /** + * @param snapshotOperationMsg Snapshot operation message. + */ + private void startLocalSnasphotOperation(StartSnapshotOperationAckDiscoveryMessage snapshotOperationMsg + ) throws IgniteCheckedException { + IgniteInternalFuture fut = cctx.database().startLocalSnapshotOperation(snapshotOperationMsg); + + if (fut != null) + fut.get(); + } + + /** * @throws IgniteCheckedException If failed. 
*/ private void waitPartitionRelease() throws IgniteCheckedException { @@ -1168,6 +1235,20 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT cctx.cache().completeStartFuture(req); } + StartSnapshotOperationAckDiscoveryMessage snapshotOperationMsg = getSnapshotOperationMessage(); + + if (snapshotOperationMsg != null && !cctx.localNode().isClient() && !cctx.localNode().isDaemon()) { + SnapshotOperation op = snapshotOperationMsg.snapshotOperation(); + + if (op.type() == SnapshotOperationType.RESTORE) + try { + startLocalSnasphotOperation(snapshotOperationMsg); + } + catch (IgniteCheckedException e) { + log.error("Error while starting snapshot operation", e); + } + } + if (exchangeOnChangeGlobalState && err == null) cctx.kernalContext().state().onExchangeDone(); @@ -1196,6 +1277,20 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT return dummy; } + /** + * + */ + private StartSnapshotOperationAckDiscoveryMessage getSnapshotOperationMessage() { + // If it's a snapshot operation request, synchronously wait for backup start. + if (discoEvt.type() == EVT_DISCOVERY_CUSTOM_EVT) { + DiscoveryCustomMessage customMsg = ((DiscoveryCustomEvent)discoEvt).customMessage(); + + if (customMsg instanceof StartSnapshotOperationAckDiscoveryMessage) + return (StartSnapshotOperationAckDiscoveryMessage)customMsg; + } + return null; + } + /** {@inheritDoc} */ @Nullable @Override public Throwable validateCache( GridCacheContext cctx,
