ignite-4768
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/baeb2036 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/baeb2036 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/baeb2036 Branch: refs/heads/ignite-4768-1 Commit: baeb2036e4f29ef1336cbfe5d8f8fcac012cbfb4 Parents: df80606 Author: sboikov <[email protected]> Authored: Mon Mar 13 15:31:52 2017 +0300 Committer: sboikov <[email protected]> Committed: Mon Mar 13 18:45:44 2017 +0300 ---------------------------------------------------------------------- .../apache/ignite/internal/IgniteKernal.java | 1 + .../processors/cache/GridCacheIoManager.java | 4 +- .../processors/cache/GridCacheMvccManager.java | 7 ++ .../distributed/GridCacheTxFinishSync.java | 6 +- .../GridCacheTxRecoveryResponse.java | 2 + .../GridDistributedTxFinishRequest.java | 8 ++ .../GridDistributedTxPrepareRequest.java | 13 ++- .../GridDistributedTxPrepareResponse.java | 1 + .../distributed/dht/GridDhtTxFinishFuture.java | 7 +- .../cache/distributed/dht/GridDhtTxLocal.java | 64 ++---------- .../distributed/dht/GridDhtTxLocalAdapter.java | 65 ++++++++++-- .../distributed/dht/GridDhtTxPrepareFuture.java | 37 +++---- ...arOptimisticSerializableTxPrepareFuture.java | 100 ++++++++++++++++--- .../GridNearPessimisticTxPrepareFuture.java | 12 +-- .../near/GridNearTxFinishFuture.java | 5 +- .../cache/distributed/near/GridNearTxLocal.java | 10 +- .../cache/transactions/IgniteTxHandler.java | 19 ++-- 17 files changed, 215 insertions(+), 146 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/baeb2036/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java index 2a6706e..0ee02fa 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java @@ -1155,6 +1155,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { String msg = NL + "Metrics for local node (to disable set 'metricsLogFrequency' to 0)" + NL + + "Futures " + ctx.cache().context().mvcc().activeFuturesCount()+ NL + " ^-- Node [id=" + id + ", name=" + name() + ", uptime=" + getUpTimeFormatted() + "]" + NL + " ^-- H/N/C [hosts=" + hosts + ", nodes=" + nodes + ", CPUs=" + cpus + "]" + NL + " ^-- CPU [cur=" + dblFmt.format(cpuLoadPct) + "%, avg=" + http://git-wip-us.apache.org/repos/asf/ignite/blob/baeb2036/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java index 2c255a5..e433825 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java @@ -367,8 +367,8 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { unmarshall(nodeId, cacheMsg); - if (!cacheMsg.partitionExchangeMessage()) - log.info("Message [from=" + nodeId + ", msg=" + cacheMsg + ']'); +// if (!cacheMsg.partitionExchangeMessage()) +// log.info("Message [from=" + nodeId + ", msg=" + cacheMsg + ']'); if (cacheMsg.classError() != null) processFailedMessage(nodeId, cacheMsg, c); http://git-wip-us.apache.org/repos/asf/ignite/blob/baeb2036/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java index 4ec13fc..7d14fa9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java @@ -312,6 +312,13 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter { } /** + * TODO IGNITE-4768. + */ + public int activeFuturesCount() { + return mvccFuts.size(); + } + + /** * @param leftNodeId Left node ID. * @param topVer Topology version. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/baeb2036/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxFinishSync.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxFinishSync.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxFinishSync.java index 1e323d0..64d3122 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxFinishSync.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxFinishSync.java @@ -137,7 +137,7 @@ public class GridCacheTxFinishSync<K, V> { /** * @param nodeId Node ID request being sent to. */ - public void onSend(UUID nodeId) { + void onSend(UUID nodeId) { TxFinishSync sync = nodeMap.get(nodeId); if (sync == null) { @@ -169,7 +169,7 @@ public class GridCacheTxFinishSync<K, V> { * @param nodeId Node ID to wait ack from. * @return {@code null} if ack has been received or future that will be completed when ack is received. */ - public IgniteInternalFuture<?> awaitAckAsync(UUID nodeId) { + IgniteInternalFuture<?> awaitAckAsync(UUID nodeId) { TxFinishSync sync = nodeMap.get(nodeId); if (sync == null) @@ -191,7 +191,7 @@ public class GridCacheTxFinishSync<K, V> { /** * @param nodeId Node ID response received from. */ - public void onReceive(UUID nodeId) { + void onReceive(UUID nodeId) { TxFinishSync sync = nodeMap.get(nodeId); if (sync != null) http://git-wip-us.apache.org/repos/asf/ignite/blob/baeb2036/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryResponse.java index c087a3d..b5bb1b3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryResponse.java @@ -25,6 +25,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxState; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxStateAware; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; +import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.plugin.extensions.communication.MessageReader; @@ -48,6 +49,7 @@ public class GridCacheTxRecoveryResponse extends GridDistributedBaseMessage impl /** Transient TX state. */ @GridDirectTransient + @GridToStringExclude private IgniteTxState txState; /** http://git-wip-us.apache.org/repos/asf/ignite/blob/baeb2036/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java index c794f96..03d16e5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java @@ -30,6 +30,7 @@ import org.apache.ignite.internal.processors.cache.transactions.IgniteTxState; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxStateAware; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.tostring.GridToStringBuilder; +import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.plugin.extensions.communication.MessageReader; import org.apache.ignite.plugin.extensions.communication.MessageWriter; @@ -110,6 +111,7 @@ public class GridDistributedTxFinishRequest extends GridDistributedBaseMessage i /** Transient TX state. */ @GridDirectTransient + @GridToStringExclude private IgniteTxState txState; /** @@ -571,7 +573,13 @@ public class GridDistributedTxFinishRequest extends GridDistributedBaseMessage i /** {@inheritDoc} */ @Override public String toString() { + StringBuilder flags = new StringBuilder(); + + if (dhtReplyNear()) + appendFlag(flags, "dht2near"); + return GridToStringBuilder.toString(GridDistributedTxFinishRequest.class, this, + "flags", flags.toString(), "super", super.toString()); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/baeb2036/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java index c013c1a..1f06696 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java @@ -151,6 +151,7 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage /** Transient TX state. */ @GridDirectTransient + @GridToStringExclude private IgniteTxState txState; /** */ @@ -699,15 +700,17 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage StringBuilder flags = new StringBuilder(); if (needReturnValue()) - flags.append("retVal"); + appendFlag(flags, "retVal"); if (isInvalidate()) - flags.append("invalidate"); + appendFlag(flags, "invalidate"); if (onePhaseCommit()) - flags.append("onePhase"); + appendFlag(flags, "onePhase"); if (last()) - flags.append("last"); + appendFlag(flags, "last"); if (system()) - flags.append("sys"); + appendFlag(flags, "sys"); + if (dhtReplyNear()) + appendFlag(flags, "dht2near"); return GridToStringBuilder.toString(GridDistributedTxPrepareRequest.class, this, "flags", flags.toString(), http://git-wip-us.apache.org/repos/asf/ignite/blob/baeb2036/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareResponse.java index 53a1391..aaa8db5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareResponse.java @@ -49,6 +49,7 @@ public class GridDistributedTxPrepareResponse extends GridDistributedBaseMessage /** Transient TX state. */ @GridDirectTransient + @GridToStringExclude private IgniteTxState txState; /** */ http://git-wip-us.apache.org/repos/asf/ignite/blob/baeb2036/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java index 72a9b73..eb5d58f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java @@ -100,6 +100,9 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur public GridDhtTxFinishFuture(GridCacheSharedContext<K, V> cctx, GridDhtTxLocalAdapter tx, boolean commit) { super(F.<IgniteInternalTx>identityReducer(tx)); + assert tx.nearFinishFutureId() != null : tx; + assert tx.nearFinishMiniId() != 0 : tx; + this.cctx = cctx; this.tx = tx; this.commit = commit; @@ -424,8 +427,8 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur GridDhtTxFinishRequest req = new GridDhtTxFinishRequest( tx.nearNodeId(), - dhtReplyNear ? tx.nearFutureId() : futId, - dhtReplyNear ? 0 : fut.futureId(), + dhtReplyNear ? tx.nearFinishFutureId() : futId, + dhtReplyNear ? tx.nearFinishMiniId() : fut.futureId(), tx.topologyVersion(), tx.xidVersion(), tx.commitVersion(), http://git-wip-us.apache.org/repos/asf/ignite/blob/baeb2036/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java index bff69bc..7ddf415 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java @@ -68,18 +68,6 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa /** */ private UUID nearNodeId; - /** Near future ID. */ - private IgniteUuid nearFutId; - - /** Near future ID. */ - private int nearMiniId; - - /** Near future ID. */ - private IgniteUuid nearFinFutId; - - /** Near future ID. */ - private int nearFinMiniId; - /** Near XID. */ private GridCacheVersion nearXidVer; @@ -164,8 +152,8 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa this.nearNodeId = nearNodeId; this.nearXidVer = nearXidVer; - this.nearFutId = nearFutId; - this.nearMiniId = nearMiniId; + this.nearPrepFutId = nearFutId; + this.nearPrepMiniId = nearMiniId; this.txNodes = txNodes; threadId = nearThreadId; @@ -219,18 +207,6 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa } /** {@inheritDoc} */ - @Override protected IgniteUuid nearFutureId() { - return nearFutId; - } - - /** - * @param nearFutId Near future ID. - */ - public void nearFutureId(IgniteUuid nearFutId) { - this.nearFutId = nearFutId; - } - - /** {@inheritDoc} */ @Override public boolean dht() { return true; } @@ -240,27 +216,6 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa return cacheCtx.isDht() && isNearEnabled(cacheCtx) && !cctx.localNodeId().equals(nearNodeId()); } - /** - * @return Near future ID. - */ - public IgniteUuid nearFinishFutureId() { - return nearFinFutId; - } - - /** - * @param nearFinFutId Near future ID. - */ - public void nearFinishFutureId(IgniteUuid nearFinFutId) { - this.nearFinFutId = nearFinFutId; - } - - /** - * @param nearFinMiniId Near future mini ID. - */ - public void nearFinishMiniId(int nearFinMiniId) { - this.nearFinMiniId = nearFinMiniId; - } - /** {@inheritDoc} */ @Override @Nullable protected IgniteInternalFuture<Boolean> addReader(long msgId, GridDhtCacheEntry cached, IgniteTxEntry entry, AffinityTopologyVersion topVer) { @@ -301,7 +256,6 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa null, Collections.<IgniteTxKey, GridCacheVersion>emptyMap(), 0, - nearMiniId, null, true); } @@ -317,7 +271,6 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa cctx, this, timeout, - nearMiniId, Collections.<IgniteTxKey, GridCacheVersion>emptyMap(), true, needReturnValue()))) { @@ -377,7 +330,6 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa * @param writes Write entries. * @param verMap Version map. * @param msgId Message ID. - * @param nearMiniId Near mini future ID. * @param txNodes Transaction nodes mapping. * @param last {@code True} if this is last prepare request. * @return Future that will be completed when locks are acquired. @@ -387,7 +339,6 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa @Nullable Collection<IgniteTxEntry> writes, Map<IgniteTxKey, GridCacheVersion> verMap, long msgId, - int nearMiniId, Map<UUID, Collection<UUID>> txNodes, boolean last ) { @@ -404,14 +355,13 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa cctx, this, timeout, - nearMiniId, verMap, last, needReturnValue()))) { GridDhtTxPrepareFuture f = prepFut; - assert f.nearMiniId() == nearMiniId : "Wrong near mini id on existing future " + - "[futMiniId=" + f.nearMiniId() + ", miniId=" + nearMiniId + ", fut=" + f + ']'; + assert f.nearMiniId() == nearPrepMiniId : "Wrong near mini id on existing future " + + "[futMiniId=" + f.nearMiniId() + ", miniId=" + nearPrepMiniId + ", fut=" + f + ']'; if (timeout == -1) f.onError(timeoutException()); @@ -420,8 +370,8 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa } } else { - assert fut.nearMiniId() == nearMiniId : "Wrong near mini id on existing future " + - "[futMiniId=" + fut.nearMiniId() + ", miniId=" + nearMiniId + ", fut=" + fut + ']'; + assert fut.nearMiniId() == nearPrepMiniId : "Wrong near mini id on existing future " + + "[futMiniId=" + fut.nearMiniId() + ", miniId=" + nearPrepMiniId + ", fut=" + fut + ']'; // Prepare was called explicitly. return chainOnePhasePrepare(fut); @@ -619,7 +569,7 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa "Invalid state [nearFinFutId=" + nearFinFutId + ", isInvalidate=" + isInvalidate() + ", commit=" + commit + ", sysInvalidate=" + isSystemInvalidate() + ", state=" + state() + ']'; - assert nearMiniId != 0; + assert nearPrepMiniId != 0; return super.finish(commit); } http://git-wip-us.apache.org/repos/asf/ignite/blob/baeb2036/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java index 67e1993..81b5208 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java @@ -97,6 +97,18 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter { /** Nodes where transactions were started on lock step. */ private Set<ClusterNode> lockTxNodes; + /** Near future ID. */ + protected IgniteUuid nearPrepFutId; + + /** Prepare future mini ID. */ + protected int nearPrepMiniId; + + /** Near future ID. */ + protected IgniteUuid nearFinFutId; + + /** Prepare future mini ID. */ + protected int nearFinMiniId; + /** * Empty constructor required for {@link Externalizable}. */ @@ -159,9 +171,55 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter { } /** + * @return Near future ID. + */ + final IgniteUuid nearPrepareFutureId() { + return nearPrepFutId; + } + + /** + * @param futId Near future ID. + * @param miniId Near mini future ID. + */ + public final void nearPrepareFutureId(IgniteUuid futId, int miniId) { + this.nearPrepFutId = futId; + this.nearPrepMiniId = miniId; + } + + /** + * @return Near prepare mini future ID. + */ + final int nearPrepareMiniId() { + return nearPrepMiniId; + } + + /** + * @return Near future ID. + */ + final IgniteUuid nearFinishFutureId() { + return nearFinFutId; + } + + /** + * @param futId Near future ID. + * @param miniId Near mini future ID. + */ + public final void nearFinishFutureId(IgniteUuid futId, int miniId) { + nearFinFutId = futId; + nearFinMiniId = miniId; + } + + /** + * @return Near future mini ID. + */ + public final int nearFinishMiniId() { + return nearFinMiniId; + } + + /** * @param node Node. */ - public void addLockTransactionNode(ClusterNode node) { + void addLockTransactionNode(ClusterNode node) { assert node != null; assert !node.isLocal(); @@ -216,11 +274,6 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter { protected abstract UUID nearNodeId(); /** - * @return Near future ID. - */ - protected abstract IgniteUuid nearFutureId(); - - /** * Adds reader to cached entry. * * @param msgId Message ID. http://git-wip-us.apache.org/repos/asf/ignite/blob/baeb2036/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java index c8e06af..7c69021 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java @@ -175,9 +175,6 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter /** Trackable flag. */ private boolean trackable = true; - /** Near mini future id. */ - private int nearMiniId; - /** DHT versions map. */ private Map<IgniteTxKey, GridCacheVersion> dhtVerMap; @@ -213,7 +210,6 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter * @param cctx Context. * @param tx Transaction. * @param timeout Timeout. - * @param nearMiniId Near mini future id. * @param dhtVerMap DHT versions map. * @param last {@code True} if this is last prepare operation for node. * @param retVal Return value flag. @@ -222,13 +218,15 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter GridCacheSharedContext cctx, final GridDhtTxLocalAdapter tx, long timeout, - int nearMiniId, Map<IgniteTxKey, GridCacheVersion> dhtVerMap, boolean last, boolean retVal ) { super(REDUCER); + assert tx.nearPrepareFutureId() != null; + assert tx.nearPrepareMiniId() != 0; + this.cctx = cctx; this.tx = tx; this.dhtVerMap = dhtVerMap; @@ -236,8 +234,6 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter futId = IgniteUuid.randomUuid(); - this.nearMiniId = nearMiniId; - if (log == null) { msgLog = cctx.txPrepareMessageLogger(); log = U.logger(cctx.kernalContext(), logRef, GridDhtTxPrepareFuture.class); @@ -263,7 +259,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter * @return Near mini future id. */ int nearMiniId() { - return nearMiniId; + return tx.nearPrepareMiniId(); } /** {@inheritDoc} */ @@ -860,8 +856,8 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter GridNearTxPrepareResponse res = new GridNearTxPrepareResponse( -1, tx.nearXidVersion(), - tx.colocated() ? tx.xid() : tx.nearFutureId(), - nearMiniId, + tx.nearPrepareFutureId(), + nearMiniId(), tx.xidVersion(), tx.writeVersion(), ret, @@ -1223,17 +1219,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter final boolean dhtReplyNear = tx.dhtReplyNear(); - Collection<UUID> backupNodes; - IgniteUuid nearFutId; - - if (dhtReplyNear) { - backupNodes = tx.transactionNodes().get(cctx.localNodeId()); - nearFutId = tx.colocated() ? tx.xid() : tx.nearFutureId(); - } - else { - backupNodes = null; - nearFutId = null; - } + Collection<UUID> backupNodes = dhtReplyNear ? tx.transactionNodes().get(cctx.localNodeId()) : null; // Assign keys to primary nodes. if (!F.isEmpty(writes)) { @@ -1285,8 +1271,8 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter assert txNodes != null; GridDhtTxPrepareRequest req = new GridDhtTxPrepareRequest( - dhtReplyNear ? nearFutId : futId, - dhtReplyNear ? nearMiniId : fut.futureId(), + dhtReplyNear ? tx.nearPrepareFutureId() : futId, + dhtReplyNear ? tx.nearPrepareMiniId() : fut.futureId(), tx.topologyVersion(), tx, timeout, @@ -1403,8 +1389,8 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter add(fut); // Append new future. GridDhtTxPrepareRequest req = new GridDhtTxPrepareRequest( - dhtReplyNear ? nearFutId : futId, - dhtReplyNear ? nearMiniId : fut.futureId(), + dhtReplyNear ? tx.nearPrepareFutureId() : futId, + dhtReplyNear ? tx.nearPrepareMiniId() : fut.futureId(), tx.topologyVersion(), tx, timeout, @@ -1503,6 +1489,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter /** * @param entry Transaction entry. + * @param backupNodes Node IDs collection if tx was mapped on near node. */ private void map(IgniteTxEntry entry, @Nullable Collection<UUID> backupNodes) { if (entry.cached().isLocal()) http://git-wip-us.apache.org/repos/asf/ignite/blob/baeb2036/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java index cd0e7fd..6b2daf8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java @@ -19,9 +19,11 @@ package org.apache.ignite.internal.processors.cache.distributed.near; import java.util.Collection; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.UUID; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import org.apache.ignite.IgniteCheckedException; @@ -36,6 +38,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedExceptio import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxMapping; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxMapping; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareResponse; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; @@ -200,13 +203,18 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim MiniFuture mini = miniFuture(res.miniId()); if (mini != null) - mini.onResult(res); + mini.onPrimaryResponse(res); } } /** {@inheritDoc} */ @Override public void onDhtResponse(UUID nodeId, GridDhtTxPrepareResponse res) { - assert false; // TODO IGNITE-4768. + assert res.nearNodeResponse() : res; + + MiniFuture mini = miniFuture(res.miniId()); + + if (mini != null) + mini.onDhtResponse(nodeId, res); } /** {@inheritDoc} */ @@ -347,11 +355,13 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim Map<IgniteBiTuple<ClusterNode, Boolean>, GridDistributedTxMapping> mappings = new HashMap<>(); + boolean dhtReplyNear = true; + for (IgniteTxEntry write : writes) - map(write, topVer, mappings, txMapping, remap, topLocked); + dhtReplyNear = map(write, topVer, mappings, txMapping, remap, topLocked, dhtReplyNear); for (IgniteTxEntry read : reads) - map(read, topVer, mappings, txMapping, remap, topLocked); + dhtReplyNear = map(read, topVer, mappings, txMapping, remap, topLocked, dhtReplyNear); if (keyLockFut != null) keyLockFut.onAllKeysAdded(); @@ -371,10 +381,25 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim checkOnePhase(txMapping); + if (tx.onePhaseCommit()) + dhtReplyNear = false; + + tx.dhtReplyNear(dhtReplyNear); + for (GridDistributedTxMapping m : mappings.values()) { assert !m.empty(); - add(new MiniFuture(this, m, ++miniId)); + Set<UUID> dhtNodes; + + if (dhtReplyNear) { + dhtNodes = new HashSet<>(txMapping.transactionNodes().get(m.primary().id())); + + assert !dhtNodes.isEmpty(); + } + else + dhtNodes = null; + + add(new MiniFuture(this, m, ++miniId, dhtNodes)); } Collection<IgniteInternalFuture<?>> futs = (Collection)futures(); @@ -389,7 +414,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim MiniFuture fut = (MiniFuture)fut0; - IgniteCheckedException err = prepare(fut, txMapping); + IgniteCheckedException err = prepare(fut, txMapping, dhtReplyNear); if (err != null) { while (it.hasNext()) { @@ -425,7 +450,9 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim * @param fut Mini future. * @return Prepare error if any. */ - @Nullable private IgniteCheckedException prepare(final MiniFuture fut, GridDhtTxMapping txMapping) { + @Nullable private IgniteCheckedException prepare(final MiniFuture fut, + GridDhtTxMapping txMapping, + boolean dhtReplyNear) { GridDistributedTxMapping m = fut.mapping(); final ClusterNode primary = m.primary(); @@ -449,7 +476,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim m.writes(), m.near(), txMapping.transactionNodes(), - false, + dhtReplyNear, m.last(), tx.onePhaseCommit(), tx.needReturnValue() && tx.implicit(), @@ -490,7 +517,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim prepFut.listen(new CI1<IgniteInternalFuture<GridNearTxPrepareResponse>>() { @Override public void apply(IgniteInternalFuture<GridNearTxPrepareResponse> prepFut) { try { - fut.onResult(prepFut.get()); + fut.onPrimaryResponse(prepFut.get()); } catch (IgniteCheckedException e) { fut.onResult(e); @@ -526,19 +553,30 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim * @param remap Remap flag. * @param topLocked Topology locked flag. */ - private void map( + private boolean map( IgniteTxEntry entry, AffinityTopologyVersion topVer, Map<IgniteBiTuple<ClusterNode, Boolean>, GridDistributedTxMapping> curMapping, GridDhtTxMapping txMapping, boolean remap, - boolean topLocked + boolean topLocked, + boolean dhtReplyNear ) { GridCacheContext cacheCtx = entry.context(); - List<ClusterNode> nodes = cacheCtx.isLocal() ? - cacheCtx.affinity().nodesByKey(entry.key(), topVer) : - cacheCtx.topology().nodes(cacheCtx.affinity().partition(entry.key()), topVer); + List<ClusterNode> nodes; + + if (!cacheCtx.isLocal()) { + GridDhtPartitionTopology top = cacheCtx.topology(); + + nodes = top.nodes(cacheCtx.affinity().partition(entry.key()), topVer); + + if (dhtReplyNear && + (!top.rebalanceFinished(topVer) || cctx.discovery().hasNearCache(cacheCtx.cacheId(), topVer) || nodes.size() == 1)) + dhtReplyNear = false; + } + else + nodes = cacheCtx.topology().nodes(cacheCtx.affinity().partition(entry.key()), topVer); txMapping.addMapping(nodes); @@ -620,6 +658,8 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim } } } + + return dhtReplyNear; } /** {@inheritDoc} */ @@ -711,15 +751,24 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim @SuppressWarnings("UnusedDeclaration") private volatile int rcvRes; + /** */ + private final Set<UUID> dhtNodes; + /** * @param parent Parent future. * @param m Mapping. * @param futId Mini future ID. */ - MiniFuture(GridNearOptimisticSerializableTxPrepareFuture parent, GridDistributedTxMapping m, int futId) { + MiniFuture(GridNearOptimisticSerializableTxPrepareFuture parent, + GridDistributedTxMapping m, + int futId, + Set<UUID> dhtNodes) { + assert dhtNodes == null || !dhtNodes.isEmpty(); + this.parent = parent; this.m = m; this.futId = futId; + this.dhtNodes = dhtNodes; } /** @@ -779,10 +828,27 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim } /** + * @param nodeId Node ID. + * @param res Response. + */ + void onDhtResponse(UUID nodeId, GridDhtTxPrepareResponse res) { + assert dhtNodes != null; + + boolean done; + + synchronized (dhtNodes) { + done = dhtNodes.remove(nodeId) && dhtNodes.isEmpty(); + } + + if (done) + onDone(); + } + + /** * @param res Result callback. */ @SuppressWarnings({"unchecked", "ThrowableResultOfMethodCallIgnored"}) - void onResult(final GridNearTxPrepareResponse res) { + void onPrimaryResponse(final GridNearTxPrepareResponse res) { if (isDone()) return; @@ -885,6 +951,8 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim onDone(res); } else { + assert dhtNodes == null; + parent.processPrimaryPrepareResponse(m, res); // Finish this mini future (need result only on client node). http://git-wip-us.apache.org/repos/asf/ignite/blob/baeb2036/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java index 1d3eaec..857f237 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java @@ -104,7 +104,7 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA if (f != null) { assert f.primary().id().equals(nodeId); - f.onPrimaryResult(res); + f.onPrimaryResponse(res); } else { if (msgLog.isDebugEnabled()) { @@ -223,12 +223,10 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA if (!cacheCtx.isLocal()) { GridDhtPartitionTopology top = cacheCtx.topology(); - if (dhtReplyNear && (!top.rebalanceFinished(topVer) || cctx.discovery().hasNearCache(cacheCtx.cacheId(), topVer))) - dhtReplyNear = false; - nodes = top.nodes(cacheCtx.affinity().partition(txEntry.key()), topVer); - if (nodes.size() == 1) + if (dhtReplyNear && + (!top.rebalanceFinished(topVer) || cctx.discovery().hasNearCache(cacheCtx.cacheId(), topVer) || nodes.size() == 1)) dhtReplyNear = false; } else @@ -319,7 +317,7 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA prepFut.listen(new CI1<IgniteInternalFuture<GridNearTxPrepareResponse>>() { @Override public void apply(IgniteInternalFuture<GridNearTxPrepareResponse> prepFut) { try { - fut.onPrimaryResult(prepFut.get()); + fut.onPrimaryResponse(prepFut.get()); } catch (IgniteCheckedException e) { fut.onError(e); @@ -466,7 +464,7 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA /** * @param res Response. */ - void onPrimaryResult(GridNearTxPrepareResponse res) { + void onPrimaryResponse(GridNearTxPrepareResponse res) { if (res.error() != null) onError(res.error()); else { http://git-wip-us.apache.org/repos/asf/ignite/blob/baeb2036/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java index 44455ca..56b7284 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java @@ -1029,8 +1029,11 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu done = dhtNodes.remove(nodeId) && dhtNodes.isEmpty(); } - if (done) + if (done) { + cctx.tm().onFinishedRemote(primary().id(), tx.threadId()); + onDone(tx); + } } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/baeb2036/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java index 8ed749c..80b93e2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java @@ -214,13 +214,6 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { } /** {@inheritDoc} */ - @Override protected IgniteUuid nearFutureId() { - assert false : "nearFutureId should not be called for colocated transactions."; - - return null; - } - - /** {@inheritDoc} */ @Override protected IgniteInternalFuture<Boolean> addReader( long msgId, GridDhtCacheEntry cached, @@ -1002,7 +995,6 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { cctx, this, timeout, - 0, Collections.<IgniteTxKey, GridCacheVersion>emptyMap(), last, needReturnValue() && implicit()); @@ -1065,7 +1057,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { // Do not create finish future if there are no remote nodes. if (F.isEmpty(dhtMap) && F.isEmpty(nearMap)) { if (prep != null) - return (IgniteInternalFuture<IgniteInternalTx>)(IgniteInternalFuture)prep; + return (IgniteInternalFuture<IgniteInternalTx>)prep; return new GridFinishedFuture<IgniteInternalTx>(this); } http://git-wip-us.apache.org/repos/asf/ignite/blob/baeb2036/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java index 71b847a..aad0e34 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java @@ -356,10 +356,6 @@ public class IgniteTxHandler { if (tx == null) U.warn(log, "Missing local transaction for mapped near version [nearVer=" + req.version() + ", mappedVer=" + mappedVer + ']'); - else { - if (req.concurrency() == PESSIMISTIC) - tx.nearFutureId(req.futureId()); - } } else { GridDhtPartitionTopology top = null; @@ -470,6 +466,7 @@ public class IgniteTxHandler { tx.explicitLock(true); tx.transactionNodes(req.transactionNodes()); + tx.nearPrepareFutureId(req.futureId(), req.miniId()); tx.dhtReplyNear(req.dhtReplyNear()); @@ -490,7 +487,6 @@ public class IgniteTxHandler { req.writes(), req.dhtVersions(), req.messageId(), - req.miniId(), req.transactionNodes(), req.last()); @@ -771,8 +767,10 @@ public class IgniteTxHandler { * @param req Finish request. * @return Finish future. */ - private IgniteInternalFuture<IgniteInternalTx> finishDhtLocal(UUID nodeId, @Nullable GridNearTxLocal locTx, - GridNearTxFinishRequest req) { + private IgniteInternalFuture<IgniteInternalTx> finishDhtLocal(UUID nodeId, + @Nullable GridNearTxLocal locTx, + GridNearTxFinishRequest req) + { GridCacheVersion dhtVer = ctx.tm().mappedVersion(req.version()); GridDhtTxLocal tx = null; @@ -853,6 +851,7 @@ public class IgniteTxHandler { assert req.syncMode() != null : req; tx.syncMode(req.syncMode()); + tx.nearFinishFutureId(req.futureId(), req.miniId()); if (req.commit()) { tx.storeEnabled(req.storeEnabled()); @@ -864,9 +863,6 @@ public class IgniteTxHandler { return null; } - tx.nearFinishFutureId(req.futureId()); - tx.nearFinishMiniId(req.miniId()); - IgniteInternalFuture<IgniteInternalTx> commitFut = tx.commitAsync(); // Only for error logging. @@ -875,9 +871,6 @@ public class IgniteTxHandler { return commitFut; } else { - tx.nearFinishFutureId(req.futureId()); - tx.nearFinishMiniId(req.miniId()); - IgniteInternalFuture<IgniteInternalTx> rollbackFut = tx.rollbackAsync(); // Only for error logging.
