Repository: ignite Updated Branches: refs/heads/ignite-1.4-slow-server-debug b5bb23f82 -> a96761577
Debugging slowdowns Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a9676157 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a9676157 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a9676157 Branch: refs/heads/ignite-1.4-slow-server-debug Commit: a967615774fdca01b4faa546c84cb3c16275b260 Parents: b5bb23f Author: Yakov Zhdanov <[email protected]> Authored: Tue Oct 20 19:32:24 2015 +0300 Committer: Yakov Zhdanov <[email protected]> Committed: Tue Oct 20 19:32:24 2015 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheContext.java | 29 +-- .../distributed/dht/GridDhtLockFuture.java | 9 +- .../dht/GridDhtTransactionalCacheAdapter.java | 32 +-- .../distributed/dht/GridDhtTxLocalAdapter.java | 12 +- .../distributed/dht/GridDhtTxPrepareFuture.java | 38 ++-- .../dht/colocated/GridDhtColocatedCache.java | 9 +- .../near/GridNearTransactionalCache.java | 9 +- .../near/GridNearTxFinishFuture.java | 2 +- .../cache/transactions/IgniteTxHandler.java | 38 ++-- .../cache/transactions/IgniteTxManager.java | 203 +++++++++---------- .../util/GridBoundedConcurrentOrderedMap.java | 46 ++--- 11 files changed, 181 insertions(+), 246 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/a9676157/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java index 5385dec..8e95bde 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java @@ -1484,10 +1484,9 @@ public class GridCacheContext<K, V> implements Externalizable { * @param log Log. * @param dhtMap Dht mappings. * @param nearMap Near mappings. - * @return {@code True} if mapped. * @throws GridCacheEntryRemovedException If reader for entry is removed. */ - public boolean dhtMap( + public void dhtMap( UUID nearNodeId, AffinityTopologyVersion topVer, GridDhtCacheEntry entry, @@ -1505,7 +1504,7 @@ public class GridCacheContext<K, V> implements Externalizable { Collection<ClusterNode> dhtRemoteNodes = F.view(dhtNodes, F.remoteNodes(nodeId())); // Exclude local node. - boolean ret = map(entry, dhtRemoteNodes, dhtMap); + map(entry, dhtRemoteNodes, dhtMap); Collection<ClusterNode> nearRemoteNodes = null; @@ -1526,7 +1525,7 @@ public class GridCacheContext<K, V> implements Externalizable { if (nearNodes != null && !nearNodes.isEmpty()) { nearRemoteNodes = F.view(nearNodes, F.notIn(dhtNodes)); - ret |= map(entry, nearRemoteNodes, nearMap); + map(entry, nearRemoteNodes, nearMap); } } @@ -1536,8 +1535,6 @@ public class GridCacheContext<K, V> implements Externalizable { entry.mappings(explicitLockVer, dhtNodeIds, nearNodeIds); } - - return ret; } /** @@ -1545,10 +1542,9 @@ public class GridCacheContext<K, V> implements Externalizable { * @param log Log. * @param dhtMap Dht mappings. * @param nearMap Near mappings. - * @return {@code True} if mapped. * @throws GridCacheEntryRemovedException If reader for entry is removed. */ - public boolean dhtMap( + public void dhtMap( GridDhtCacheEntry entry, GridCacheVersion explicitLockVer, IgniteLogger log, @@ -1567,27 +1563,20 @@ public class GridCacheContext<K, V> implements Externalizable { Collection<ClusterNode> nearNodes = cand.mappedNearNodes(); - boolean ret = map(entry, dhtNodes, dhtMap); + map(entry, dhtNodes, dhtMap); if (nearNodes != null && !nearNodes.isEmpty()) - ret |= map(entry, nearNodes, nearMap); - - return ret; + map(entry, nearNodes, nearMap); } - - return false; } /** * @param entry Entry. * @param nodes Nodes. * @param map Map. - * @return {@code True} if mapped. */ - private boolean map(GridDhtCacheEntry entry, Iterable<ClusterNode> nodes, + private void map(GridDhtCacheEntry entry, Iterable<ClusterNode> nodes, Map<ClusterNode, List<GridDhtCacheEntry>> map) { - boolean ret = false; - if (nodes != null) { for (ClusterNode n : nodes) { List<GridDhtCacheEntry> entries = map.get(n); @@ -1596,12 +1585,8 @@ public class GridCacheContext<K, V> implements Externalizable { map.put(n, entries = new LinkedList<>()); entries.add(entry); - - ret = true; } } - - return ret; } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/a9676157/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java index 2c16534..5dcbe4c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java @@ -779,14 +779,12 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean> if (log.isDebugEnabled()) log.debug("Mapping entry for DHT lock future: " + this); - boolean hasRmtNodes = false; - // Assign keys to primary nodes. for (GridDhtCacheEntry entry : entries) { try { while (true) { try { - hasRmtNodes = cctx.dhtMap( + cctx.dhtMap( nearNodeId, topVer, entry, @@ -820,9 +818,6 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean> } } - if (tx != null) - tx.needsCompletedVersions(hasRmtNodes); - if (isDone()) { if (log.isDebugEnabled()) log.debug("Mapping won't proceed because future is done: " + this); @@ -1236,4 +1231,4 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean> return S.toString(MiniFuture.class, this, "nodeId", node.id(), "super", super.toString()); } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/a9676157/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java index 1a869e7..8ab16a4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java @@ -17,15 +17,6 @@ package org.apache.ignite.internal.processors.cache.distributed.dht; -import java.io.Externalizable; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.ListIterator; -import java.util.Map; -import java.util.UUID; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.IgniteInternalFuture; @@ -62,9 +53,9 @@ import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException; import org.apache.ignite.internal.util.F0; import org.apache.ignite.internal.util.GridLeanSet; -import org.apache.ignite.internal.util.future.GridEmbeddedFuture; import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.lang.GridClosureException; +import org.apache.ignite.internal.util.lang.IgnitePair; import org.apache.ignite.internal.util.typedef.C1; import org.apache.ignite.internal.util.typedef.C2; import org.apache.ignite.internal.util.typedef.CI1; @@ -76,6 +67,16 @@ import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.transactions.TransactionIsolation; import org.jetbrains.annotations.Nullable; +import java.io.Externalizable; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.ListIterator; +import java.util.Map; +import java.util.UUID; + import static org.apache.ignite.internal.processors.cache.GridCacheOperation.NOOP; import static org.apache.ignite.internal.processors.cache.GridCacheUtils.isNearEnabled; import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC; @@ -1087,8 +1088,9 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach // We have to add completed versions for cases when nearLocal and remote transactions // execute concurrently. - res.completedVersions(ctx.tm().committedVersions(req.version()), - ctx.tm().rolledbackVersions(req.version())); + IgnitePair<Collection<GridCacheVersion>> versPair = ctx.tm().versions(req.version()); + + res.completedVersions(versPair.get1(), versPair.get2()); int i = 0; @@ -1513,8 +1515,10 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach } } - Collection<GridCacheVersion> committed = ctx.tm().committedVersions(ver); - Collection<GridCacheVersion> rolledback = ctx.tm().rolledbackVersions(ver); + IgnitePair<Collection<GridCacheVersion>> versPair = ctx.tm().versions(ver); + + Collection<GridCacheVersion> committed = versPair.get1(); + Collection<GridCacheVersion> rolledback = versPair.get2(); // Backups. for (Map.Entry<ClusterNode, List<KeyCacheObject>> entry : dhtMap.entrySet()) { http://git-wip-us.apache.org/repos/asf/ignite/blob/a9676157/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java index dfeffb2..983b842 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java @@ -92,9 +92,6 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter { /** */ protected boolean explicitLock; - /** */ - private boolean needsCompletedVers; - /** Versions of pending locks for entries of this tx. */ private Collection<GridCacheVersion> pendingVers; @@ -244,16 +241,9 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter { */ protected abstract void sendFinishReply(boolean commit, @Nullable Throwable err); - /** - * @param needsCompletedVers {@code True} if needs completed versions. - */ - public void needsCompletedVersions(boolean needsCompletedVers) { - this.needsCompletedVers |= needsCompletedVers; - } - /** {@inheritDoc} */ @Override public boolean needsCompletedVersions() { - return needsCompletedVers; + return nearOnOriginatingNode; } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/a9676157/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java index 89a435a..c4e4202 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java @@ -47,10 +47,10 @@ import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.processors.dr.GridDrType; import org.apache.ignite.internal.util.F0; -import org.apache.ignite.internal.util.GridConcurrentHashSet; import org.apache.ignite.internal.util.GridLeanSet; import org.apache.ignite.internal.util.future.GridCompoundFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; +import org.apache.ignite.internal.util.lang.IgnitePair; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.C1; @@ -677,7 +677,11 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter GridCacheVersion min = tx.minVersion(); - res.completedVersions(cctx.tm().committedVersions(min), cctx.tm().rolledbackVersions(min)); + if (tx.needsCompletedVersions()) { + IgnitePair<Collection<GridCacheVersion>> versPair = cctx.tm().versions(min); + + res.completedVersions(versPair.get1(), versPair.get2()); + } res.pending(localDhtPendingVersions(tx.writeEntries(), min)); @@ -942,20 +946,16 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter Map<UUID, GridDistributedTxMapping> futDhtMap = new HashMap<>(); Map<UUID, GridDistributedTxMapping> futNearMap = new HashMap<>(); - boolean hasRemoteNodes = false; - // Assign keys to primary nodes. if (!F.isEmpty(writes)) { for (IgniteTxEntry write : writes) - hasRemoteNodes |= map(tx.entry(write.txKey()), futDhtMap, futNearMap); + map(tx.entry(write.txKey()), futDhtMap, futNearMap); } if (!F.isEmpty(reads)) { for (IgniteTxEntry read : reads) - hasRemoteNodes |= map(tx.entry(read.txKey()), futDhtMap, futNearMap); + map(tx.entry(read.txKey()), futDhtMap, futNearMap); } - - tx.needsCompletedVersions(hasRemoteNodes); } if (isDone()) @@ -1146,15 +1146,14 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter * @param entry Transaction entry. * @param futDhtMap DHT mapping. * @param futNearMap Near mapping. - * @return {@code True} if mapped. */ - private boolean map( + private void map( IgniteTxEntry entry, Map<UUID, GridDistributedTxMapping> futDhtMap, Map<UUID, GridDistributedTxMapping> futNearMap ) { if (entry.cached().isLocal()) - return false; + return; GridDhtCacheEntry cached = (GridDhtCacheEntry)entry.cached(); @@ -1170,8 +1169,6 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter entry.ttl(CU.toTtl(expiry.getExpiryForAccess())); } - boolean ret; - while (true) { try { Collection<ClusterNode> dhtNodes = dht.topology().nodes(cached.partition(), tx.topologyVersion()); @@ -1195,10 +1192,10 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter log.debug("Entry has no near readers: " + entry); // Exclude local node. - ret = map(entry, F.view(dhtNodes, F.remoteNodes(cctx.localNodeId())), dhtMap, futDhtMap); + map(entry, F.view(dhtNodes, F.remoteNodes(cctx.localNodeId())), dhtMap, futDhtMap); // Exclude DHT nodes. - ret |= map(entry, F.view(nearNodes, F0.notIn(dhtNodes)), nearMap, futNearMap); + map(entry, F.view(nearNodes, F0.notIn(dhtNodes)), nearMap, futNearMap); break; } @@ -1208,8 +1205,6 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter entry.cached(cached); } } - - return ret; } /** @@ -1217,16 +1212,13 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter * @param nodes Nodes. * @param globalMap Map. * @param locMap Exclude map. - * @return {@code True} if mapped. */ - private boolean map( + private void map( IgniteTxEntry entry, Iterable<ClusterNode> nodes, Map<UUID, GridDistributedTxMapping> globalMap, Map<UUID, GridDistributedTxMapping> locMap ) { - boolean ret = false; - if (nodes != null) { for (ClusterNode n : nodes) { GridDistributedTxMapping global = globalMap.get(n.id()); @@ -1255,12 +1247,8 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter locMap.put(n.id(), loc = new GridDistributedTxMapping(n)); loc.add(entry); - - ret = true; } } - - return ret; } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/a9676157/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java index c8425e9..ae077d5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java @@ -64,6 +64,7 @@ import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalEx; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.future.GridEmbeddedFuture; import org.apache.ignite.internal.util.future.GridFinishedFuture; +import org.apache.ignite.internal.util.lang.IgnitePair; import org.apache.ignite.internal.util.typedef.C2; import org.apache.ignite.internal.util.typedef.CI2; import org.apache.ignite.internal.util.typedef.F; @@ -615,8 +616,10 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte if (map == null || map.isEmpty()) return; - Collection<GridCacheVersion> committed = ctx.tm().committedVersions(ver); - Collection<GridCacheVersion> rolledback = ctx.tm().rolledbackVersions(ver); + IgnitePair<Collection<GridCacheVersion>> versPair = ctx.tm().versions(ver); + + Collection<GridCacheVersion> committed = versPair.get1(); + Collection<GridCacheVersion> rolledback = versPair.get2(); for (Map.Entry<ClusterNode, GridNearUnlockRequest> mapping : map.entrySet()) { ClusterNode n = mapping.getKey(); @@ -892,4 +895,4 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte @Override public String toString() { return S.toString(GridDhtColocatedCache.class, this, super.toString()); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/a9676157/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java index e70c864..75ddef9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java @@ -48,6 +48,7 @@ import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalEx; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException; import org.apache.ignite.internal.util.future.GridFinishedFuture; +import org.apache.ignite.internal.util.lang.IgnitePair; import org.apache.ignite.internal.util.typedef.CI2; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.CU; @@ -710,8 +711,10 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V> if (map == null || map.isEmpty()) return; - Collection<GridCacheVersion> committed = ctx.tm().committedVersions(ver); - Collection<GridCacheVersion> rolledback = ctx.tm().rolledbackVersions(ver); + IgnitePair<Collection<GridCacheVersion>> versPair = ctx.tm().versions(ver); + + Collection<GridCacheVersion> committed = versPair.get1(); + Collection<GridCacheVersion> rolledback = versPair.get2(); for (Map.Entry<ClusterNode, GridNearUnlockRequest> mapping : map.entrySet()) { ClusterNode n = mapping.getKey(); @@ -740,4 +743,4 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V> @Override public String toString() { return S.toString(GridNearTransactionalCache.class, this); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/a9676157/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java index 5e21450..54652bc6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java @@ -407,7 +407,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu "(backup has left grid): " + tx.xidVersion(), cause)); } else if (backup.isLocal()) { - boolean committed = cctx.tm().txHandler().checkDhtRemoteTxCommitted(tx.xidVersion()); + boolean committed = !cctx.tm().addRolledbackTx(tx); readyNearMappingFromBackup(mapping); http://git-wip-us.apache.org/repos/asf/ignite/blob/a9676157/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java index c2cc629..b4f2add 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java @@ -871,7 +871,7 @@ public class IgniteTxHandler { log.debug("Processing dht tx finish request [nodeId=" + nodeId + ", req=" + req + ']'); if (req.checkCommitted()) { - sendReply(nodeId, req, checkDhtRemoteTxCommitted(req.version())); + sendReply(nodeId, req, !ctx.tm().addRolledbackTx(null, req.version())); return; } @@ -891,8 +891,13 @@ public class IgniteTxHandler { if (req.replyRequired()) { IgniteInternalFuture completeFut; - IgniteInternalFuture<IgniteInternalTx> dhtFin = dhtTx == null ? null : dhtTx.done() ? null : dhtTx.finishFuture(); - IgniteInternalFuture<IgniteInternalTx> nearFin = nearTx == null ? null : nearTx.done() ? null : nearTx.finishFuture(); + IgniteInternalFuture<IgniteInternalTx> dhtFin = dhtTx == null ? + null : dhtTx.done() ? + null : dhtTx.finishFuture(); + + IgniteInternalFuture<IgniteInternalTx> nearFin = nearTx == null ? + null : nearTx.done() ? + null : nearTx.finishFuture(); if (dhtFin != null && nearFin != null) { GridCompoundFuture fut = new GridCompoundFuture(); @@ -923,24 +928,6 @@ public class IgniteTxHandler { } /** - * Checks whether DHT remote transaction with given version has been committed. If not, will add version - * to rollback version set so that late response will not falsely commit this transaction. - * - * @param writeVer Write version to check. - * @return {@code True} if transaction has been committed, {@code false} otherwise. - */ - public boolean checkDhtRemoteTxCommitted(GridCacheVersion writeVer) { - assert writeVer != null; - - boolean committed = true; - - if (ctx.tm().addRolledbackTx(writeVer)) - committed = false; - - return committed; - } - - /** * @param nodeId Node ID. * @param tx Transaction. * @param req Request. @@ -948,7 +935,8 @@ public class IgniteTxHandler { protected void finish( UUID nodeId, IgniteTxRemoteEx tx, - GridDhtTxFinishRequest req) { + GridDhtTxFinishRequest req + ) { // We don't allow explicit locks for transactions and // therefore immediately return if transaction is null. // However, we may decide to relax this restriction in @@ -956,9 +944,9 @@ public class IgniteTxHandler { if (tx == null) { if (req.commit()) // Must be some long time duplicate, but we add it anyway. - ctx.tm().addCommittedTx(req.version(), null); + ctx.tm().addCommittedTx(tx, req.version(), null); else - ctx.tm().addRolledbackTx(req.version()); + ctx.tm().addRolledbackTx(tx, req.version()); if (log.isDebugEnabled()) log.debug("Received finish request for non-existing transaction (added to completed set) " + @@ -1382,4 +1370,4 @@ public class IgniteTxHandler { fut.onResult(nodeId, res); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/a9676157/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java index 2082ba5..c1a8ae1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java @@ -34,6 +34,7 @@ import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.atomic.AtomicInteger; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteClientDisconnectedException; +import org.apache.ignite.IgniteException; import org.apache.ignite.events.DiscoveryEvent; import org.apache.ignite.events.Event; import org.apache.ignite.internal.IgniteInternalFuture; @@ -60,10 +61,12 @@ import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.processors.timeout.GridTimeoutObjectAdapter; import org.apache.ignite.internal.transactions.IgniteTxOptimisticCheckedException; import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException; +import org.apache.ignite.internal.util.GridBoundedConcurrentLinkedHashMap; import org.apache.ignite.internal.util.GridBoundedConcurrentOrderedMap; import org.apache.ignite.internal.util.future.GridCompoundFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.lang.GridFunc; +import org.apache.ignite.internal.util.lang.IgnitePair; import org.apache.ignite.internal.util.typedef.CI1; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.X; @@ -78,6 +81,7 @@ import org.apache.ignite.transactions.TransactionState; import org.jetbrains.annotations.Nullable; import org.jsr166.ConcurrentHashMap8; import org.jsr166.ConcurrentLinkedDeque8; +import org.jsr166.ConcurrentLinkedHashMap; import static org.apache.ignite.IgniteSystemProperties.IGNITE_MAX_COMPLETED_TX_COUNT; import static org.apache.ignite.IgniteSystemProperties.IGNITE_SLOW_TX_WARN_TIMEOUT; @@ -141,10 +145,19 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { new ConcurrentSkipListMap<>(); /** Committed local transactions. */ - private final GridBoundedConcurrentOrderedMap<GridCacheVersion, Boolean> completedVers = + private final GridBoundedConcurrentOrderedMap<GridCacheVersion, Boolean> completedVersSorted = new GridBoundedConcurrentOrderedMap<>( Integer.getInteger(IGNITE_MAX_COMPLETED_TX_COUNT, DFLT_MAX_COMPLETED_TX_CNT)); + /** Committed local transactions. */ + private final GridBoundedConcurrentLinkedHashMap<GridCacheVersion, Boolean> completedVersHashMap = + new GridBoundedConcurrentLinkedHashMap<>( + Integer.getInteger(IGNITE_MAX_COMPLETED_TX_COUNT, DFLT_MAX_COMPLETED_TX_CNT), + Integer.getInteger(IGNITE_MAX_COMPLETED_TX_COUNT, DFLT_MAX_COMPLETED_TX_CNT), + 0.75f, + Math.max(Runtime.getRuntime().availableProcessors(), 64), + ConcurrentLinkedHashMap.QueuePolicy.PER_SEGMENT_Q); + /** Transaction finish synchronizer. */ private GridCacheTxFinishSync txFinishSync; @@ -343,7 +356,8 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { X.println(">>> prepareQueueSize: " + prepareQ.size()); X.println(">>> startVerCntsSize [size=" + startVerCnts.size() + ", firstVer=" + startVerEntry + ']'); - X.println(">>> completedVersSize: " + completedVers.size()); + X.println(">>> completedVersSortedSize: " + completedVersSorted.size()); + X.println(">>> completedVersHashMapSize: " + completedVersHashMap.sizex()); } /** @@ -385,7 +399,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { * @return Committed versions size. */ public int completedVersionsSize() { - return completedVers.size(); + return completedVersHashMap.size(); } /** @@ -395,7 +409,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { * {@code false} otherwise. */ public boolean isCompleted(IgniteInternalTx tx) { - return completedVers.containsKey(tx.xidVersion()); + return completedVersHashMap.containsKey(tx.xidVersion()); } /** @@ -1015,65 +1029,59 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { } /** - * @param map Collection to copy. - * @param expVal Values to copy. - * @return Copy of the collection. + * @param min Minimum version. + * @return Pair [committed, rolledback] - never {@code null}, elements potentially empty, + * but also never {@code null}. */ - private Collection<GridCacheVersion> copyOf(Map<GridCacheVersion, Boolean> map, boolean expVal) { - Collection<GridCacheVersion> l = new ArrayList<>(); + public IgnitePair<Collection<GridCacheVersion>> versions(GridCacheVersion min) { + Collection<GridCacheVersion> committed = null; + Collection<GridCacheVersion> rolledback = null; - for (Map.Entry<GridCacheVersion, Boolean> e : map.entrySet()) { - if (e.getValue() == expVal) - l.add(e.getKey()); - } + for (Map.Entry<GridCacheVersion, Boolean> e : completedVersSorted.tailMap(min, true).entrySet()) { + if (e.getValue()) { + if (committed == null) + committed = new ArrayList<>(); - return l; - } + committed.add(e.getKey()); + } + else { + if (rolledback == null) + rolledback = new ArrayList<>(); - /** - * Gets committed transactions starting from the given version (inclusive). // TODO: GG-4011: why inclusive? - * - * @param min Start (or minimum) version. - * @return Committed transactions starting from the given version (non-inclusive). - */ - public Collection<GridCacheVersion> committedVersions(GridCacheVersion min) { - ConcurrentNavigableMap<GridCacheVersion, Boolean> tail - = completedVers.tailMap(min, true); + rolledback.add(e.getKey()); + } + } - return F.isEmpty(tail) ? Collections.<GridCacheVersion>emptyList() : copyOf(tail, true); + return F.pair( + committed == null ? Collections.<GridCacheVersion>emptyList() : committed, + rolledback == null ? Collections.<GridCacheVersion>emptyList() : rolledback); } /** - * Gets rolledback transactions starting from the given version (inclusive). // TODO: GG-4011: why inclusive? - * - * @param min Start (or minimum) version. - * @return Committed transactions starting from the given version (non-inclusive). + * @return Collection of active transactions. */ - public Collection<GridCacheVersion> rolledbackVersions(GridCacheVersion min) { - ConcurrentNavigableMap<GridCacheVersion, Boolean> tail - = completedVers.tailMap(min, true); - - return F.isEmpty(tail) ? Collections.<GridCacheVersion>emptyList() : copyOf(tail, false); + public Collection<IgniteInternalTx> activeTransactions() { + return F.concat(false, idMap.values(), nearIdMap.values()); } /** * @param tx Tx to remove. */ public void removeCommittedTx(IgniteInternalTx tx) { - completedVers.remove(tx.xidVersion(), true); + completedVersHashMap.remove(tx.xidVersion(), true); + + if (tx.needsCompletedVersions()) + completedVersSorted.remove(tx.xidVersion(), true); } /** * @param tx Committed transaction. - * @return If transaction was not already present in committed set. */ - public boolean addCommittedTx(IgniteInternalTx tx) { - boolean res = addCommittedTx(tx.xidVersion(), tx.nearXidVersion()); + public void addCommittedTx(IgniteInternalTx tx) { + addCommittedTx(tx, tx.xidVersion(), tx.nearXidVersion()); if (!tx.local() && !tx.near() && tx.onePhaseCommit()) - addCommittedTx(tx.nearXidVersion(), null); - - return res; + addCommittedTx(tx, tx.nearXidVersion(), null); } /** @@ -1081,63 +1089,52 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { * @return If transaction was not already present in committed set. */ public boolean addRolledbackTx(IgniteInternalTx tx) { - return addRolledbackTx(tx.xidVersion()); - } - - /** - * @return Collection of active transactions. - */ - public Collection<IgniteInternalTx> activeTransactions() { - return F.concat(false, idMap.values(), nearIdMap.values()); + return addRolledbackTx(tx, tx.xidVersion()); } /** + * @param tx Tx. * @param xidVer Completed transaction version. * @param nearXidVer Optional near transaction ID. * @return If transaction was not already present in completed set. */ - public boolean addCommittedTx(GridCacheVersion xidVer, @Nullable GridCacheVersion nearXidVer) { -// if (nearXidVer != null) -// xidVer = new CommittedVersion(xidVer, nearXidVer); -// -// Boolean committed = completedVers.putIfAbsent(xidVer, true); -// -// if (committed == null || committed) { -// if (log.isDebugEnabled()) -// log.debug("Added transaction to committed version set: " + xidVer); -// -// return true; -// } -// else { -// if (log.isDebugEnabled()) -// log.debug("Transaction is already present in rolled back version set: " + xidVer); -// -// return false; -// } - return true; + public boolean addCommittedTx( + IgniteInternalTx tx, + GridCacheVersion xidVer, + @Nullable GridCacheVersion nearXidVer + ) { + if (nearXidVer != null) + xidVer = new CommittedVersion(xidVer, nearXidVer); + + Boolean committed0 = completedVersHashMap.putIfAbsent(xidVer, true); + + if (committed0 == null && (tx == null || tx.needsCompletedVersions())) { + Boolean b = completedVersSorted.putIfAbsent(xidVer, true); + + assert b == null; + } + + return committed0 == null || committed0; } /** + * @param tx Tx. * @param xidVer Completed transaction version. * @return If transaction was not already present in completed set. */ - public boolean addRolledbackTx(GridCacheVersion xidVer) { + public boolean addRolledbackTx( + IgniteInternalTx tx, + GridCacheVersion xidVer + ) { + Boolean committed0 = completedVersHashMap.putIfAbsent(xidVer, false); - return true; -// Boolean committed = completedVers.putIfAbsent(xidVer, false); -// -// if (committed == null || !committed) { -// if (log.isDebugEnabled()) -// log.debug("Added transaction to rolled back version set: " + xidVer); -// -// return true; -// } -// else { -// if (log.isDebugEnabled()) -// log.debug("Transaction is already present in committed version set: " + xidVer); -// -// return false; -// } + if (committed0 == null && (tx == null || tx.needsCompletedVersions())) { + Boolean b = completedVersSorted.putIfAbsent(xidVer, false); + + assert b == null; + } + + return committed0 == null || !committed0; } /** @@ -1151,7 +1148,9 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { assert min != null; - tx.completedVersions(min, committedVersions(min), rolledbackVersions(min)); + IgnitePair<Collection<GridCacheVersion>> versPair = versions(min); + + tx.completedVersions(min, versPair.get1(), versPair.get2()); } } @@ -1264,19 +1263,16 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { * so we don't do it here. */ -// Boolean committed = completedVers.get(tx.xidVersion()); -// -// // 1. Make sure that committed version has been recorded. -// if (!((committed != null && committed) || tx.writeSet().isEmpty() || tx.isSystemInvalidate())) { -// uncommitTx(tx); -// -// GridCacheVersion first = completedVers.isEmpty() ? null : completedVers.firstKey(); -// GridCacheVersion last = completedVers.isEmpty() ? null : completedVers.lastKey(); -// -// throw new IgniteException("Missing commit version (consider increasing " + -// IGNITE_MAX_COMPLETED_TX_COUNT + " system property) [ver=" + tx.xidVersion() + ", firstVer=" + -// first + ", lastVer=" + last + ", tx=" + tx.xid() + ']'); -// } + Boolean committed = completedVersHashMap.get(tx.xidVersion()); + + // 1. Make sure that committed version has been recorded. + if (!((committed != null && committed) || tx.writeSet().isEmpty() || tx.isSystemInvalidate())) { + uncommitTx(tx); + + throw new IgniteException("Missing commit version (consider increasing " + + IGNITE_MAX_COMPLETED_TX_COUNT + " system property) [ver=" + tx.xidVersion() + + ", tx=" + tx.getClass().getSimpleName() + ']'); + } ConcurrentMap<GridCacheVersion, IgniteInternalTx> txIdMap = transactionMap(tx); @@ -1826,12 +1822,9 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { return resFut; } - Boolean committed = null; - - for (Map.Entry<GridCacheVersion, Boolean> entry : completedVers.entrySet()) { - if (entry.getValue() == null) - continue; + boolean committed = false; + for (Map.Entry<GridCacheVersion, Boolean> entry : completedVersHashMap.entrySet()) { if (entry.getKey() instanceof CommittedVersion) { CommittedVersion comm = (CommittedVersion)entry.getKey(); @@ -1846,7 +1839,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { if (log.isDebugEnabled()) log.debug("Near transaction committed: " + committed); - resFut.onDone(committed != null && committed); + resFut.onDone(committed); return resFut; } @@ -1950,7 +1943,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { // Not all transactions were found. Need to scan committed versions to check // if transaction was already committed. - for (Map.Entry<GridCacheVersion, Boolean> e : completedVers.entrySet()) { + for (Map.Entry<GridCacheVersion, Boolean> e : completedVersHashMap.entrySet()) { if (!e.getValue()) continue; http://git-wip-us.apache.org/repos/asf/ignite/blob/a9676157/modules/core/src/main/java/org/apache/ignite/internal/util/GridBoundedConcurrentOrderedMap.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/GridBoundedConcurrentOrderedMap.java b/modules/core/src/main/java/org/apache/ignite/internal/util/GridBoundedConcurrentOrderedMap.java index b091652..c922e56 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/GridBoundedConcurrentOrderedMap.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/GridBoundedConcurrentOrderedMap.java @@ -17,15 +17,15 @@ package org.apache.ignite.internal.util; +import org.apache.ignite.internal.util.typedef.internal.A; +import org.apache.ignite.lang.IgniteBiInClosure; +import org.jetbrains.annotations.Nullable; + import java.util.Comparator; import java.util.Map; -import java.util.NoSuchElementException; import java.util.SortedMap; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.atomic.AtomicInteger; -import org.apache.ignite.internal.util.typedef.internal.A; -import org.apache.ignite.lang.IgniteBiInClosure; -import org.jetbrains.annotations.Nullable; /** * Concurrent ordered map that automatically manages its maximum size. @@ -46,7 +46,7 @@ public class GridBoundedConcurrentOrderedMap<K, V> extends ConcurrentSkipListMap private static final long serialVersionUID = 0L; /** Element count. */ - private final AtomicInteger cnt = new AtomicInteger(0); + private final AtomicInteger cnt = new AtomicInteger(); /** Maximum size. */ private int max; @@ -168,35 +168,21 @@ public class GridBoundedConcurrentOrderedMap<K, V> extends ConcurrentSkipListMap private void onPut() { cnt.incrementAndGet(); - int c; - - while ((c = cnt.get()) > max) { - // Decrement count. - if (cnt.compareAndSet(c, c - 1)) { - try { - K key = firstEntry().getKey(); + IgniteBiInClosure<K, V> lsnr = this.lsnr; - V val; + int delta = cnt.get() - max; - // Make sure that an element is removed. - while ((val = super.remove(firstEntry().getKey())) == null) { - // No-op. - } + for (int i = 0; i < delta && cnt.get() > max; i++) { + Entry<K, V> e = pollFirstEntry(); - assert val != null; + if (e == null) + return; - IgniteBiInClosure<K, V> lsnr = this.lsnr; - - // Listener notification. - if (lsnr != null) - lsnr.apply(key, val); - } - catch (NoSuchElementException ignored) { - cnt.incrementAndGet(); + cnt.decrementAndGet(); - return; - } - } + // Listener notification. + if (lsnr != null) + lsnr.apply(e.getKey(), e.getValue()); } } @@ -251,4 +237,4 @@ public class GridBoundedConcurrentOrderedMap<K, V> extends ConcurrentSkipListMap return rmvd; } -} \ No newline at end of file +}
