ignite-4768
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/8d10806f Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/8d10806f Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/8d10806f Branch: refs/heads/ignite-4768-1 Commit: 8d10806f40c24cd77ba72430811ff1464be7a01e Parents: de2697d Author: sboikov <[email protected]> Authored: Mon Mar 13 14:50:23 2017 +0300 Committer: sboikov <[email protected]> Committed: Mon Mar 13 14:50:23 2017 +0300 ---------------------------------------------------------------------- .../ignite/codegen/MessageCodeGenerator.java | 3 +- .../processors/cache/GridCacheIoManager.java | 4 +- .../GridDistributedTxFinishRequest.java | 13 ++ .../distributed/dht/GridDhtTxFinishFuture.java | 83 +++++---- .../distributed/dht/GridDhtTxFinishRequest.java | 4 + .../distributed/dht/GridDhtTxPrepareFuture.java | 14 +- .../dht/GridDhtTxPrepareRequest.java | 59 +------ .../near/GridNearTxFinishFuture.java | 12 +- .../near/GridNearTxFinishRequest.java | 2 + .../cache/transactions/IgniteTxHandler.java | 174 +++++++++++-------- 10 files changed, 193 insertions(+), 175 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/8d10806f/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java ---------------------------------------------------------------------- diff --git a/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java b/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java index 5a26187..85e2cbe 100644 --- a/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java +++ b/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java @@ -44,6 +44,7 @@ import org.apache.ignite.internal.GridDirectCollection; import org.apache.ignite.internal.GridDirectMap; import org.apache.ignite.internal.GridDirectTransient; import org.apache.ignite.internal.IgniteCodeGeneratingFail; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareRequest; import org.apache.ignite.internal.util.typedef.internal.SB; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteUuid; @@ -170,7 +171,7 @@ public class MessageCodeGenerator { // gen.generateAndWrite(GridNearAtomicUpdateRequest.class); -// gen.generateAndWrite(GridMessageCollection.class); + gen.generateAndWrite(GridDhtTxPrepareRequest.class); // gen.generateAndWrite(DataStreamerEntry.class); // gen.generateAndWrite(GridDistributedLockRequest.class); http://git-wip-us.apache.org/repos/asf/ignite/blob/8d10806f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java index 71f4e1c..e91bc9a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java @@ -367,8 +367,8 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { unmarshall(nodeId, cacheMsg); -// if (!cacheMsg.partitionExchangeMessage()) -// log.info("Message [from=" + nodeId + ", msg=" + cacheMsg + ']'); + if (!cacheMsg.partitionExchangeMessage()) + log.info("Message [from=" + nodeId + ", msg=" + cacheMsg + ']'); if (cacheMsg.classError() != null) processFailedMessage(nodeId, cacheMsg, c); http://git-wip-us.apache.org/repos/asf/ignite/blob/8d10806f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java index ab9f0ff..c794f96 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java @@ -63,6 +63,9 @@ public class GridDistributedTxFinishRequest extends GridDistributedBaseMessage i /** */ protected static final int STORE_ENABLED_FLAG_MASK = 0x20; + /** */ + private static final int DHT_REPLY_NEAR_FLAG_MASK = 0x40; + /** Topology version. */ private AffinityTopologyVersion topVer; @@ -138,6 +141,7 @@ public class GridDistributedTxFinishRequest extends GridDistributedBaseMessage i @NotNull AffinityTopologyVersion topVer, @Nullable GridCacheVersion commitVer, long threadId, + boolean dhtReplyNear, boolean commit, boolean invalidate, boolean sys, @@ -171,6 +175,15 @@ public class GridDistributedTxFinishRequest extends GridDistributedBaseMessage i this.txSize = txSize; completedVersions(committedVers, rolledbackVers); + + setFlag(dhtReplyNear, DHT_REPLY_NEAR_FLAG_MASK); + } + + /** + * @return {@code True} if transaction works in mode when DHT nodes reply directly to near node. + */ + public final boolean dhtReplyNear() { + return isFlag(DHT_REPLY_NEAR_FLAG_MASK); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/8d10806f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java index 17e9047..72a9b73 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java @@ -236,7 +236,7 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur if (finishErr == null) finishErr = this.tx.commitError(); - if (this.tx.syncMode() != PRIMARY_SYNC) + if (this.tx.syncMode() != PRIMARY_SYNC && !this.tx.dhtReplyNear()) this.tx.sendFinishReply(finishErr); // Don't forget to clean up. @@ -322,7 +322,8 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur tx.commitVersion(), tx.threadId(), tx.isolation(), - false, + /*dhtReplyNear*/false, // TODO IGNITE-4768. + /*commit*/false, tx.isInvalidate(), tx.system(), tx.ioPolicy(), @@ -387,6 +388,8 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur boolean sync = tx.syncMode() == FULL_SYNC; + boolean dhtReplyNear = tx.dhtReplyNear() && tx.syncMode() == FULL_SYNC; + if (tx.explicitLock()) sync = true; @@ -406,9 +409,13 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur // Nothing to send. continue; - MiniFuture fut = new MiniFuture(++miniId, dhtMapping, nearMapping); + MiniFuture fut = null; - add(fut); // Append new future. + if (!dhtReplyNear) { + fut = new MiniFuture(++miniId, dhtMapping, nearMapping); + + add(fut); // Append new future. + } Collection<Long> updCntrs = new ArrayList<>(dhtMapping.entries().size()); @@ -417,13 +424,14 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur GridDhtTxFinishRequest req = new GridDhtTxFinishRequest( tx.nearNodeId(), - futId, - fut.futureId(), + dhtReplyNear ? tx.nearFutureId() : futId, + dhtReplyNear ? 0 : fut.futureId(), tx.topologyVersion(), tx.xidVersion(), tx.commitVersion(), tx.threadId(), tx.isolation(), + dhtReplyNear, commit, tx.isInvalidate(), tx.system(), @@ -455,22 +463,24 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur if (sync) res = true; - else + else if (fut != null) fut.onDone(); } catch (IgniteCheckedException e) { - // Fail the whole thing. - if (e instanceof ClusterTopologyCheckedException) - fut.onNodeLeft((ClusterTopologyCheckedException)e); - else { - if (msgLog.isDebugEnabled()) { - msgLog.debug("DHT finish fut, failed to send request dht [txId=" + tx.nearXidVersion() + - ", dhtTxId=" + tx.xidVersion() + - ", node=" + n.id() + - ", err=" + e + ']'); - } + if (fut != null) { + if (e instanceof ClusterTopologyCheckedException) + fut.onNodeLeft((ClusterTopologyCheckedException)e); + else { + if (msgLog.isDebugEnabled()) { + msgLog.debug("DHT finish fut, failed to send request dht [txId=" + tx.nearXidVersion() + + ", dhtTxId=" + tx.xidVersion() + + ", node=" + n.id() + + ", err=" + e + ']'); + } - fut.onResult(e); + // TODO IGNITE-4768: reply on near with error? + fut.onResult(e); + } } } } @@ -481,19 +491,24 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur // Nothing to send. continue; - MiniFuture fut = new MiniFuture(++miniId, null, nearMapping); + MiniFuture fut = null; - add(fut); // Append new future. + if (!dhtReplyNear) { + fut = new MiniFuture(++miniId, null, nearMapping); + + add(fut); // Append new future. + } GridDhtTxFinishRequest req = new GridDhtTxFinishRequest( tx.nearNodeId(), futId, - fut.futureId(), + fut != null ? fut.futureId() : -1, tx.topologyVersion(), tx.xidVersion(), tx.commitVersion(), tx.threadId(), tx.isolation(), + dhtReplyNear, commit, tx.isInvalidate(), tx.system(), @@ -524,22 +539,24 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur if (sync) res = true; - else + else if (fut != null) fut.onDone(); } catch (IgniteCheckedException e) { - // Fail the whole thing. - if (e instanceof ClusterTopologyCheckedException) - fut.onNodeLeft((ClusterTopologyCheckedException)e); - else { - if (msgLog.isDebugEnabled()) { - msgLog.debug("DHT finish fut, failed to send request near [txId=" + tx.nearXidVersion() + - ", dhtTxId=" + tx.xidVersion() + - ", node=" + nearMapping.primary().id() + - ", err=" + e + ']'); + if (fut != null) { + if (e instanceof ClusterTopologyCheckedException) + fut.onNodeLeft((ClusterTopologyCheckedException)e); + else { + if (msgLog.isDebugEnabled()) { + msgLog.debug("DHT finish fut, failed to send request near [txId=" + tx.nearXidVersion() + + ", dhtTxId=" + tx.xidVersion() + + ", node=" + nearMapping.primary().id() + + ", err=" + e + ']'); + } + + // TODO IGNITE-4768: reply on near with error? + fut.onResult(e); } - - fut.onResult(e); } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/8d10806f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java index d9b3ae7..40f96c6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java @@ -106,6 +106,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest { GridCacheVersion commitVer, long threadId, TransactionIsolation isolation, + boolean dhtReplyNear, boolean commit, boolean invalidate, boolean sys, @@ -129,6 +130,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest { topVer, commitVer, threadId, + dhtReplyNear, commit, invalidate, sys, @@ -190,6 +192,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest { GridCacheVersion commitVer, long threadId, TransactionIsolation isolation, + boolean dhtReplyNear, boolean commit, boolean invalidate, boolean sys, @@ -216,6 +219,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest { commitVer, threadId, isolation, + dhtReplyNear, commit, invalidate, sys, http://git-wip-us.apache.org/repos/asf/ignite/blob/8d10806f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java index ca028f8..c8e06af 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java @@ -1276,7 +1276,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter MiniFuture fut = null; - if (!tx.dhtReplyNear()) { + if (!dhtReplyNear) { fut = new MiniFuture(n.id(), ++miniId, dhtMapping, nearMapping); add(fut); // Append new future. @@ -1285,10 +1285,8 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter assert txNodes != null; GridDhtTxPrepareRequest req = new GridDhtTxPrepareRequest( - futId, - fut != null ? fut.futureId() : 0, - nearFutId, - nearMiniId, + dhtReplyNear ? nearFutId : futId, + dhtReplyNear ? nearMiniId : fut.futureId(), tx.topologyVersion(), tx, timeout, @@ -1405,10 +1403,8 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter add(fut); // Append new future. GridDhtTxPrepareRequest req = new GridDhtTxPrepareRequest( - futId, - fut.futureId(), - nearFutId, - nearMiniId, + dhtReplyNear ? nearFutId : futId, + dhtReplyNear ? nearMiniId : fut.futureId(), tx.topologyVersion(), tx, timeout, http://git-wip-us.apache.org/repos/asf/ignite/blob/8d10806f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java index 85a65a8..04a296d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java @@ -61,12 +61,6 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest { /** Mini future ID. */ private int miniId; - /** Future ID. */ - private IgniteUuid nearFutId; - - /** Mini future ID. */ - private int nearMiniId; - /** Topology version. */ private AffinityTopologyVersion topVer; @@ -113,8 +107,6 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest { /** * @param futId Future ID. * @param miniId Mini future ID. - * @param nearFutId Near node future ID. - * @param nearMiniId Near node mini future ID. * @param topVer Topology version. * @param tx Transaction. * @param timeout Transaction timeout. @@ -129,8 +121,6 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest { public GridDhtTxPrepareRequest( IgniteUuid futId, int miniId, - IgniteUuid nearFutId, - int nearMiniId, AffinityTopologyVersion topVer, GridDhtTxLocalAdapter tx, long timeout, @@ -156,8 +146,8 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest { onePhaseCommit, addDepInfo); - assert dhtNearReply || (futId != null && miniId != 0); - assert !dhtNearReply || (nearFutId != null && nearMiniId != 0); + assert futId != null; + assert miniId != 0; this.topVer = topVer; this.futId = futId; @@ -166,8 +156,6 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest { this.nearXidVer = nearXidVer; this.subjId = subjId; this.taskNameHash = taskNameHash; - this.nearFutId = nearFutId; - this.nearMiniId = nearMiniId; needReturnValue(retVal); @@ -264,21 +252,6 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest { } /** - * @return Near future ID. - */ - public IgniteUuid nearFutureId() { - return nearFutId; - } - - /** - * @return Near mini future ID. - */ - public int nearMiniId() { - return nearMiniId; - } - - - /** * @return Topology version. */ @Override public AffinityTopologyVersion topologyVersion() { @@ -395,18 +368,6 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest { writer.incrementState(); - case 23: - if (!writer.writeIgniteUuid("nearFutId", nearFutId)) - return false; - - writer.incrementState(); - - case 24: - if (!writer.writeInt("nearMiniId", nearMiniId)) - return false; - - writer.incrementState(); - case 25: if (!writer.writeUuid("nearNodeId", nearNodeId)) return false; @@ -501,22 +462,6 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest { reader.incrementState(); - case 23: - nearFutId = reader.readIgniteUuid("nearFutId"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 24: - nearMiniId = reader.readInt("nearMiniId"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - case 25: nearNodeId = reader.readUuid("nearNodeId"); http://git-wip-us.apache.org/repos/asf/ignite/blob/8d10806f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java index c55d515..44455ca 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java @@ -716,10 +716,13 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu // Version to be added in completed versions on primary node. GridCacheVersion completedVer = !commit && tx.timeout() > 0 ? tx.xidVersion() : null; + boolean dhtReplyNear = tx.dhtReplyNear() && syncMode == FULL_SYNC; + GridNearTxFinishRequest req = new GridNearTxFinishRequest( futId, tx.xidVersion(), tx.threadId(), + dhtReplyNear, commit, tx.isInvalidate(), tx.system(), @@ -737,8 +740,6 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu tx.activeCachesDeploymentEnabled() ); - boolean dhtReplyNear = tx.dhtReplyNear() && syncMode == FULL_SYNC; - // If this is the primary node for the keys. if (n.isLocal()) { req.miniId(miniId); @@ -855,11 +856,12 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu tx.commitVersion(), tx.threadId(), tx.isolation(), - true, - false, + /*dhtReplyNear*/false, + /*commit*/true, + /*invalidate*/false, tx.system(), tx.ioPolicy(), - false, + /*sysInvalidate*/false, tx.syncMode(), null, null, http://git-wip-us.apache.org/repos/asf/ignite/blob/8d10806f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java index 05c1f3e..58e75f2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java @@ -73,6 +73,7 @@ public class GridNearTxFinishRequest extends GridDistributedTxFinishRequest { IgniteUuid futId, GridCacheVersion xidVer, long threadId, + boolean dhtReplyNear, boolean commit, boolean invalidate, boolean sys, @@ -94,6 +95,7 @@ public class GridNearTxFinishRequest extends GridDistributedTxFinishRequest { topVer, null, threadId, + dhtReplyNear, commit, invalidate, sys, http://git-wip-us.apache.org/repos/asf/ignite/blob/8d10806f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java index 754979c..71b847a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java @@ -39,6 +39,8 @@ import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.distributed.GridCacheTxRecoveryFuture; import org.apache.ignite.internal.processors.cache.distributed.GridCacheTxRecoveryRequest; import org.apache.ignite.internal.processors.cache.distributed.GridCacheTxRecoveryResponse; +import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxFinishRequest; +import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxFinishResponse; import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxRemoteAdapter; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology; @@ -967,8 +969,8 @@ public class IgniteTxHandler { nearRes = new GridDhtTxPrepareResponse( req.partition(), req.nearXidVersion(), - req.nearFutureId(), - req.nearMiniId(), + req.futureId(), + req.miniId(), req.deployInfo() != null); nearRes.nearNodeResponse(true); @@ -1036,6 +1038,7 @@ public class IgniteTxHandler { if (nearTx != null) nearTx.rollback(); + // TODO IGNITE-4768. res = new GridDhtTxPrepareResponse( req.partition(), req.version(), @@ -1116,19 +1119,19 @@ public class IgniteTxHandler { @SuppressWarnings({"unchecked"}) private void processDhtTxFinishRequest(final UUID nodeId, final GridDhtTxFinishRequest req) { assert nodeId != null; - assert req != null; + assert req.nearNodeId() != null : req; if (req.checkCommitted()) { boolean committed = req.waitRemoteTransactions() || !ctx.tm().addRolledbackTx(null, req.version()); if (!committed || req.syncMode() != FULL_SYNC) - sendReply(nodeId, req, committed, null); + sendCheckCommittedReply(nodeId, req, committed); else { IgniteInternalFuture<?> fut = ctx.tm().remoteTxFinishFuture(req.version()); fut.listen(new CI1<IgniteInternalFuture<?>>() { @Override public void apply(IgniteInternalFuture<?> fut) { - sendReply(nodeId, req, true, null); + sendCheckCommittedReply(nodeId, req, true); } }); } @@ -1181,16 +1184,16 @@ public class IgniteTxHandler { if (completeFut != null) { completeFut.listen(new CI1<IgniteInternalFuture<IgniteInternalTx>>() { - @Override public void apply(IgniteInternalFuture<IgniteInternalTx> igniteTxIgniteFuture) { - sendReply(nodeId, req, true, nearTxId); + @Override public void apply(IgniteInternalFuture<IgniteInternalTx> fut) { + sendFinishReply(nodeId, req, nearTxId); } }); } else - sendReply(nodeId, req, true, nearTxId); + sendFinishReply(nodeId, req, nearTxId); } else - sendReply(nodeId, req, true, null); + sendFinishReply(nodeId, req, null); assert req.txState() != null || (ctx.tm().tx(req.version()) == null && ctx.tm().nearTx(req.version()) == null) : req; } @@ -1350,81 +1353,75 @@ public class IgniteTxHandler { } /** - * Sends tx finish response to remote node, if response is requested. - * * @param nodeId Node id that originated finish request. * @param req Request. * @param committed {@code True} if transaction committed on this node. - * @param nearTxId Near tx version. */ - private void sendReply(UUID nodeId, GridDhtTxFinishRequest req, boolean committed, GridCacheVersion nearTxId) { - if (req.replyRequired() || req.checkCommitted()) { - GridDhtTxFinishResponse res = new GridDhtTxFinishResponse( - req.partition(), - req.version(), - req.futureId(), - req.miniId()); + private void sendCheckCommittedReply(UUID nodeId, GridDhtTxFinishRequest req, boolean committed) { + assert req.checkCommitted() : req; - if (req.checkCommitted()) { - res.checkCommitted(true); - - if (committed) { - if (req.needReturnValue()) { - try { - GridCacheReturnCompletableWrapper wrapper = ctx.tm().getCommittedTxReturn(req.version()); - - if (wrapper != null) - res.returnValue(wrapper.fut().get()); - else - assert !ctx.discovery().alive(nodeId) : nodeId; - } - catch (IgniteCheckedException ignored) { - if (txFinishMsgLog.isDebugEnabled()) { - txFinishMsgLog.debug("Failed to gain entry processor return value. [txId=" + nearTxId + - ", dhtTxId=" + req.version() + - ", node=" + nodeId + ']'); - } - } - } - } - else { - ClusterTopologyCheckedException cause = - new ClusterTopologyCheckedException("Primary node left grid."); + GridDhtTxFinishResponse res = new GridDhtTxFinishResponse( + req.partition(), + req.version(), + req.futureId(), + req.miniId()); - res.checkCommittedError(new IgniteTxRollbackCheckedException("Failed to commit transaction " + - "(transaction has been rolled back on backup node): " + req.version(), cause)); - } - } + res.checkCommitted(true); - try { - ctx.io().send(nodeId, res, req.policy()); + if (committed) { + if (req.needReturnValue()) { + try { + GridCacheReturnCompletableWrapper wrapper = ctx.tm().getCommittedTxReturn(req.version()); - if (txFinishMsgLog.isDebugEnabled()) { - txFinishMsgLog.debug("Sent dht tx finish response [txId=" + nearTxId + - ", dhtTxId=" + req.version() + - ", node=" + nodeId + - ", checkCommitted=" + req.checkCommitted() + ']'); + if (wrapper != null) + res.returnValue(wrapper.fut().get()); + else + assert !ctx.discovery().alive(nodeId) : nodeId; } - } - catch (Throwable e) { - // Double-check. - if (ctx.discovery().node(nodeId) == null) { + catch (IgniteCheckedException ignored) { if (txFinishMsgLog.isDebugEnabled()) { - txFinishMsgLog.debug("Node left while send dht tx finish response [txId=" + nearTxId + + txFinishMsgLog.debug("Failed to get return value. [txId=null" + ", dhtTxId=" + req.version() + ", node=" + nodeId + ']'); } } - else { - U.error(log, "Failed to send finish response to node [txId=" + nearTxId + - ", dhtTxId=" + req.version() + - ", nodeId=" + nodeId + - ", res=" + res + ']', e); - } + } + } + else { + ClusterTopologyCheckedException cause = + new ClusterTopologyCheckedException("Primary node left grid."); - if (e instanceof Error) - throw (Error)e; + res.checkCommittedError(new IgniteTxRollbackCheckedException("Failed to commit transaction " + + "(transaction has been rolled back on backup node): " + req.version(), cause)); + } + + sendFinishResponse(nodeId, res, req, null); + } + + /** + * Sends tx finish response to remote node, if response is requested. + * + * @param nodeId Node id that originated finish request. + * @param req Request. + * @param nearTxId Near tx version. + */ + private void sendFinishReply(UUID nodeId, GridDhtTxFinishRequest req, GridCacheVersion nearTxId) { + assert !req.checkCommitted(); + + if (req.replyRequired()) { + GridDhtTxFinishResponse res = new GridDhtTxFinishResponse( + req.partition(), + req.version(), + req.futureId(), + req.miniId()); + + if (req.dhtReplyNear()) { + nodeId = req.nearNodeId(); + + res.nearNodeResponse(true); } + + sendFinishResponse(nodeId, res, req, nearTxId); } else { if (txFinishMsgLog.isDebugEnabled()) { @@ -1437,6 +1434,47 @@ public class IgniteTxHandler { /** * @param nodeId Node ID. + * @param res Response. + * @param req Request (for debug info logging). + * @param nearTxId (for debug info logging). + */ + private void sendFinishResponse(UUID nodeId, + GridDistributedTxFinishResponse res, + GridDhtTxFinishRequest req, + @Nullable GridCacheVersion nearTxId) { + try { + ctx.io().send(nodeId, res, req.policy()); + + if (txFinishMsgLog.isDebugEnabled()) { + txFinishMsgLog.debug("Sent dht tx finish response [txId=" + nearTxId + + ", dhtTxId=" + req.version() + + ", node=" + nodeId + + ", checkCommitted=" + req.checkCommitted() + ']'); + } + } + catch (Throwable e) { + // Double-check. + if (ctx.discovery().node(nodeId) == null) { + if (txFinishMsgLog.isDebugEnabled()) { + txFinishMsgLog.debug("Node left while send dht tx finish response [txId=" + nearTxId + + ", dhtTxId=" + req.version() + + ", node=" + nodeId + ']'); + } + } + else { + U.error(log, "Failed to send finish response to node [txId=" + nearTxId + + ", dhtTxId=" + req.version() + + ", nodeId=" + nodeId + + ", res=" + res + ']', e); + } + + if (e instanceof Error) + throw (Error)e; + } + } + + /** + * @param nodeId Node ID. * @param req Request. * @param res Response or {@code null} if should not reply to primary. * @return Remote transaction.
