IGNITE-5935: MVCC TX: Tx recovery protocol This closes #4920
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5939a947 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5939a947 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5939a947 Branch: refs/heads/ignite-9720 Commit: 5939a94763c8a3e92b66b3f591a816dd6c49f35a Parents: 82d2efe Author: ipavlukhin <[email protected]> Authored: Fri Oct 19 17:40:12 2018 +0300 Committer: Igor Sapego <[email protected]> Committed: Fri Oct 19 17:40:12 2018 +0300 ---------------------------------------------------------------------- .../ignite/codegen/MessageCodeGenerator.java | 6 +- .../communication/GridIoMessageFactory.java | 18 + .../GridCachePartitionExchangeManager.java | 4 +- .../cache/IgniteCacheOffheapManager.java | 6 +- .../cache/IgniteCacheOffheapManagerImpl.java | 5 + .../cache/PartitionUpdateCounter.java | 30 +- .../distributed/GridCacheTxRecoveryFuture.java | 11 - .../GridDistributedTxRemoteAdapter.java | 52 +- .../distributed/dht/GridDhtTxFinishFuture.java | 4 +- .../distributed/dht/GridDhtTxLocalAdapter.java | 38 -- .../distributed/dht/GridDhtTxPrepareFuture.java | 2 +- .../topology/GridClientPartitionTopology.java | 3 +- .../dht/topology/GridDhtLocalPartition.java | 7 + .../dht/topology/GridDhtPartitionTopology.java | 5 +- .../topology/GridDhtPartitionTopologyImpl.java | 6 +- .../processors/cache/mvcc/MvccProcessor.java | 7 - .../cache/mvcc/MvccProcessorImpl.java | 217 +++++- .../processors/cache/mvcc/MvccUtils.java | 6 +- .../mvcc/msg/MvccRecoveryFinishedMessage.java | 116 ++++ .../PartitionCountersNeighborcastRequest.java | 145 ++++ .../PartitionCountersNeighborcastResponse.java | 114 ++++ .../persistence/GridCacheOffheapManager.java | 13 + .../cache/transactions/IgniteTxAdapter.java | 3 +- .../cache/transactions/IgniteTxHandler.java | 125 ++++ .../cache/transactions/IgniteTxManager.java | 123 +++- .../PartitionCountersNeighborcastFuture.java | 211 ++++++ 
.../cache/transactions/TxCounters.java | 3 +- .../continuous/GridContinuousProcessor.java | 6 +- ...xOriginatingNodeFailureAbstractSelfTest.java | 2 +- ...cOriginatingNodeFailureAbstractSelfTest.java | 69 +- ...itionedTxOriginatingNodeFailureSelfTest.java | 2 +- ...woBackupsPrimaryNodeFailureRecoveryTest.java | 2 +- ...ePrimaryNodeFailureRecoveryAbstractTest.java | 133 +++- ...licatedTxOriginatingNodeFailureSelfTest.java | 2 +- .../cache/mvcc/CacheMvccTransactionsTest.java | 4 +- ...ContinuousQueryFailoverAbstractSelfTest.java | 2 +- .../IgniteCacheTxRecoverySelfTestSuite.java | 2 +- .../cache/mvcc/CacheMvccTxRecoveryTest.java | 654 +++++++++++++++++++ ...GridIndexRebuildWithMvccEnabledSelfTest.java | 3 +- .../testsuites/IgniteCacheMvccSqlTestSuite.java | 58 ++ 40 files changed, 1980 insertions(+), 239 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/5939a947/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java ---------------------------------------------------------------------- diff --git a/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java b/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java index 2599d7a..7492e51 100644 --- a/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java +++ b/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java @@ -43,9 +43,7 @@ import org.apache.ignite.internal.GridDirectCollection; import org.apache.ignite.internal.GridDirectMap; import org.apache.ignite.internal.GridDirectTransient; import org.apache.ignite.internal.IgniteCodeGeneratingFail; -import org.apache.ignite.internal.processors.cache.distributed.dht.GridInvokeValue; -import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxEnlistRequest; -import 
org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxEnlistResponse; +import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccRecoveryFinishedMessage; import org.apache.ignite.internal.util.IgniteUtils; import org.apache.ignite.internal.util.typedef.internal.SB; import org.apache.ignite.internal.util.typedef.internal.U; @@ -171,7 +169,7 @@ public class MessageCodeGenerator { // gen.generateAll(true); - gen.generateAndWrite(GridNearTxEnlistResponse.class); + gen.generateAndWrite(MvccRecoveryFinishedMessage.class); // gen.generateAndWrite(GridNearAtomicUpdateRequest.class); http://git-wip-us.apache.org/repos/asf/ignite/blob/5939a947/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java index e405d7d..3f4eb18 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java @@ -134,9 +134,12 @@ import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccAckRequestTxAndQ import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccActiveQueriesMessage; import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccFutureResponse; import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccQuerySnapshotRequest; +import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccRecoveryFinishedMessage; import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccSnapshotResponse; import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccTxSnapshotRequest; import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccWaitTxsRequest; 
+import org.apache.ignite.internal.processors.cache.mvcc.msg.PartitionCountersNeighborcastRequest; +import org.apache.ignite.internal.processors.cache.mvcc.msg.PartitionCountersNeighborcastResponse; import org.apache.ignite.internal.processors.cache.query.GridCacheQueryRequest; import org.apache.ignite.internal.processors.cache.query.GridCacheQueryResponse; import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery; @@ -1096,6 +1099,21 @@ public class GridIoMessageFactory implements MessageFactory { break; + case 164: + msg = new MvccRecoveryFinishedMessage(); + + break; + + case 165: + msg = new PartitionCountersNeighborcastRequest(); + + break; + + case 166: + msg = new PartitionCountersNeighborcastResponse(); + + break; + // [-3..119] [124..129] [-23..-27] [-36..-55]- this // [120..123] - DR // [-4..-22, -30..-35] - SQL http://git-wip-us.apache.org/repos/asf/ignite/blob/5939a947/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index 6af9678..0b8dd75 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -1405,7 +1405,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana grp.affinity().similarAffinityKey()); if (sndCounters) { - CachePartitionPartialCountersMap cntrsMap = grp.topology().localUpdateCounters(true); + CachePartitionPartialCountersMap cntrsMap = grp.topology().localUpdateCounters(true, true); m.addPartitionUpdateCounters(grp.groupId(), newCntrMap ? 
cntrsMap : CachePartitionPartialCountersMap.toCountersMap(cntrsMap)); @@ -1429,7 +1429,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana top.similarAffinityKey()); if (sndCounters) { - CachePartitionPartialCountersMap cntrsMap = top.localUpdateCounters(true); + CachePartitionPartialCountersMap cntrsMap = top.localUpdateCounters(true, true); m.addPartitionUpdateCounters(top.groupId(), newCntrMap ? cntrsMap : CachePartitionPartialCountersMap.toCountersMap(cntrsMap)); http://git-wip-us.apache.org/repos/asf/ignite/blob/5939a947/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java index 044830c..e9ec025 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java @@ -1067,8 +1067,12 @@ public interface IgniteCacheOffheapManager { * Return PendingTree for data store. * * @return PendingTree instance. - * @throws IgniteCheckedException */ PendingEntriesTree pendingTree(); + + /** + * Flushes pending update counters closing all possible gaps. 
+ */ + void finalizeUpdateCountres(); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/5939a947/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java index e40cc53..e547784 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java @@ -1556,6 +1556,11 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager } /** {@inheritDoc} */ + @Override public void finalizeUpdateCountres() { + pCntr.finalizeUpdateCountres(); + } + + /** {@inheritDoc} */ @Override public String name() { return name; } http://git-wip-us.apache.org/repos/asf/ignite/blob/5939a947/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounter.java index b5960ab..fe44708 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounter.java @@ -17,8 +17,7 @@ package org.apache.ignite.internal.processors.cache; -import java.util.PriorityQueue; -import java.util.Queue; +import java.util.TreeSet; import java.util.concurrent.atomic.AtomicLong; import org.apache.ignite.IgniteLogger; import 
org.jetbrains.annotations.NotNull; @@ -31,7 +30,7 @@ public class PartitionUpdateCounter { private IgniteLogger log; /** Queue of counter update tasks*/ - private final Queue<Item> queue = new PriorityQueue<>(); + private final TreeSet<Item> queue = new TreeSet<>(); /** Counter. */ private final AtomicLong cntr = new AtomicLong(); @@ -161,21 +160,34 @@ public class PartitionUpdateCounter { * @return Retrieves the minimum update counter task from queue. */ private Item poll() { - return queue.poll(); + return queue.pollFirst(); } /** * @return Checks the minimum update counter task from queue. */ private Item peek() { - return queue.peek(); + return queue.isEmpty() ? null : queue.first(); + } /** * @param item Adds update task to priority queue. */ private void offer(Item item) { - queue.offer(item); + queue.add(item); + } + + /** + * Flushes pending update counters closing all possible gaps. + */ + public synchronized void finalizeUpdateCountres() { + Item last = queue.pollLast(); + + if (last != null) + update(last.start + last.delta); + + queue.clear(); } /** @@ -199,11 +211,7 @@ public class PartitionUpdateCounter { /** {@inheritDoc} */ @Override public int compareTo(@NotNull Item o) { - int cmp = Long.compare(this.start, o.start); - - assert cmp != 0; - - return cmp; + return Long.compare(this.start, o.start); } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/5939a947/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java index 3fb1e4f..5e0deb0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java +++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java @@ -146,17 +146,6 @@ public class GridCacheTxRecoveryFuture extends GridCacheCompoundIdentityFuture<B */ @SuppressWarnings("ConstantConditions") public void prepare() { - if (tx.txState().mvccEnabled()) { // TODO IGNITE-5935 - U.error(log, "Cannot commit MVCC enabled transaction by recovery procedure. " + - "Operation is usupported at the moment [tx=" + CU.txString(tx) + ']'); - - onDone(false); - - markInitialized(); - - return; - } - if (nearTxCheck) { UUID nearNodeId = tx.eventNodeId(); http://git-wip-us.apache.org/repos/asf/ignite/blob/5939a947/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java index 4db4685..3cabaec 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java @@ -50,9 +50,6 @@ import org.apache.ignite.internal.processors.cache.GridCacheReturnCompletableWra import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.GridCacheUpdateTxResult; import org.apache.ignite.internal.processors.cache.KeyCacheObject; -import org.apache.ignite.internal.processors.cache.distributed.dht.PartitionUpdateCountersMessage; -import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition; -import 
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheEntry; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxAdapter; @@ -770,15 +767,15 @@ public abstract class GridDistributedTxRemoteAdapter extends IgniteTxAdapter // Apply update counters. if (txCntrs != null) - applyPartitionsUpdatesCounters(txCntrs.updateCounters()); + cctx.tm().txHandler().applyPartitionsUpdatesCounters(txCntrs.updateCounters()); - cctx.mvccCaching().onTxFinished(this, true); + cctx.mvccCaching().onTxFinished(this, true); - if (!near() && !F.isEmpty(dataEntries) && cctx.wal() != null) { - // Set new update counters for data entries received from persisted tx entries. - List<DataEntry> entriesWithCounters = dataEntries.stream() - .map(tuple -> tuple.get1().partitionCounter(tuple.get2().updateCounter())) - .collect(Collectors.toList()); + if (!near() && !F.isEmpty(dataEntries) && cctx.wal() != null) { + // Set new update counters for data entries received from persisted tx entries. + List<DataEntry> entriesWithCounters = dataEntries.stream() + .map(tuple -> tuple.get1().partitionCounter(tuple.get2().updateCounter())) + .collect(Collectors.toList()); cctx.wal().log(new DataRecord(entriesWithCounters)); } @@ -921,7 +918,7 @@ public abstract class GridDistributedTxRemoteAdapter extends IgniteTxAdapter TxCounters counters = txCounters(false); if (counters != null) - applyPartitionsUpdatesCounters(counters.updateCounters()); + cctx.tm().txHandler().applyPartitionsUpdatesCounters(counters.updateCounters()); state(ROLLED_BACK); @@ -996,39 +993,6 @@ public abstract class GridDistributedTxRemoteAdapter extends IgniteTxAdapter } } - /** - * Applies partition counters updates for mvcc transactions. - * - * @param counters Counters values to be updated. 
- */ - private void applyPartitionsUpdatesCounters(Iterable<PartitionUpdateCountersMessage> counters) { - if (counters == null) - return; - - int cacheId = CU.UNDEFINED_CACHE_ID; - GridDhtPartitionTopology top = null; - - for (PartitionUpdateCountersMessage counter : counters) { - if (counter.cacheId() != cacheId) { - GridCacheContext ctx0 = cctx.cacheContext(cacheId = counter.cacheId()); - - assert ctx0.mvccEnabled(); - - top = ctx0.topology(); - } - - assert top != null; - - for (int i = 0; i < counter.size(); i++) { - GridDhtLocalPartition part = top.localPartition(counter.partition(i)); - - assert part != null; - - part.updateCounter(counter.initialCounter(i), counter.updatesCount(i)); - } - } - } - /** {@inheritDoc} */ @Override public String toString() { return GridToStringBuilder.toString(GridDistributedTxRemoteAdapter.class, this, "super", super.toString()); http://git-wip-us.apache.org/repos/asf/ignite/blob/5939a947/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java index 9f96b46..d0fbd90 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java @@ -375,7 +375,7 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCacheCompoundIdentity false, false, tx.mvccSnapshot(), - tx.filterUpdateCountersForBackupNode(n)); + cctx.tm().txHandler().filterUpdateCountersForBackupNode(tx, n)); try { cctx.io().send(n, req, tx.ioPolicy()); @@ -488,7 +488,7 @@ public final class GridDhtTxFinishFuture<K, V> 
extends GridCacheCompoundIdentity false, false, mvccSnapshot, - commit ? null : tx.filterUpdateCountersForBackupNode(n)); + commit ? null : cctx.tm().txHandler().filterUpdateCountersForBackupNode(tx, n)); req.writeVersion(tx.writeVersion() != null ? tx.writeVersion() : tx.xidVersion()); http://git-wip-us.apache.org/repos/asf/ignite/blob/5939a947/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java index 483990f..86f9c3c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java @@ -18,7 +18,6 @@ package org.apache.ignite.internal.processors.cache.distributed.dht; import java.io.Externalizable; -import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashSet; @@ -34,7 +33,6 @@ import org.apache.ignite.failure.FailureContext; import org.apache.ignite.failure.FailureType; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; -import org.apache.ignite.internal.processors.cache.GridCacheAffinityManager; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException; @@ -48,7 +46,6 @@ import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx import 
org.apache.ignite.internal.processors.cache.transactions.IgniteTxAdapter; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalAdapter; -import org.apache.ignite.internal.processors.cache.transactions.TxCounters; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.F0; import org.apache.ignite.internal.util.GridLeanMap; @@ -944,41 +941,6 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter { return prepFut; } - /** - * @param node Backup node. - * @return Partition counters map for the given backup node. - */ - public List<PartitionUpdateCountersMessage> filterUpdateCountersForBackupNode(ClusterNode node) { - TxCounters txCntrs = txCounters(false); - - if (txCntrs == null || F.isEmpty(txCntrs.updateCounters())) - return null; - - Collection<PartitionUpdateCountersMessage> updCntrs = txCntrs.updateCounters(); - - List<PartitionUpdateCountersMessage> res = new ArrayList<>(updCntrs.size()); - - AffinityTopologyVersion top = topologyVersionSnapshot(); - - for (PartitionUpdateCountersMessage partCntrs : updCntrs) { - GridCacheAffinityManager affinity = cctx.cacheContext(partCntrs.cacheId()).affinity(); - - PartitionUpdateCountersMessage resCntrs = new PartitionUpdateCountersMessage(partCntrs.cacheId(), partCntrs.size()); - - for (int i = 0; i < partCntrs.size(); i++) { - int part = partCntrs.partition(i); - - if (affinity.backupByPartition(node, part, top)) - resCntrs.add(part, partCntrs.initialCounter(i), partCntrs.updatesCount(i)); - } - - if (resCntrs.size() > 0) - res.add(resCntrs); - } - - return res; - } - /** {@inheritDoc} */ @Override public String toString() { return GridToStringBuilder.toString(GridDhtTxLocalAdapter.class, this, "nearNodes", nearMap.keySet(), 
http://git-wip-us.apache.org/repos/asf/ignite/blob/5939a947/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java index c505677..609bff8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java @@ -1398,7 +1398,7 @@ public final class GridDhtTxPrepareFuture extends GridCacheCompoundFuture<Ignite tx.storeWriteThrough(), retVal, mvccSnapshot, - tx.filterUpdateCountersForBackupNode(n)); + cctx.tm().txHandler().filterUpdateCountersForBackupNode(tx, n)); req.queryUpdate(dhtMapping.queryUpdate()); http://git-wip-us.apache.org/repos/asf/ignite/blob/5939a947/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridClientPartitionTopology.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridClientPartitionTopology.java index 9140322..cd6e254 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridClientPartitionTopology.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridClientPartitionTopology.java @@ -1234,7 +1234,8 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { 
} /** {@inheritDoc} */ - @Override public CachePartitionPartialCountersMap localUpdateCounters(boolean skipZeros) { + @Override public CachePartitionPartialCountersMap localUpdateCounters(boolean skipZeros, + boolean finalizeCntrsBeforeCollecting) { return CachePartitionPartialCountersMap.EMPTY; } http://git-wip-us.apache.org/repos/asf/ignite/blob/5939a947/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtLocalPartition.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtLocalPartition.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtLocalPartition.java index 253a56a..2ddc0d4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtLocalPartition.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtLocalPartition.java @@ -1371,6 +1371,13 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements } /** + * Flushes pending update counters closing all possible gaps. + */ + public void finalizeUpdateCountres() { + store.finalizeUpdateCountres(); + } + + /** * Removed entry holder. 
*/ private static class RemovedEntryHolder { http://git-wip-us.apache.org/repos/asf/ignite/blob/5939a947/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopology.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopology.java index b6cb5bb..25b284e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopology.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopology.java @@ -350,9 +350,12 @@ public interface GridDhtPartitionTopology { public CachePartitionFullCountersMap fullUpdateCounters(); /** + * @param skipZeros {@code True} for adding zero counter to map. + * @param finalizeCntrsBeforeCollecting {@code True} indicates that partition counters should be finalized. * @return Partition update counters. */ - public CachePartitionPartialCountersMap localUpdateCounters(boolean skipZeros); + public CachePartitionPartialCountersMap localUpdateCounters(boolean skipZeros, + boolean finalizeCntrsBeforeCollecting); /** * @return Partition cache sizes. 
http://git-wip-us.apache.org/repos/asf/ignite/blob/5939a947/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java index 94bb7f1..1f338d3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java @@ -2657,7 +2657,8 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { } /** {@inheritDoc} */ - @Override public CachePartitionPartialCountersMap localUpdateCounters(boolean skipZeros) { + @Override public CachePartitionPartialCountersMap localUpdateCounters(boolean skipZeros, + boolean finalizeCntrsBeforeCollecting) { lock.readLock().lock(); try { @@ -2678,6 +2679,9 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { if (part == null) continue; + if (finalizeCntrsBeforeCollecting) + part.finalizeUpdateCountres(); + long updCntr = part.updateCounter(); long initCntr = part.initialUpdateCounter(); http://git-wip-us.apache.org/repos/asf/ignite/blob/5939a947/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessor.java index a09468f..a926acf 100644 --- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessor.java @@ -180,13 +180,6 @@ public interface MvccProcessor extends GridProcessor { /** * Requests snapshot on Mvcc coordinator. * - * @return Snapshot future. - */ - IgniteInternalFuture<MvccSnapshot> requestSnapshotAsync(); - - /** - * Requests snapshot on Mvcc coordinator. - * * @param tx Transaction. * @return Snapshot future. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/5939a947/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessorImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessorImpl.java index f17c137..9fcafb0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessorImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessorImpl.java @@ -20,14 +20,17 @@ package org.apache.ignite.internal.processors.cache.mvcc; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.TreeMap; import java.util.UUID; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; @@ -68,6 +71,7 @@ import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccActiveQueriesMes import 
org.apache.ignite.internal.processors.cache.mvcc.msg.MvccFutureResponse; import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccMessage; import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccQuerySnapshotRequest; +import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccRecoveryFinishedMessage; import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccSnapshotResponse; import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccTxSnapshotRequest; import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccWaitTxsRequest; @@ -189,8 +193,11 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce /** */ private final GridAtomicLong committedCntr = new GridAtomicLong(MVCC_INITIAL_CNTR); - /** */ - private final Map<Long, Long> activeTxs = new HashMap<>(); + /** + * Contains active transactions on mvcc coordinator. Key is mvcc counter. + * Access is protected by "this" monitor. + */ + private final Map<Long, ActiveTx> activeTxs = new HashMap<>(); /** Active query trackers. */ private final Map<Long, MvccQueryTracker> activeTrackers = new ConcurrentHashMap<>(); @@ -223,6 +230,11 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce private volatile boolean mvccSupported = true; /** + * Maps failed node id to votes accumulator for that node. + */ + private final ConcurrentHashMap<UUID, RecoveryBallotBox> recoveryBallotBoxes = new ConcurrentHashMap<>(); + + /** * @param ctx Context. 
*/ public MvccProcessorImpl(GridKernalContext ctx) { @@ -363,8 +375,12 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce /** {@inheritDoc} */ @Override public void onExchangeDone(boolean newCrd, DiscoCache discoCache, Map<UUID, GridLongList> activeQueries) { - if (!newCrd) + if (!newCrd) { + if (curCrd != null && ctx.localNodeId().equals(curCrd.nodeId()) && discoCache != null) + cleanupOrphanedServerTransactions(discoCache.serverNodes()); + return; + } ctx.cache().context().tm().rollbackMvccTxOnCoordinatorChange(); @@ -391,6 +407,33 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce } } + /** + * Cleans up active transactions whose near node is a server node that is no longer alive. Executed on coordinator. + * @param liveSrvs Live server nodes at the moment of cleanup. + */ + private void cleanupOrphanedServerTransactions(Collection<ClusterNode> liveSrvs) { + Set<UUID> ids = liveSrvs.stream() + .map(ClusterNode::id) + .collect(Collectors.toSet()); + + List<Long> forRmv = new ArrayList<>(); + + synchronized (this) { + for (Map.Entry<Long, ActiveTx> entry : activeTxs.entrySet()) { + // If the node that started the tx is not known to be live, remove the tx from the active list. + ActiveTx activeTx = entry.getValue(); + + if (activeTx.getClass() == ActiveServerTx.class && !ids.contains(activeTx.nearNodeId)) + forRmv.add(entry.getKey()); + } + } + + for (Long txCntr : forRmv) + // Committed counter is increased because it is not known if transaction was committed or not and we must + // bump committed counter for committed transaction as it is used in (read-only) query snapshot.
+ onTxDone(txCntr, true); + } + /** {@inheritDoc} */ @Override public void processClientActiveQueries(UUID nodeId, @Nullable GridLongList activeQueries) { prevCrdQueries.addNodeActiveQueries(nodeId, activeQueries); @@ -530,17 +573,12 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce if (!ctx.localNodeId().equals(crd.nodeId()) || !initFut.isDone()) return null; else if (tx != null) - return assignTxSnapshot(0L); + return assignTxSnapshot(0L, ctx.localNodeId(), false); else return activeQueries.assignQueryCounter(ctx.localNodeId(), 0L); } /** {@inheritDoc} */ - @Override public IgniteInternalFuture<MvccSnapshot> requestSnapshotAsync() { - return requestSnapshotAsync((IgniteInternalTx)null); - } - - /** {@inheritDoc} */ @Override public IgniteInternalFuture<MvccSnapshot> requestSnapshotAsync(IgniteInternalTx tx) { MvccSnapshotFuture fut = new MvccSnapshotFuture(); @@ -585,7 +623,7 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce }); } else if (tx != null) - lsnr.onResponse(assignTxSnapshot(0L)); + lsnr.onResponse(assignTxSnapshot(0L, ctx.localNodeId(), false)); else lsnr.onResponse(activeQueries.assignQueryCounter(ctx.localNodeId(), 0L)); @@ -741,9 +779,8 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce first = false; } - for (MvccSnapshotResponseListener lsnr : map.values()) { + for (MvccSnapshotResponseListener lsnr : map.values()) U.warn(log, ">>> " + lsnr.toString()); - } } first = true; @@ -909,10 +946,8 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce return activeQryTrackers; } - /** - * @return Counter. 
- */ - private MvccSnapshotResponse assignTxSnapshot(long futId) { + /** */ + private MvccSnapshotResponse assignTxSnapshot(long futId, UUID nearId, boolean client) { assert initFut.isDone(); assert crdVer != 0; assert ctx.localNodeId().equals(currentCoordinatorId()); @@ -926,14 +961,16 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce tracking = ver; cleanup = committedCntr.get() + 1; - for (Map.Entry<Long, Long> txVer : activeTxs.entrySet()) { - cleanup = Math.min(txVer.getValue(), cleanup); - tracking = Math.min(txVer.getKey(), tracking); + for (Map.Entry<Long, ActiveTx> entry : activeTxs.entrySet()) { + cleanup = Math.min(entry.getValue().tracking, cleanup); + tracking = Math.min(entry.getKey(), tracking); - res.addTx(txVer.getKey()); + res.addTx(entry.getKey()); } - boolean add = activeTxs.put(ver, tracking) == null; + ActiveTx activeTx = client ? new ActiveTx(tracking, nearId) : new ActiveServerTx(tracking, nearId); + + boolean add = activeTxs.put(ver, activeTx) == null; assert add : ver; } @@ -950,10 +987,8 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce return res; } - /** - * @param txCntr Counter assigned to transaction. 
- */ - private void onTxDone(Long txCntr, boolean committed) { + /** */ + private void onTxDone(Long txCntr, boolean increaseCommittedCntr) { assert initFut.isDone(); GridFutureAdapter fut; @@ -961,7 +996,7 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce synchronized (this) { activeTxs.remove(txCntr); - if (committed) + if (increaseCommittedCntr) committedCntr.setIfGreater(txCntr); } @@ -1352,10 +1387,14 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce return; } - MvccSnapshotResponse res = assignTxSnapshot(msg.futureId()); + MvccSnapshotResponse res = assignTxSnapshot(msg.futureId(), nodeId, node.isClient()); + + boolean finishFailed = true; try { sendMessage(node.id(), res); + + finishFailed = false; } catch (ClusterTopologyCheckedException e) { if (log.isDebugEnabled()) @@ -1364,6 +1403,9 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce catch (IgniteCheckedException e) { U.error(log, "Failed to send tx snapshot response [msg=" + msg + ", node=" + nodeId + ']', e); } + + if (finishFailed) + onTxDone(res.counter(), false); } /** @@ -1390,9 +1432,9 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce log.debug("Failed to send query counter response, node left [msg=" + msg + ", node=" + nodeId + ']'); } catch (IgniteCheckedException e) { - U.error(log, "Failed to send query counter response [msg=" + msg + ", node=" + nodeId + ']', e); - onQueryDone(nodeId, res.tracking()); + + U.error(log, "Failed to send query counter response [msg=" + msg + ", node=" + nodeId + ']', e); } } @@ -1713,6 +1755,23 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce activeQueries.onNodeFailed(nodeId); prevCrdQueries.onNodeFailed(nodeId); + + recoveryBallotBoxes.forEach((nearNodeId, ballotBox) -> { + // Put synthetic vote from another failed node + ballotBox.vote(nodeId); + + tryFinishRecoveryVoting(nearNodeId, 
ballotBox); + }); + + if (discoEvt.eventNode().isClient()) { + RecoveryBallotBox ballotBox = recoveryBallotBoxes + .computeIfAbsent(nodeId, uuid -> new RecoveryBallotBox()); + + ballotBox + .voters(discoEvt.topologyNodes().stream().map(ClusterNode::id).collect(Collectors.toList())); + + tryFinishRecoveryVoting(nodeId, ballotBox); + } } /** {@inheritDoc} */ @@ -1767,6 +1826,8 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce processNewCoordinatorQueryAckRequest(nodeId, (MvccAckRequestQueryId)msg); else if (msg instanceof MvccActiveQueriesMessage) processCoordinatorActiveQueriesMessage(nodeId, (MvccActiveQueriesMessage)msg); + else if (msg instanceof MvccRecoveryFinishedMessage) + processRecoveryFinishedMessage(nodeId, ((MvccRecoveryFinishedMessage)msg)); else U.warn(log, "Unexpected message received [node=" + nodeId + ", msg=" + msg + ']'); } @@ -1777,6 +1838,82 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce } } + /** + * Accumulates transaction recovery votes for a node that left the cluster. + * Transactions started by the left node are considered not active + * when each cluster server node acknowledges that it has finished transactions for the left node. + */ + private static class RecoveryBallotBox { + /** */ + private List<UUID> voters; + /** */ + private final Set<UUID> ballots = new HashSet<>(); + + /** + * @param voters Nodes which can have transaction started by the left node. + */ + private synchronized void voters(List<UUID> voters) { + this.voters = voters; + } + + /** + * @param nodeId Voting node id. + * + */ + private synchronized void vote(UUID nodeId) { + ballots.add(nodeId); + } + + /** + * @return {@code True} if all nodes expected to vote have done so. + */ + private synchronized boolean isVotingDone() { + if (voters == null) + return false; + + return ballots.containsAll(voters); + } + } + + /** + * Process message that one node has finished with transactions for the left node.
+ * @param nodeId Node sent the message. + * @param msg Message. + */ + private void processRecoveryFinishedMessage(UUID nodeId, MvccRecoveryFinishedMessage msg) { + UUID nearNodeId = msg.nearNodeId(); + + RecoveryBallotBox ballotBox = recoveryBallotBoxes.computeIfAbsent(nearNodeId, uuid -> new RecoveryBallotBox()); + + ballotBox.vote(nodeId); + + tryFinishRecoveryVoting(nearNodeId, ballotBox); + } + + /** + * Finishes recovery on coordinator by removing transactions started by the left node + * @param nearNodeId Left node. + * @param ballotBox Votes accumulator for the left node. + */ + private void tryFinishRecoveryVoting(UUID nearNodeId, RecoveryBallotBox ballotBox) { + if (ballotBox.isVotingDone()) { + List<Long> recoveredTxs; + + synchronized (this) { + recoveredTxs = activeTxs.entrySet().stream() + .filter(e -> e.getValue().nearNodeId.equals(nearNodeId)) + .map(Map.Entry::getKey) + .collect(Collectors.toList()); + } + + // Committed counter is increased because it is not known if transaction was committed or not and we must + // bump committed counter for committed transaction as it is used in (read-only) query snapshot. 
+ recoveredTxs.forEach(txCntr -> onTxDone(txCntr, true)); + + recoveryBallotBoxes.remove(nearNodeId); + } + } + /** */ private interface Waiter { /** @@ -2324,4 +2461,26 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce } } } + + /** */ + private static class ActiveTx { + /** */ + private final long tracking; + /** */ + private final UUID nearNodeId; + + /** */ + private ActiveTx(long tracking, UUID nearNodeId) { + this.tracking = tracking; + this.nearNodeId = nearNodeId; + } + } + + /** */ + private static class ActiveServerTx extends ActiveTx { + /** */ + private ActiveServerTx(long tracking, UUID nearNodeId) { + super(tracking, nearNodeId); + } + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/5939a947/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccUtils.java index 9441c17..972d4d9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccUtils.java @@ -242,7 +242,11 @@ public class MvccUtils { if (mvccCntr > snapshotCntr) // we don't see future updates return false; - if (mvccCntr == snapshotCntr) { + // Basically we can make a fast decision about visibility if the found row is from the same transaction. + // But we can't make such a decision for read-only queries, + // because read-only queries use the last committed version in their snapshot, which could actually be aborted + // (during transaction recovery we do not know whether recovered transaction was committed or aborted).
+ if (mvccCntr == snapshotCntr && snapshotOpCntr != MVCC_READ_OP_CNTR) { assert opCntr <= snapshotOpCntr : "rowVer=" + mvccVersion(mvccCrd, mvccCntr, opCntr) + ", snapshot=" + snapshot; return opCntr < snapshotOpCntr; // we don't see own pending updates http://git-wip-us.apache.org/repos/asf/ignite/blob/5939a947/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/MvccRecoveryFinishedMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/MvccRecoveryFinishedMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/MvccRecoveryFinishedMessage.java new file mode 100644 index 0000000..a4ea103 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/MvccRecoveryFinishedMessage.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.mvcc.msg; + +import java.nio.ByteBuffer; +import java.util.UUID; +import org.apache.ignite.plugin.extensions.communication.MessageReader; +import org.apache.ignite.plugin.extensions.communication.MessageWriter; + +/** */ +public class MvccRecoveryFinishedMessage implements MvccMessage { + /** */ + private static final long serialVersionUID = -505062368078979867L; + + /** */ + private UUID nearNodeId; + + /** */ + public MvccRecoveryFinishedMessage() { + } + + /** */ + public MvccRecoveryFinishedMessage(UUID nearNodeId) { + this.nearNodeId = nearNodeId; + } + + /** + * @return Left node id for which transactions were recovered. + */ + public UUID nearNodeId() { + return nearNodeId; + } + + /** {@inheritDoc} */ + @Override public boolean waitForCoordinatorInit() { + return false; + } + + /** {@inheritDoc} */ + @Override public boolean processedFromNioThread() { + return false; + } + + /** {@inheritDoc} */ + @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { + writer.setBuffer(buf); + + if (!writer.isHeaderWritten()) { + if (!writer.writeHeader(directType(), fieldsCount())) + return false; + + writer.onHeaderWritten(); + } + + switch (writer.state()) { + case 0: + if (!writer.writeUuid("nearNodeId", nearNodeId)) + return false; + + writer.incrementState(); + + } + + return true; + } + + /** {@inheritDoc} */ + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { + reader.setBuffer(buf); + + if (!reader.beforeMessageRead()) + return false; + + switch (reader.state()) { + case 0: + nearNodeId = reader.readUuid("nearNodeId"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + } + + return reader.afterMessageRead(MvccRecoveryFinishedMessage.class); + } + + /** {@inheritDoc} */ + @Override public short directType() { + return 164; + } + + /** {@inheritDoc} */ + @Override public byte fieldsCount() { + return 1; + } + + /** {@inheritDoc} */ + 
@Override public void onAckReceived() { + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/5939a947/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/PartitionCountersNeighborcastRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/PartitionCountersNeighborcastRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/PartitionCountersNeighborcastRequest.java new file mode 100644 index 0000000..ffd9a67 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/PartitionCountersNeighborcastRequest.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.mvcc.msg; + +import java.nio.ByteBuffer; +import java.util.Collection; +import org.apache.ignite.internal.GridDirectCollection; +import org.apache.ignite.internal.processors.cache.GridCacheIdMessage; +import org.apache.ignite.internal.processors.cache.distributed.dht.PartitionUpdateCountersMessage; +import org.apache.ignite.lang.IgniteUuid; +import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType; +import org.apache.ignite.plugin.extensions.communication.MessageReader; +import org.apache.ignite.plugin.extensions.communication.MessageWriter; + +/** */ +public class PartitionCountersNeighborcastRequest extends GridCacheIdMessage { + /** */ + private static final long serialVersionUID = -1893577108462486998L; + + /** */ + @GridDirectCollection(PartitionUpdateCountersMessage.class) + private Collection<PartitionUpdateCountersMessage> updCntrs; + + /** */ + private IgniteUuid futId; + + /** */ + public PartitionCountersNeighborcastRequest() { + } + + /** */ + public PartitionCountersNeighborcastRequest( + Collection<PartitionUpdateCountersMessage> updCntrs, IgniteUuid futId) { + this.updCntrs = updCntrs; + this.futId = futId; + } + + /** + * @return Partition update counters for remote node. + */ + public Collection<PartitionUpdateCountersMessage> updateCounters() { + return updCntrs; + } + + /** + * @return Sending future id. 
+ */ + public IgniteUuid futId() { + return futId; + } + + /** {@inheritDoc} */ + @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { + writer.setBuffer(buf); + + if (!super.writeTo(buf, writer)) + return false; + + if (!writer.isHeaderWritten()) { + if (!writer.writeHeader(directType(), fieldsCount())) + return false; + + writer.onHeaderWritten(); + } + + switch (writer.state()) { + case 3: + if (!writer.writeIgniteUuid("futId", futId)) + return false; + + writer.incrementState(); + + case 4: + if (!writer.writeCollection("updCntrs", updCntrs, MessageCollectionItemType.MSG)) + return false; + + writer.incrementState(); + + } + + return true; + } + + /** {@inheritDoc} */ + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { + reader.setBuffer(buf); + + if (!reader.beforeMessageRead()) + return false; + + if (!super.readFrom(buf, reader)) + return false; + + switch (reader.state()) { + case 3: + futId = reader.readIgniteUuid("futId"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 4: + updCntrs = reader.readCollection("updCntrs", MessageCollectionItemType.MSG); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + } + + return reader.afterMessageRead(PartitionCountersNeighborcastRequest.class); + } + + /** {@inheritDoc} */ + @Override public short directType() { + return 165; + } + + /** {@inheritDoc} */ + @Override public byte fieldsCount() { + return 5; + } + + /** {@inheritDoc} */ + @Override public boolean addDeploymentInfo() { + return false; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/5939a947/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/PartitionCountersNeighborcastResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/PartitionCountersNeighborcastResponse.java 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/PartitionCountersNeighborcastResponse.java new file mode 100644 index 0000000..547539d --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/PartitionCountersNeighborcastResponse.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.mvcc.msg; + +import java.nio.ByteBuffer; +import org.apache.ignite.internal.processors.cache.GridCacheIdMessage; +import org.apache.ignite.lang.IgniteUuid; +import org.apache.ignite.plugin.extensions.communication.MessageReader; +import org.apache.ignite.plugin.extensions.communication.MessageWriter; + +/** */ +public class PartitionCountersNeighborcastResponse extends GridCacheIdMessage { + /** */ + private static final long serialVersionUID = -8731050539139260521L; + + /** */ + private IgniteUuid futId; + + /** */ + public PartitionCountersNeighborcastResponse() { + } + + /** */ + public PartitionCountersNeighborcastResponse(IgniteUuid futId) { + this.futId = futId; + } + + /** + * @return Sending future id. 
+ */ + public IgniteUuid futId() { + return futId; + } + + /** {@inheritDoc} */ + @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { + writer.setBuffer(buf); + + if (!super.writeTo(buf, writer)) + return false; + + if (!writer.isHeaderWritten()) { + if (!writer.writeHeader(directType(), fieldsCount())) + return false; + + writer.onHeaderWritten(); + } + + switch (writer.state()) { + case 3: + if (!writer.writeIgniteUuid("futId", futId)) + return false; + + writer.incrementState(); + + } + + return true; + } + + /** {@inheritDoc} */ + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { + reader.setBuffer(buf); + + if (!reader.beforeMessageRead()) + return false; + + if (!super.readFrom(buf, reader)) + return false; + + switch (reader.state()) { + case 3: + futId = reader.readIgniteUuid("futId"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + } + + return reader.afterMessageRead(PartitionCountersNeighborcastResponse.class); + } + + /** {@inheritDoc} */ + @Override public short directType() { + return 166; + } + + /** {@inheritDoc} */ + @Override public byte fieldsCount() { + return 4; + } + + /** {@inheritDoc} */ + @Override public boolean addDeploymentInfo() { + return false; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/5939a947/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java index cb682f6..240fbbe 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java +++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java @@ -1681,6 +1681,19 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple } /** {@inheritDoc} */ + @Override public void finalizeUpdateCountres() { + try { + CacheDataStore delegate0 = init0(true); + + if (delegate0 != null) + delegate0.finalizeUpdateCountres(); + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } + } + + /** {@inheritDoc} */ @Override public long nextUpdateCounter() { try { CacheDataStore delegate0 = init0(false); http://git-wip-us.apache.org/repos/asf/ignite/blob/5939a947/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java index 0d3ba75..399359b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java @@ -91,7 +91,6 @@ import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiTuple; -import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.thread.IgniteThread; import org.apache.ignite.transactions.TransactionConcurrency; @@ -283,7 +282,7 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement private volatile IgniteInternalFuture rollbackFut; /** */ - private volatile TxCounters txCounters = new TxCounters(); + private volatile 
TxCounters txCounters; /** * Empty constructor required for {@link Externalizable}. http://git-wip-us.apache.org/repos/asf/ignite/blob/5939a947/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java index 895a9d1..75e2087 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.processors.cache.transactions; +import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.UUID; @@ -34,6 +35,7 @@ import org.apache.ignite.internal.pagemem.wal.WALPointer; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheEntryInfoCollection; import org.apache.ignite.internal.processors.cache.CacheObject; +import org.apache.ignite.internal.processors.cache.GridCacheAffinityManager; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo; @@ -47,6 +49,7 @@ import org.apache.ignite.internal.processors.cache.distributed.GridCacheTxRecove import org.apache.ignite.internal.processors.cache.distributed.GridCacheTxRecoveryRequest; import org.apache.ignite.internal.processors.cache.distributed.GridCacheTxRecoveryResponse; import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxRemoteAdapter; +import 
org.apache.ignite.internal.processors.cache.distributed.dht.PartitionUpdateCountersMessage; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture; @@ -74,6 +77,8 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPr import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxRemote; import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot; import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode; +import org.apache.ignite.internal.processors.cache.mvcc.msg.PartitionCountersNeighborcastRequest; +import org.apache.ignite.internal.processors.cache.mvcc.msg.PartitionCountersNeighborcastResponse; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.processors.query.EnlistOperation; import org.apache.ignite.internal.processors.query.IgniteSQLException; @@ -257,6 +262,20 @@ public class IgniteTxHandler { processCheckPreparedTxResponse(nodeId, res); } }); + + ctx.io().addCacheHandler(0, PartitionCountersNeighborcastRequest.class, + new CI2<UUID, PartitionCountersNeighborcastRequest>() { + @Override public void apply(UUID nodeId, PartitionCountersNeighborcastRequest req) { + processPartitionCountersRequest(nodeId, req); + } + }); + + ctx.io().addCacheHandler(0, PartitionCountersNeighborcastResponse.class, + new CI2<UUID, PartitionCountersNeighborcastResponse>() { + @Override public void apply(UUID nodeId, PartitionCountersNeighborcastResponse res) { + processPartitionCountersResponse(nodeId, res); + } + }); } /** @@ -2152,4 +2171,110 @@ public class IgniteTxHandler { fut.onResult(nodeId, res); } + + /** + * @param nodeId Node id. + * @param req Request. 
+ */ + private void processPartitionCountersRequest(UUID nodeId, PartitionCountersNeighborcastRequest req) { + applyPartitionsUpdatesCounters(req.updateCounters()); + + try { + ctx.io().send(nodeId, new PartitionCountersNeighborcastResponse(req.futId()), SYSTEM_POOL); + } + catch (ClusterTopologyCheckedException ignored) { + if (txRecoveryMsgLog.isDebugEnabled()) + txRecoveryMsgLog.debug("Failed to send partition counters response, node left [node=" + nodeId + ']'); + } + catch (IgniteCheckedException e) { + U.error(txRecoveryMsgLog, "Failed to send partition counters response [node=" + nodeId + ']', e); + } + } + + /** + * @param nodeId Node id. + * @param res Response. + */ + private void processPartitionCountersResponse(UUID nodeId, PartitionCountersNeighborcastResponse res) { + PartitionCountersNeighborcastFuture fut = ((PartitionCountersNeighborcastFuture)ctx.mvcc().future(res.futId())); + + if (fut == null) { + log.warning("Failed to find future for partition counters response [futId=" + res.futId() + + ", node=" + nodeId + ']'); + + return; + } + + fut.onResult(nodeId); + } + + /** + * Applies partition counter updates for mvcc transactions. + * + * @param counters Counter values to be updated. + */ + public void applyPartitionsUpdatesCounters(Iterable<PartitionUpdateCountersMessage> counters) { + if (counters == null) + return; + + int cacheId = CU.UNDEFINED_CACHE_ID; + GridDhtPartitionTopology top = null; + + for (PartitionUpdateCountersMessage counter : counters) { + if (counter.cacheId() != cacheId) { + GridCacheContext ctx0 = ctx.cacheContext(cacheId = counter.cacheId()); + + assert ctx0.mvccEnabled(); + + top = ctx0.topology(); + } + + assert top != null; + + for (int i = 0; i < counter.size(); i++) { + GridDhtLocalPartition part = top.localPartition(counter.partition(i)); + + assert part != null; + + part.updateCounter(counter.initialCounter(i), counter.updatesCount(i)); + } + } + } + + /** + * @param tx Transaction. + * @param node Backup node. 
+ * @return Partition counters for the given backup node. + */ + @Nullable public List<PartitionUpdateCountersMessage> filterUpdateCountersForBackupNode( + IgniteInternalTx tx, ClusterNode node) { + TxCounters txCntrs = tx.txCounters(false); + + if (txCntrs == null || F.isEmpty(txCntrs.updateCounters())) + return null; + + Collection<PartitionUpdateCountersMessage> updCntrs = txCntrs.updateCounters(); + + List<PartitionUpdateCountersMessage> res = new ArrayList<>(updCntrs.size()); + + AffinityTopologyVersion top = tx.topologyVersionSnapshot(); + + for (PartitionUpdateCountersMessage partCntrs : updCntrs) { + GridCacheAffinityManager affinity = ctx.cacheContext(partCntrs.cacheId()).affinity(); + + PartitionUpdateCountersMessage resCntrs = new PartitionUpdateCountersMessage(partCntrs.cacheId(), partCntrs.size()); + + for (int i = 0; i < partCntrs.size(); i++) { + int part = partCntrs.partition(i); + + if (affinity.backupByPartition(node, part, top)) + resCntrs.add(part, partCntrs.initialCounter(i), partCntrs.updatesCount(i)); + } + + if (resCntrs.size() > 0) + res.add(resCntrs); + } + + return res; + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/5939a947/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java index 27b1522..0c2ca34 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java @@ -36,12 +36,12 @@ import org.apache.ignite.IgniteSystemProperties; import org.apache.ignite.binary.BinaryObjectException; import 
org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.events.DiscoveryEvent; -import org.apache.ignite.events.Event; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.managers.communication.GridIoPolicy; import org.apache.ignite.internal.managers.communication.GridMessageListener; -import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; +import org.apache.ignite.internal.managers.discovery.DiscoCache; +import org.apache.ignite.internal.managers.eventstorage.DiscoveryEventListener; import org.apache.ignite.internal.pagemem.wal.record.TxRecord; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheObjectsReleaseFuture; @@ -59,19 +59,18 @@ import org.apache.ignite.internal.processors.cache.distributed.GridCacheMappedVe import org.apache.ignite.internal.processors.cache.distributed.GridCacheTxFinishSync; import org.apache.ignite.internal.processors.cache.distributed.GridCacheTxRecoveryFuture; import org.apache.ignite.internal.processors.cache.distributed.GridDistributedLockCancelledException; -import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxLocalAdapter; -import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtInvalidPartitionException; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxLocal; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxOnePhaseCommitAckRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxRemote; import org.apache.ignite.internal.processors.cache.distributed.dht.colocated.GridDhtColocatedLockFuture; +import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtInvalidPartitionException; import 
org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheAdapter; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheEntry; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockFuture; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearOptimisticTxPrepareFuture; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal; -import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot; -import org.apache.ignite.internal.processors.cache.mvcc.MvccVersion; +import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinator; +import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccRecoveryFinishedMessage; import org.apache.ignite.internal.processors.cache.mvcc.txlog.TxState; import org.apache.ignite.internal.processors.cache.transactions.TxDeadlockDetection.TxDeadlockFuture; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; @@ -106,6 +105,7 @@ import static org.apache.ignite.IgniteSystemProperties.IGNITE_WAL_LOG_TX_RECORDS import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; import static org.apache.ignite.events.EventType.EVT_TX_STARTED; +import static org.apache.ignite.internal.GridTopic.TOPIC_CACHE_COORDINATOR; import static org.apache.ignite.internal.GridTopic.TOPIC_TX; import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL; import static org.apache.ignite.internal.processors.cache.GridCacheOperation.READ; @@ -254,18 +254,16 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { } }; - cctx.gridEvents().addLocalEventListener( - new GridLocalEventListener() { - @Override public void onEvent(Event evt) { - assert evt instanceof DiscoveryEvent; + cctx.gridEvents().addDiscoveryEventListener( + new DiscoveryEventListener() { + @Override public void onEvent(DiscoveryEvent evt, 
DiscoCache discoCache) { assert evt.type() == EVT_NODE_FAILED || evt.type() == EVT_NODE_LEFT; - DiscoveryEvent discoEvt = (DiscoveryEvent)evt; - - UUID nodeId = discoEvt.eventNode().id(); + UUID nodeId = evt.eventNode().id(); // Wait some time in case there are some unprocessed messages from failed node. - cctx.time().addTimeoutObject(new NodeFailureTimeoutObject(nodeId)); + cctx.time().addTimeoutObject( + new NodeFailureTimeoutObject(evt.eventNode(), discoCache.mvccCoordinator())); if (txFinishSync != null) txFinishSync.onNodeLeft(nodeId); @@ -2026,7 +2024,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { */ public void finishTxOnRecovery(final IgniteInternalTx tx, boolean commit) { if (log.isInfoEnabled()) - log.info("Finishing prepared transaction [tx=" + tx + ", commit=" + commit + ']'); + log.info("Finishing prepared transaction [commit=" + commit + ", tx=" + tx + ']'); if (!tx.markFinalizing(RECOVERY_FINISH)) { if (log.isInfoEnabled()) @@ -2046,10 +2044,28 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { if (commit) tx.commitAsync().listen(new CommitListener(tx)); + else if (tx.mvccSnapshot() != null && !tx.local()) + // remote (backup) mvcc transaction sends partition counters to other backup transaction + // in order to keep counters consistent + neighborcastPartitionCountersAndRollback(tx); else tx.rollbackAsync(); } + /** + * Rolls back the given transaction, first starting a {@link PartitionCountersNeighborcastFuture} + * for it when the transaction carries partition update counters. The rollback is then issued + * from that future's completion listener. + * + * @param tx Transaction to roll back. + */ + private void neighborcastPartitionCountersAndRollback(IgniteInternalTx tx) { + TxCounters txCounters = tx.txCounters(false); + + if (txCounters == null || txCounters.updateCounters() == null) { + // No counters to send: roll back once and stop, otherwise the future below + // would still be started and would trigger a second rollback. + tx.rollbackAsync(); + + return; + } + + PartitionCountersNeighborcastFuture fut = new PartitionCountersNeighborcastFuture(tx, cctx); + + fut.listen(fut0 -> tx.rollbackAsync()); + + fut.init(); + } + /** * Commits transaction in case when node started transaction failed, but all related * transactions were prepared (invalidates transaction if it is not fully prepared). 
@@ -2427,16 +2443,20 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { * Timeout object for node failure handler. */ private final class NodeFailureTimeoutObject extends GridTimeoutObjectAdapter { - /** Left or failed node. */ - private final UUID evtNodeId; + /** */ + private final ClusterNode node; + /** */ + private final MvccCoordinator mvccCrd; /** - * @param evtNodeId Event node ID. + * @param node Failed node. + * @param mvccCrd Mvcc coordinator at time of node failure. */ - private NodeFailureTimeoutObject(UUID evtNodeId) { + private NodeFailureTimeoutObject(ClusterNode node, MvccCoordinator mvccCrd) { super(IgniteUuid.fromUuid(cctx.localNodeId()), TX_SALVAGE_TIMEOUT); - this.evtNodeId = evtNodeId; + this.node = node; + this.mvccCrd = mvccCrd; } /** @@ -2453,11 +2473,17 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { return; } + UUID evtNodeId = node.id(); + try { if (log.isDebugEnabled()) log.debug("Processing node failed event [locNodeId=" + cctx.localNodeId() + ", failedNodeId=" + evtNodeId + ']'); + // Null means that recovery voting is not needed. + GridCompoundFuture<IgniteInternalTx, Void> allTxFinFut = node.isClient() && mvccCrd != null + ? new GridCompoundFuture<>() : null; + for (final IgniteInternalTx tx : activeTransactions()) { if ((tx.near() && !tx.local()) || (tx.storeWriteThrough() && tx.masterNodeIds().contains(evtNodeId))) { // Invalidate transactions. 
@@ -2472,24 +2498,57 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { IgniteInternalFuture<?> prepFut = tx.currentPrepareFuture(); if (prepFut != null) { - prepFut.listen(new CI1<IgniteInternalFuture<?>>() { - @Override public void apply(IgniteInternalFuture<?> fut) { - if (tx.state() == PREPARED) - commitIfPrepared(tx, Collections.singleton(evtNodeId)); - else if (tx.setRollbackOnly()) - tx.rollbackAsync(); - } + prepFut.listen(fut -> { + if (tx.state() == PREPARED) + commitIfPrepared(tx, Collections.singleton(evtNodeId)); + // If we could not mark tx as rollback, it means that transaction is being committed. + else if (tx.setRollbackOnly()) + tx.rollbackAsync(); }); } - else { - // If we could not mark tx as rollback, it means that transaction is being committed. - if (tx.setRollbackOnly()) - tx.rollbackAsync(); - } + // If we could not mark tx as rollback, it means that transaction is being committed. + else if (tx.setRollbackOnly()) + tx.rollbackAsync(); } } + + // Await only mvcc transactions initiated by failed client node. + if (allTxFinFut != null && tx.eventNodeId().equals(evtNodeId) + && tx.mvccSnapshot() != null) + allTxFinFut.add(tx.finishFuture()); } } + + if (allTxFinFut == null) + return; + + allTxFinFut.markInitialized(); + + // Send vote to mvcc coordinator when all recovering transactions have finished. + allTxFinFut.listen(fut -> { + // If mvcc coordinator issued snapshot for recovering transaction has failed during recovery, + // then there is no need to send messages to new coordinator. 
+ try { + cctx.kernalContext().io().sendToGridTopic( + mvccCrd.nodeId(), + TOPIC_CACHE_COORDINATOR, + new MvccRecoveryFinishedMessage(evtNodeId), + SYSTEM_POOL); + } + catch (ClusterTopologyCheckedException e) { + if (log.isInfoEnabled()) + log.info("Mvcc coordinator issued snapshots for recovering transactions " + + "has left the cluster (will ignore) [locNodeId=" + cctx.localNodeId() + + ", failedNodeId=" + evtNodeId + + ", mvccCrdNodeId=" + mvccCrd.nodeId() + ']'); + } + catch (IgniteCheckedException e) { + log.warning("Failed to notify mvcc coordinator that all recovering transactions were " + + "finished [locNodeId=" + cctx.localNodeId() + + ", failedNodeId=" + evtNodeId + + ", mvccCrdNodeId=" + mvccCrd.nodeId() + ']', e); + } + }); } finally { cctx.kernalContext().gateway().readUnlock();
