ignite-3478
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/af0c3bc2 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/af0c3bc2 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/af0c3bc2 Branch: refs/heads/ignite-3479 Commit: af0c3bc2190804cbb39618bacd6e8f262f12a11b Parents: 9ae39c4 Author: sboikov <[email protected]> Authored: Fri Sep 22 16:55:24 2017 +0300 Committer: sboikov <[email protected]> Committed: Fri Sep 22 16:55:24 2017 +0300 ---------------------------------------------------------------------- .../near/GridNearTxFinishAndAckFuture.java | 2 +- .../near/GridNearTxFinishFuture.java | 2 +- .../mvcc/CacheCoordinatorsSharedManager.java | 48 ++++++++------------ .../cache/mvcc/CoordinatorTxAckRequest.java | 18 ++++---- 4 files changed, 31 insertions(+), 39 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/af0c3bc2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishAndAckFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishAndAckFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishAndAckFuture.java index 8474ab7..7d03d46 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishAndAckFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishAndAckFuture.java @@ -59,7 +59,7 @@ public class GridNearTxFinishAndAckFuture extends GridFutureAdapter<IgniteIntern assert crd != null; IgniteInternalFuture<Void> ackFut = fut.context().coordinators().ackTxCommit( - crd, tx.nearXidVersion()); + crd, tx.mvccCoordinatorVersion()); 
ackFut.listen(new IgniteInClosure<IgniteInternalFuture<Void>>() { @Override public void apply(IgniteInternalFuture<Void> ackFut) { http://git-wip-us.apache.org/repos/asf/ignite/blob/af0c3bc2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java index 5f18e9b..347a694 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java @@ -425,7 +425,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCacheCompoundIdentit assert crd != null; - cctx.coordinators().ackTxRollback(crd, tx.nearXidVersion()); + cctx.coordinators().ackTxRollback(crd, tx.mvccCoordinatorVersion()); } try { http://git-wip-us.apache.org/repos/asf/ignite/blob/af0c3bc2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java index 0050659..641e6d4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java @@ -22,6 +22,7 @@ import java.util.Map; import java.util.UUID; import 
java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicLong; import org.apache.ignite.IgniteCheckedException; @@ -82,7 +83,7 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager private final GridAtomicLong committedCntr = new GridAtomicLong(1L); /** */ - private final ConcurrentHashMap<GridCacheVersion, Long> activeTxs = new ConcurrentHashMap<>(); + private final ConcurrentSkipListMap<Long, GridCacheVersion> activeTxs = new ConcurrentSkipListMap<>(); /** */ private final Map<Long, Integer> activeQueries = new HashMap<>(); @@ -268,12 +269,12 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager /** * @param crd Coordinator. - * @param txId Transaction ID. + * @param mvccVer Transaction version. * @return Acknowledge future. */ - public IgniteInternalFuture<Void> ackTxCommit(ClusterNode crd, GridCacheVersion txId) { + public IgniteInternalFuture<Void> ackTxCommit(ClusterNode crd, MvccCoordinatorVersion mvccVer) { assert crd != null; - assert txId != null; + assert mvccVer != null; WaitAckFuture fut = new WaitAckFuture(futIdCntr.incrementAndGet(), crd, true); @@ -282,7 +283,7 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager try { cctx.gridIO().sendToGridTopic(crd, MSG_TOPIC, - new CoordinatorTxAckRequest(fut.id, txId), + new CoordinatorTxAckRequest(fut.id, mvccVer.counter()), MSG_POLICY); } catch (ClusterTopologyCheckedException e) { @@ -299,10 +300,10 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager /** * @param crd Coordinator. - * @param txId Transaction ID. + * @param mvccVer Transaction version. 
*/ - public void ackTxRollback(ClusterNode crd, GridCacheVersion txId) { - CoordinatorTxAckRequest msg = new CoordinatorTxAckRequest(0, txId); + public void ackTxRollback(ClusterNode crd, MvccCoordinatorVersion mvccVer) { + CoordinatorTxAckRequest msg = new CoordinatorTxAckRequest(0, mvccVer.counter()); msg.skipResponse(true); @@ -424,7 +425,7 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager * @param msg Message. */ private void processCoordinatorTxAckRequest(UUID nodeId, CoordinatorTxAckRequest msg) { - onTxDone(msg.txId()); + onTxDone(msg.txCounter()); if (STAT_CNTRS) statCntrs[2].update(); @@ -482,10 +483,10 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager // TODO IGNITE-3478 sorted? + change GridLongList.writeTo? MvccCoordinatorVersionResponse res = new MvccCoordinatorVersionResponse(); - for (Long txVer : activeTxs.values()) + for (Long txVer : activeTxs.keySet()) res.addTx(txVer); - Object old = activeTxs.put(txId, nextCtr); + Object old = activeTxs.put(nextCtr, txId); assert old == null : txId; @@ -502,19 +503,19 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager } /** - * @param txId Transaction ID. + * @param txCntr Counter assigned to transaction. */ - private void onTxDone(GridCacheVersion txId) { + private void onTxDone(Long txCntr) { GridFutureAdapter fut; // TODO IGNITE-3478. 
synchronized (this) { - Long cntr = activeTxs.remove(txId); + GridCacheVersion ver = activeTxs.remove(txCntr); - assert cntr != null; + assert ver != null; - committedCntr.setIfGreater(cntr); + committedCntr.setIfGreater(txCntr); - fut = waitTxFuts.remove(cntr); + fut = waitTxFuts.remove(txCntr); } if (fut != null) @@ -534,7 +535,7 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager Long trackCntr = mvccCntr; - for (Long txVer : activeTxs.values()) { + for (Long txVer : activeTxs.keySet()) { if (txVer < trackCntr) trackCntr = txVer; @@ -592,7 +593,7 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager for (int i = 0; i < txs.size(); i++) { long txId = txs.get(i); - if (hasActiveTx(txId)) { + if (activeTxs.containsKey(txId)) { GridFutureAdapter fut0 = waitTxFuts.get(txId); if (fut0 == null) { @@ -643,15 +644,6 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager } } - private boolean hasActiveTx(long txId) { - for (Long id : activeTxs.values()) { - if (id == txId) - return true; - } - - return false; - } - /** * @param topVer Topology version. * @return MVCC coordinator for given topology version. 
http://git-wip-us.apache.org/repos/asf/ignite/blob/af0c3bc2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorTxAckRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorTxAckRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorTxAckRequest.java index 071a411..14cd6a9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorTxAckRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorTxAckRequest.java @@ -38,7 +38,7 @@ public class CoordinatorTxAckRequest implements MvccCoordinatorMessage { private long futId; /** */ - private GridCacheVersion txId; + private long txCntr; /** */ private byte flags; @@ -52,11 +52,11 @@ public class CoordinatorTxAckRequest implements MvccCoordinatorMessage { /** * @param futId Future ID. - * @param txId Transaction ID. + * @param txCntr Counter assigned to transaction. */ - CoordinatorTxAckRequest(long futId, GridCacheVersion txId) { + CoordinatorTxAckRequest(long futId, long txCntr) { this.futId = futId; - this.txId = txId; + this.txCntr = txCntr; } /** {@inheritDoc} */ @@ -94,10 +94,10 @@ public class CoordinatorTxAckRequest implements MvccCoordinatorMessage { } /** - * @return Transaction ID.s + * @return Counter assigned to transaction. 
*/ - public GridCacheVersion txId() { - return txId; + public long txCounter() { + return txCntr; } /** {@inheritDoc} */ @@ -125,7 +125,7 @@ public class CoordinatorTxAckRequest implements MvccCoordinatorMessage { writer.incrementState(); case 2: - if (!writer.writeMessage("txId", txId)) + if (!writer.writeLong("txCntr", txCntr)) return false; writer.incrementState(); @@ -160,7 +160,7 @@ public class CoordinatorTxAckRequest implements MvccCoordinatorMessage { reader.incrementState(); case 2: - txId = reader.readMessage("txId"); + txCntr = reader.readLong("txCntr"); if (!reader.isLastRead()) return false;
