GG-12140 We will lose data if we cancel snapshot restore (cherry picked from commit 8e3ad6d)
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/7b545fa9 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/7b545fa9 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/7b545fa9 Branch: refs/heads/ignite-5398 Commit: 7b545fa9029ba9f3d90828cd38611f6a2988cb25 Parents: d4c9997 Author: EdShangGG <[email protected]> Authored: Tue May 16 19:07:03 2017 +0300 Committer: EdShangGG <[email protected]> Committed: Thu May 18 16:15:35 2017 +0300 ---------------------------------------------------------------------- .../pagemem/snapshot/SnapshotOperation.java | 4 +- .../pagemem/snapshot/SnapshotOperationType.java | 2 + ...artSnapshotOperationAckDiscoveryMessage.java | 8 ++ .../GridCachePartitionExchangeManager.java | 2 +- .../processors/cache/GridCacheProcessor.java | 32 +++++- .../IgniteCacheDatabaseSharedManager.java | 7 ++ .../GridDhtPartitionsExchangeFuture.java | 115 ++++--------------- .../query/h2/database/H2TreeIndex.java | 3 +- 8 files changed, 75 insertions(+), 98 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/7b545fa9/modules/core/src/main/java/org/apache/ignite/internal/pagemem/snapshot/SnapshotOperation.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/snapshot/SnapshotOperation.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/snapshot/SnapshotOperation.java index 93054ec..98f295c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/snapshot/SnapshotOperation.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/snapshot/SnapshotOperation.java @@ -105,7 +105,9 @@ public class SnapshotOperation implements Serializable { * @param op Op. */ public static Collection<File> getOptionalPathsParameter(SnapshotOperation op) { - assert (op.type() == SnapshotOperationType.CHECK || op.type() == SnapshotOperationType.RESTORE) + assert (op.type() == SnapshotOperationType.CHECK || + op.type() == SnapshotOperationType.RESTORE || + op.type() == SnapshotOperationType.RESTORE_2_PHASE) && (op.extraParameter() == null || op.extraParameter() instanceof Collection); return (Collection<File>)op.extraParameter(); http://git-wip-us.apache.org/repos/asf/ignite/blob/7b545fa9/modules/core/src/main/java/org/apache/ignite/internal/pagemem/snapshot/SnapshotOperationType.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/snapshot/SnapshotOperationType.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/snapshot/SnapshotOperationType.java index 3fa6d2a..c3b3a2f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/snapshot/SnapshotOperationType.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/snapshot/SnapshotOperationType.java @@ -23,6 +23,8 @@ public enum SnapshotOperationType { CREATE, /** Restore. */ RESTORE, + /** Restore 2. */ + RESTORE_2_PHASE, /** Move. */ MOVE, /** Delete. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/7b545fa9/modules/core/src/main/java/org/apache/ignite/internal/pagemem/snapshot/StartSnapshotOperationAckDiscoveryMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/snapshot/StartSnapshotOperationAckDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/snapshot/StartSnapshotOperationAckDiscoveryMessage.java index 72defd4..af7648d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/snapshot/StartSnapshotOperationAckDiscoveryMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/snapshot/StartSnapshotOperationAckDiscoveryMessage.java @@ -81,6 +81,14 @@ public class StartSnapshotOperationAckDiscoveryMessage implements DiscoveryCusto /** * */ + public boolean needExchange() { + /* exchange for trigger saving cluster state*/ + return err == null && snapshotOperation.type() == SnapshotOperationType.CREATE; + } + + /** + * + */ public IgniteUuid operationId() { return opId; } http://git-wip-us.apache.org/repos/asf/ignite/blob/7b545fa9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index 79166f2..0f6a656 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -282,7 +282,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana exchangeFuture(msg.exchangeId(), null, null, null, null).onAffinityChangeMessage(evt.eventNode(), msg); } else if (customMsg instanceof StartSnapshotOperationAckDiscoveryMessage - && !((StartSnapshotOperationAckDiscoveryMessage)customMsg).hasError()) { + && ((StartSnapshotOperationAckDiscoveryMessage)customMsg).needExchange()) { exchId = exchangeId(n.id(), affinityTopologyVersion(evt), evt.type()); exchFut = exchangeFuture(exchId, evt, null, null, null); http://git-wip-us.apache.org/repos/asf/ignite/blob/7b545fa9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index bf7a4fd..b339bd4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -1963,6 +1963,8 @@ public class GridCacheProcessor extends GridProcessorAdapter { if (!F.isEmpty(reqs) && err == null) { Collection<IgniteBiTuple<GridCacheContext, Boolean>> stopped = null; + boolean prepared = false; + for (DynamicCacheChangeRequest req : reqs) { String masked = maskNull(req.cacheName()); @@ -1970,6 +1972,12 @@ public class GridCacheProcessor extends GridProcessorAdapter { boolean destroy = false; if (req.stop()) { + if (!prepared) { + sharedCtx.database().prepareCachesStop(); + + prepared = true; + } + stopGateway(req); sharedCtx.database().checkpointReadLock(); @@ -2610,6 +2618,16 @@ public class GridCacheProcessor extends GridProcessorAdapter { */ public IgniteInternalFuture<?> dynamicDestroyCaches(Collection<String> cacheNames, boolean checkThreadTx, boolean restart) { + return dynamicDestroyCaches(cacheNames, checkThreadTx, restart, true); + } + + /** + * @param cacheNames Collection of cache names to destroy. + * @param checkThreadTx If {@code true} checks that current thread does not have active transactions. + * @return Future that will be completed when cache is destroyed. + */ + public IgniteInternalFuture<?> dynamicDestroyCaches(Collection<String> cacheNames, boolean checkThreadTx, + boolean restart, boolean destroy) { if (checkThreadTx) checkEmptyTransactions(); @@ -2619,7 +2637,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { DynamicCacheChangeRequest t = new DynamicCacheChangeRequest(UUID.randomUUID(), cacheName, ctx.localNodeId()); t.stop(true); - t.destroy(true); + t.destroy(destroy); t.restart(restart); @@ -2915,7 +2933,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { return sharedCtx.affinity().onCustomEvent(((CacheAffinityChangeMessage)msg)); if (msg instanceof StartSnapshotOperationAckDiscoveryMessage && - ((StartSnapshotOperationAckDiscoveryMessage)msg).error() == null) + ((StartSnapshotOperationAckDiscoveryMessage)msg).needExchange()) return true; if (msg instanceof DynamicCacheChangeBatch) @@ -3398,6 +3416,16 @@ public class GridCacheProcessor extends GridProcessorAdapter { } /** + * @param name Name. + */ + public void restart(@Nullable String name) { + IgniteCacheProxy jcache = (IgniteCacheProxy) jCacheProxies.get(maskNull(name)); + + if (jcache != null) + jcache.restart(); + } + + /** * @param name Cache name. * @return Cache instance for given name. * @throws IgniteCheckedException If failed. http://git-wip-us.apache.org/repos/asf/ignite/blob/7b545fa9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/IgniteCacheDatabaseSharedManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/IgniteCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/IgniteCacheDatabaseSharedManager.java index 6220c43..11d924e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/IgniteCacheDatabaseSharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/IgniteCacheDatabaseSharedManager.java @@ -196,6 +196,13 @@ public class IgniteCacheDatabaseSharedManager extends GridCacheSharedManagerAdap } /** + * Needed action before any cache will stop + */ + public void prepareCachesStop() { + // No-op. + } + + /** * @param stoppedCtxs A collection of tuples (cache context, destroy flag). */ public void onCachesStopped(Collection<IgniteBiTuple<GridCacheContext, Boolean>> stoppedCtxs) { http://git-wip-us.apache.org/repos/asf/ignite/blob/7b545fa9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index fff1702..7a95193 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -51,9 +51,7 @@ import org.apache.ignite.internal.events.DiscoveryCustomEvent; import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; import org.apache.ignite.internal.managers.discovery.GridDiscoveryTopologySnapshot; import org.apache.ignite.internal.pagemem.snapshot.SnapshotOperation; -import org.apache.ignite.internal.pagemem.snapshot.SnapshotOperationType; import org.apache.ignite.internal.pagemem.snapshot.StartSnapshotOperationAckDiscoveryMessage; -import org.apache.ignite.internal.managers.discovery.DiscoCache; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache; import org.apache.ignite.internal.processors.cache.CacheAffinityChangeMessage; @@ -64,7 +62,6 @@ import org.apache.ignite.internal.processors.cache.DynamicCacheChangeRequest; import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate; -import org.apache.ignite.internal.processors.cache.GridCacheProcessor; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.distributed.dht.GridClientPartitionTopology; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition; @@ -571,31 +568,6 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT exchange = CU.clientNode(discoEvt.eventNode()) ? onClientNodeEvent(crdNode) : onServerNodeEvent(crdNode); - - StartSnapshotOperationAckDiscoveryMessage snapshotOperationMsg = (StartSnapshotOperationAckDiscoveryMessage)msg; - - if (!cctx.localNode().isDaemon()) { - SnapshotOperation op = snapshotOperationMsg.snapshotOperation(); - - if (op.type() == SnapshotOperationType.RESTORE) { - if (reqs != null) - reqs = new ArrayList<>(reqs); - else - reqs = new ArrayList<>(); - - List<DynamicCacheChangeRequest> destroyRequests = getStopCacheRequests( - cctx.cache(), op.cacheNames(), cctx.localNodeId()); - - reqs.addAll(destroyRequests); - - if (!reqs.isEmpty()) { //Emulate destroy cache request - if (op.type() == SnapshotOperationType.RESTORE) - cctx.cache().onCustomEvent(new DynamicCacheChangeBatch(reqs), topVer); - - onCacheChangeRequest(crdNode); - } - } - } } else { assert affChangeMsg != null : this; @@ -645,6 +617,9 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT assert false; } + if (cctx.localNode().isClient()) + startLocalSnasphotOperation(); + exchLog.info("Finish exchange init [topVer=" + topVer + ", crd=" + crdNode + ']'); } catch (IgniteInterruptedCheckedException e) { @@ -663,36 +638,6 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT } /** - * @param cache Cache. - * @param cacheNames Cache names. - * @param locNodeId Local node id. - */ - @NotNull public static List<DynamicCacheChangeRequest> getStopCacheRequests(GridCacheProcessor cache, - Set<String> cacheNames, UUID locNodeId) { - List<DynamicCacheChangeRequest> destroyRequests = new ArrayList<>(); - - for (String cacheName : cacheNames) { - DynamicCacheDescriptor desc = cache.cacheDescriptor(CU.cacheId(cacheName)); - - if (desc == null) - continue; - - DynamicCacheChangeRequest t = new DynamicCacheChangeRequest(UUID.randomUUID(), cacheName, locNodeId); - - t.stop(true); - t.destroy(true); - - t.deploymentId(desc.deploymentId()); - - t.restart(true); - - destroyRequests.add(t); - } - - return destroyRequests; - } - - /** * @throws IgniteCheckedException If failed. */ private void initTopologies() throws IgniteCheckedException { @@ -930,18 +875,6 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT cctx.database().beforeExchange(this); - StartSnapshotOperationAckDiscoveryMessage snapshotOperationMsg = getSnapshotOperationMessage(); - - // If it's a snapshot operation request, synchronously wait for backup start. - if (snapshotOperationMsg != null) { - if (!cctx.localNode().isClient() && !cctx.localNode().isDaemon()) { - SnapshotOperation op = snapshotOperationMsg.snapshotOperation(); - - if (op.type() != SnapshotOperationType.RESTORE) - startLocalSnasphotOperation(snapshotOperationMsg); - } - } - if (crd.isLocal()) { if (remaining.isEmpty()) onAllReceived(); @@ -952,16 +885,24 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT initDone(); } - /** - * @param snapOpMsg Snapshot operation message. - */ - private void startLocalSnasphotOperation(StartSnapshotOperationAckDiscoveryMessage snapOpMsg - ) throws IgniteCheckedException { - IgniteInternalFuture fut = cctx.database() - .startLocalSnapshotOperation(snapOpMsg.initiatorNodeId(), snapOpMsg.snapshotOperation()); + /** */ + private void startLocalSnasphotOperation() { + StartSnapshotOperationAckDiscoveryMessage snapOpMsg = getSnapshotOperationMessage(); + + if (snapOpMsg != null) { + SnapshotOperation op = snapOpMsg.snapshotOperation(); + + try { + IgniteInternalFuture fut = cctx.database() + .startLocalSnapshotOperation(snapOpMsg.initiatorNodeId(), snapOpMsg.snapshotOperation()); - if (fut != null) - fut.get(); + if (fut != null) + fut.get(); + } + catch (IgniteCheckedException e) { + log.error("Error while starting snapshot operation", e); + } + } } /** @@ -1306,6 +1247,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT cacheValidRes = m; } + startLocalSnasphotOperation(); + cctx.cache().onExchangeDone(exchId.topologyVersion(), reqs, err); cctx.exchange().onExchangeDone(this, err); @@ -1315,20 +1258,6 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT cctx.cache().completeStartFuture(req); } - StartSnapshotOperationAckDiscoveryMessage snapshotOperationMsg = getSnapshotOperationMessage(); - - if (snapshotOperationMsg != null && !cctx.localNode().isClient() && !cctx.localNode().isDaemon()) { - SnapshotOperation op = snapshotOperationMsg.snapshotOperation(); - - if (op.type() == SnapshotOperationType.RESTORE) - try { - startLocalSnasphotOperation(snapshotOperationMsg); - } - catch (IgniteCheckedException e) { - log.error("Error while starting snapshot operation", e); - } - } - if (exchangeOnChangeGlobalState && err == null) cctx.kernalContext().state().onExchangeDone(); http://git-wip-us.apache.org/repos/asf/ignite/blob/7b545fa9/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeIndex.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeIndex.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeIndex.java index be5be0a..dcfdec9 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeIndex.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeIndex.java @@ -371,7 +371,8 @@ public class H2TreeIndex extends GridH2IndexBase { @Override public void destroy() { try { if (cctx.affinityNode()) { - tree.destroy(); + if (!cctx.kernalContext().cache().context().database().persistenceEnabled()) + tree.destroy(); cctx.offheap().dropRootPageForIndex(tree.getName()); }
