Repository: ignite Updated Branches: refs/heads/ignite-3212 bbb15cc83 -> 17999d6f3
ignite-3212 Fixed issue with message send failure and late discovery event. Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/17999d6f Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/17999d6f Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/17999d6f Branch: refs/heads/ignite-3212 Commit: 17999d6f3e58c1227562b06f3fb4561d2417b56b Parents: bbb15cc Author: sboikov <[email protected]> Authored: Thu Jun 2 13:26:43 2016 +0300 Committer: sboikov <[email protected]> Committed: Thu Jun 2 13:26:43 2016 +0300 ---------------------------------------------------------------------- .../distributed/GridCacheTxRecoveryFuture.java | 36 ++++++++++++-------- .../cache/transactions/IgniteTxManager.java | 9 ++--- 2 files changed, 26 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/17999d6f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java index 29e4c5b..7525114 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java @@ -18,7 +18,9 @@ package org.apache.ignite.internal.processors.cache.distributed; import java.util.Collection; +import java.util.HashSet; import java.util.Map; +import java.util.Set; import java.util.UUID; import java.util.concurrent.atomic.AtomicReference; import org.apache.ignite.IgniteCheckedException; @@ -69,8 +71,8 @@ public class GridCacheTxRecoveryFuture extends GridCompoundIdentityFuture<Boolea /** All involved nodes. */ private final Map<UUID, ClusterNode> nodes; - /** ID of failed node started transaction. */ - private final UUID failedNodeId; + /** ID of failed nodes started transaction. */ + private final Set<UUID> failedNodeIds; /** Transaction nodes mapping. */ private final Map<UUID, Collection<UUID>> txNodes; @@ -81,13 +83,13 @@ public class GridCacheTxRecoveryFuture extends GridCompoundIdentityFuture<Boolea /** * @param cctx Context. * @param tx Transaction. - * @param failedNodeId ID of failed node started transaction. + * @param failedNodeIds ID of failed nodes started transaction. * @param txNodes Transaction mapping. */ @SuppressWarnings("ConstantConditions") public GridCacheTxRecoveryFuture(GridCacheSharedContext<?, ?> cctx, IgniteInternalTx tx, - UUID failedNodeId, + Set<UUID> failedNodeIds, Map<UUID, Collection<UUID>> txNodes) { super(CU.boolReducer()); @@ -95,7 +97,7 @@ public class GridCacheTxRecoveryFuture extends GridCompoundIdentityFuture<Boolea this.cctx = cctx; this.tx = tx; this.txNodes = txNodes; - this.failedNodeId = failedNodeId; + this.failedNodeIds = failedNodeIds; if (log == null) log = U.logger(cctx.kernalContext(), logRef, GridCacheTxRecoveryFuture.class); @@ -105,7 +107,7 @@ public class GridCacheTxRecoveryFuture extends GridCompoundIdentityFuture<Boolea UUID locNodeId = cctx.localNodeId(); for (Map.Entry<UUID, Collection<UUID>> e : tx.transactionNodes().entrySet()) { - if (!locNodeId.equals(e.getKey()) && !failedNodeId.equals(e.getKey()) && !nodes.containsKey(e.getKey())) { + if (!locNodeId.equals(e.getKey()) && !failedNodeIds.contains(e.getKey()) && !nodes.containsKey(e.getKey())) { ClusterNode node = cctx.discovery().node(e.getKey()); if (node != null) @@ -115,7 +117,7 @@ public class GridCacheTxRecoveryFuture extends GridCompoundIdentityFuture<Boolea } for (UUID nodeId : e.getValue()) { - if (!locNodeId.equals(nodeId) && !failedNodeId.equals(nodeId) && !nodes.containsKey(nodeId)) { + if (!locNodeId.equals(nodeId) && !failedNodeIds.contains(nodeId) && !nodes.containsKey(nodeId)) { ClusterNode node = cctx.discovery().node(nodeId); if (node != null) @@ -128,7 +130,7 @@ public class GridCacheTxRecoveryFuture extends GridCompoundIdentityFuture<Boolea UUID nearNodeId = tx.eventNodeId(); - nearTxCheck = !failedNodeId.equals(nearNodeId) && cctx.discovery().alive(nearNodeId); + nearTxCheck = !failedNodeIds.contains(nearNodeId) && cctx.discovery().alive(nearNodeId); } /** @@ -170,7 +172,7 @@ public class GridCacheTxRecoveryFuture extends GridCompoundIdentityFuture<Boolea cctx.io().send(nearNodeId, req, tx.ioPolicy()); } catch (ClusterTopologyCheckedException ignore) { - fut.onNodeLeft(); + fut.onNodeLeft(nearNodeId); } catch (IgniteCheckedException e) { fut.onError(e); @@ -255,7 +257,7 @@ public class GridCacheTxRecoveryFuture extends GridCompoundIdentityFuture<Boolea * send message only to primary node. */ - if (nodeId.equals(failedNodeId)) { + if (failedNodeIds.contains(nodeId)) { for (UUID id : entry.getValue()) { // Skip backup node if it is local node or if it is also was mapped as primary. if (txNodes.containsKey(id) || id.equals(cctx.localNodeId())) @@ -276,7 +278,7 @@ public class GridCacheTxRecoveryFuture extends GridCompoundIdentityFuture<Boolea cctx.io().send(id, req, tx.ioPolicy()); } catch (ClusterTopologyCheckedException ignored) { - fut.onNodeLeft(); + fut.onNodeLeft(id); } catch (IgniteCheckedException e) { fut.onError(e); @@ -302,7 +304,7 @@ public class GridCacheTxRecoveryFuture extends GridCompoundIdentityFuture<Boolea cctx.io().send(nodeId, req, tx.ioPolicy()); } catch (ClusterTopologyCheckedException ignored) { - fut.onNodeLeft(); + fut.onNodeLeft(nodeId); } catch (IgniteCheckedException e) { fut.onError(e); @@ -401,7 +403,7 @@ public class GridCacheTxRecoveryFuture extends GridCompoundIdentityFuture<Boolea MiniFuture f = (MiniFuture)fut; if (f.nodeId().equals(nodeId)) - f.onNodeLeft(); + f.onNodeLeft(nodeId); } return true; @@ -514,14 +516,18 @@ public class GridCacheTxRecoveryFuture extends GridCompoundIdentityFuture<Boolea } /** + * @param nodeId Failed node ID. */ - private void onNodeLeft() { + private void onNodeLeft(UUID nodeId) { if (log.isDebugEnabled()) log.debug("Transaction node left grid (will ignore) [fut=" + this + ']'); if (nearTxCheck) { + Set<UUID> failedNodeIds0 = new HashSet<>(failedNodeIds); + failedNodeIds0.add(nodeId); + // Near and originating nodes left, need initiate tx check. - cctx.tm().commitIfPrepared(tx); + cctx.tm().commitIfPrepared(tx, failedNodeIds0); onDone(new ClusterTopologyCheckedException("Transaction node left grid (will ignore).")); } http://git-wip-us.apache.org/repos/asf/ignite/blob/17999d6f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java index 789ef8d..4ec280f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java @@ -1872,8 +1872,9 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { * transactions were prepared (invalidates transaction if it is not fully prepared). * * @param tx Transaction. + * @param failedNodeIds Failed nodes IDs. */ - public void commitIfPrepared(IgniteInternalTx tx) { + public void commitIfPrepared(IgniteInternalTx tx, Set<UUID> failedNodeIds) { assert tx instanceof GridDhtTxLocal || tx instanceof GridDhtTxRemote : tx; assert !F.isEmpty(tx.transactionNodes()) : tx; assert tx.nearXidVersion() != null : tx; @@ -1881,7 +1882,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { GridCacheTxRecoveryFuture fut = new GridCacheTxRecoveryFuture( cctx, tx, - tx.originatingNodeId(), + failedNodeIds, tx.transactionNodes()); cctx.mvcc().addFuture(fut, fut.futureId()); @@ -2147,7 +2148,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { // Check prepare only if originating node ID failed. Otherwise parent node will finish this tx. if (tx.originatingNodeId().equals(evtNodeId)) { if (tx.state() == PREPARED) - commitIfPrepared(tx); + commitIfPrepared(tx, Collections.singleton(evtNodeId)); else { IgniteInternalFuture<?> prepFut = tx.currentPrepareFuture(); @@ -2155,7 +2156,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { prepFut.listen(new CI1<IgniteInternalFuture<?>>() { @Override public void apply(IgniteInternalFuture<?> fut) { if (tx.state() == PREPARED) - commitIfPrepared(tx); + commitIfPrepared(tx, Collections.singleton(evtNodeId)); else if (tx.setRollbackOnly()) tx.rollbackAsync(); }
