http://git-wip-us.apache.org/repos/asf/ignite/blob/901be4f4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java index 90a68ad..56a7fa2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java @@ -76,13 +76,11 @@ import org.apache.ignite.lang.IgniteFutureCancelledException; import org.apache.ignite.transactions.TransactionState; import org.jetbrains.annotations.Nullable; -import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_ASYNC; import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL; import static org.apache.ignite.internal.managers.communication.GridIoPolicy.UTILITY_CACHE_POOL; import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM; import static org.apache.ignite.internal.processors.cache.GridCacheUtils.isNearEnabled; -import static org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishFuture.FINISH_NEAR_ONE_PHASE_SINCE; import static org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx.FinalizationStatus.USER_FINISH; import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC; import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC; @@ -114,7 +112,7 @@ public class IgniteTxHandler { * @param req Request. * @return Prepare future. */ - public IgniteInternalFuture<?> processNearTxPrepareRequest(final UUID nearNodeId, GridNearTxPrepareRequest req) { + private IgniteInternalFuture<?> processNearTxPrepareRequest(final UUID nearNodeId, GridNearTxPrepareRequest req) { if (txPrepareMsgLog.isDebugEnabled()) { txPrepareMsgLog.debug("Received near prepare request [txId=" + req.version() + ", node=" + nearNodeId + ']'); @@ -272,6 +270,7 @@ public class IgniteTxHandler { U.error(log, "Failed to prepare DHT transaction: " + locTx, e); return new GridNearTxPrepareResponse( + req.partition(), req.version(), req.futureId(), req.miniId(), @@ -287,6 +286,27 @@ public class IgniteTxHandler { } /** + * @param entries Entries. + * @return First entry. + * @throws IgniteCheckedException If failed. + */ + private IgniteTxEntry unmarshal(@Nullable Collection<IgniteTxEntry> entries) throws IgniteCheckedException { + if (entries == null) + return null; + + IgniteTxEntry firstEntry = null; + + for (IgniteTxEntry e : entries) { + e.unmarshal(ctx, false, ctx.deploy().globalLoader()); + + if (firstEntry == null) + firstEntry = e; + } + + return firstEntry; + } + + /** * Prepares near transaction. * * @param nearNodeId Near node ID that initiated transaction. @@ -309,15 +329,13 @@ public class IgniteTxHandler { return null; } - IgniteTxEntry firstEntry = null; + IgniteTxEntry firstEntry; try { - for (IgniteTxEntry e : F.concat(false, req.reads(), req.writes())) { - e.unmarshal(ctx, false, ctx.deploy().globalLoader()); + IgniteTxEntry firstWrite = unmarshal(req.writes()); + IgniteTxEntry firstRead = unmarshal(req.reads()); - if (firstEntry == null) - firstEntry = e; - } + firstEntry = firstWrite != null ? firstWrite : firstRead; } catch (IgniteCheckedException e) { return new GridFinishedFuture<>(e); @@ -364,6 +382,7 @@ public class IgniteTxHandler { } GridNearTxPrepareResponse res = new GridNearTxPrepareResponse( + req.partition(), req.version(), req.futureId(), req.miniId(), @@ -449,17 +468,16 @@ public class IgniteTxHandler { tx.transactionNodes(req.transactionNodes()); - // Set near on originating node flag only if the sender node has new version. - if (req.near() && FINISH_NEAR_ONE_PHASE_SINCE.compareTo(nearNode.version()) <= 0) + if (req.near()) tx.nearOnOriginatingNode(true); if (req.onePhaseCommit()) { - assert req.last(); + assert req.last() : req; tx.onePhaseCommit(true); } - if (req.returnValue()) + if (req.needReturnValue()) tx.needReturnValue(true); IgniteInternalFuture<GridNearTxPrepareResponse> fut = tx.prepareAsync( @@ -778,8 +796,13 @@ public class IgniteTxHandler { ", commit=" + req.commit() + ']'); // Always send finish response. - GridCacheMessage res = new GridNearTxFinishResponse(req.version(), req.threadId(), req.futureId(), - req.miniId(), new IgniteCheckedException("Transaction has been already completed.")); + GridCacheMessage res = new GridNearTxFinishResponse( + req.partition(), + req.version(), + req.threadId(), + req.futureId(), + req.miniId(), + new IgniteCheckedException("Transaction has been already completed.")); try { ctx.io().send(nodeId, res, req.policy()); @@ -819,14 +842,9 @@ public class IgniteTxHandler { try { assert tx != null : "Transaction is null for near finish request [nodeId=" + nodeId + ", req=" + req + "]"; + assert req.syncMode() != null : req; - if (req.syncMode() == null) { - boolean sync = req.commit() ? req.syncCommit() : req.syncRollback(); - - tx.syncMode(sync ? FULL_SYNC : FULL_ASYNC); - } - else - tx.syncMode(req.syncMode()); + tx.syncMode(req.syncMode()); if (req.commit()) { tx.storeEnabled(req.storeEnabled()); @@ -920,7 +938,7 @@ public class IgniteTxHandler { * @param nodeId Sender node ID. * @param req Request. */ - protected final void processDhtTxPrepareRequest(final UUID nodeId, final GridDhtTxPrepareRequest req) { + private void processDhtTxPrepareRequest(final UUID nodeId, final GridDhtTxPrepareRequest req) { if (txPrepareMsgLog.isDebugEnabled()) { txPrepareMsgLog.debug("Received dht prepare request [txId=" + req.nearXidVersion() + ", dhtTxId=" + req.version() + @@ -938,7 +956,12 @@ public class IgniteTxHandler { GridDhtTxPrepareResponse res; try { - res = new GridDhtTxPrepareResponse(req.version(), req.futureId(), req.miniId(), req.deployInfo() != null); + res = new GridDhtTxPrepareResponse( + req.partition(), + req.version(), + req.futureId(), + req.miniId(), + req.deployInfo() != null); // Start near transaction first. nearTx = !F.isEmpty(req.nearWrites()) ? startNearRemoteTx(ctx.deploy().globalLoader(), nodeId, req) : null; @@ -990,7 +1013,12 @@ public class IgniteTxHandler { if (nearTx != null) nearTx.rollback(); - res = new GridDhtTxPrepareResponse(req.version(), req.futureId(), req.miniId(), e, + res = new GridDhtTxPrepareResponse( + req.partition(), + req.version(), + req.futureId(), + req.miniId(), + e, req.deployInfo() != null); } @@ -1041,7 +1069,7 @@ public class IgniteTxHandler { * @param nodeId Node ID. * @param req Request. */ - protected final void processDhtTxOnePhaseCommitAckRequest(final UUID nodeId, + private void processDhtTxOnePhaseCommitAckRequest(final UUID nodeId, final GridDhtTxOnePhaseCommitAckRequest req) { assert nodeId != null; assert req != null; @@ -1058,14 +1086,14 @@ public class IgniteTxHandler { * @param req Request. */ @SuppressWarnings({"unchecked"}) - protected final void processDhtTxFinishRequest(final UUID nodeId, final GridDhtTxFinishRequest req) { + private void processDhtTxFinishRequest(final UUID nodeId, final GridDhtTxFinishRequest req) { assert nodeId != null; assert req != null; if (req.checkCommitted()) { boolean committed = req.waitRemoteTransactions() || !ctx.tm().addRolledbackTx(null, req.version()); - if (!committed || !req.syncCommit()) + if (!committed || req.syncMode() != FULL_SYNC) sendReply(nodeId, req, committed, null); else { IgniteInternalFuture<?> fut = ctx.tm().remoteTxFinishFuture(req.version()); @@ -1301,9 +1329,13 @@ public class IgniteTxHandler { * @param committed {@code True} if transaction committed on this node. * @param nearTxId Near tx version. */ - protected void sendReply(UUID nodeId, GridDhtTxFinishRequest req, boolean committed, GridCacheVersion nearTxId) { + protected final void sendReply(UUID nodeId, GridDhtTxFinishRequest req, boolean committed, GridCacheVersion nearTxId) { if (req.replyRequired() || req.checkCommitted()) { - GridDhtTxFinishResponse res = new GridDhtTxFinishResponse(req.version(), req.futureId(), req.miniId()); + GridDhtTxFinishResponse res = new GridDhtTxFinishResponse( + req.partition(), + req.version(), + req.futureId(), + req.miniId()); if (req.checkCommitted()) { res.checkCommitted(true);
http://git-wip-us.apache.org/repos/asf/ignite/blob/901be4f4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java index bd806aa..b1a4003 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java @@ -37,6 +37,7 @@ import javax.cache.expiry.ExpiryPolicy; import javax.cache.processor.EntryProcessor; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.NodeStoppingException; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheEntryPredicate; @@ -3277,7 +3278,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig return new GridCacheReturn(cacheCtx, true, keepBinary, res, implicitRes.success()); } catch (IgniteCheckedException | RuntimeException e) { - rollbackAsync(); + if (!(e instanceof NodeStoppingException)) + rollbackAsync(); throw e; }
