http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java index 291c88a..1b40d6e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java @@ -19,6 +19,9 @@ package org.apache.ignite.internal.processors.cache.distributed.near; import java.util.Collection; import java.util.Collections; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; import java.util.UUID; import java.util.concurrent.atomic.AtomicReference; import org.apache.ignite.IgniteCheckedException; @@ -44,6 +47,7 @@ import org.apache.ignite.internal.util.future.GridCompoundIdentityFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.C1; +import org.apache.ignite.internal.util.typedef.CI1; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; @@ -64,6 +68,9 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu public static final IgniteProductVersion FINISH_NEAR_ONE_PHASE_SINCE = IgniteProductVersion.fromString("1.4.0"); /** */ + public static final IgniteProductVersion WAIT_REMOTE_TXS_SINCE = IgniteProductVersion.fromString("1.5.0"); + + /** */ private static final long serialVersionUID = 0L; /** Logger reference. */ @@ -122,22 +129,23 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu } /** {@inheritDoc} */ + @SuppressWarnings("unchecked") @Override public boolean onNodeLeft(UUID nodeId) { + boolean found = false; + for (IgniteInternalFuture<?> fut : futures()) if (isMini(fut)) { - MiniFuture f = (MiniFuture)fut; + MinFuture f = (MinFuture)fut; - if (f.node().id().equals(nodeId)) { + if (f.onNodeLeft(nodeId)) { // Remove previous mapping. mappings.remove(nodeId); - f.onResult(new ClusterTopologyCheckedException("Remote node left grid (will fail): " + nodeId)); - - return true; + found = true; } } - return false; + return found; } /** {@inheritDoc} */ @@ -156,19 +164,32 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu * @param nodeId Sender. * @param res Result. */ + @SuppressWarnings("ForLoopReplaceableByForEach") public void onResult(UUID nodeId, GridNearTxFinishResponse res) { - if (!isDone()) - for (IgniteInternalFuture<IgniteInternalTx> fut : futures()) { - if (isMini(fut)) { - MiniFuture f = (MiniFuture)fut; + if (!isDone()) { + FinishMiniFuture finishFut = null; - if (f.futureId().equals(res.miniId())) { - assert f.node().id().equals(nodeId); + synchronized (futs) { + for (int i = 0; i < futs.size(); i++) { + IgniteInternalFuture<IgniteInternalTx> fut = futs.get(i); + + if (fut.getClass() == FinishMiniFuture.class) { + FinishMiniFuture f = (FinishMiniFuture)fut; - f.onResult(res); + if (f.futureId().equals(res.miniId())) { + assert f.node().id().equals(nodeId); + + finishFut = f; + + break; + } } } } + + if (finishFut != null) + finishFut.onNearFinishResponse(res); + } } /** @@ -178,15 +199,21 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu public void onResult(UUID nodeId, GridDhtTxFinishResponse res) { if (!isDone()) for (IgniteInternalFuture<IgniteInternalTx> fut : futures()) { - if (isMini(fut)) { - MiniFuture f = (MiniFuture)fut; + if (fut.getClass() == CheckBackupMiniFuture.class) { + CheckBackupMiniFuture f = (CheckBackupMiniFuture)fut; if (f.futureId().equals(res.miniId())) { assert f.node().id().equals(nodeId); - f.onResult(res); + f.onDhtFinishResponse(res); } } + else if (fut.getClass() == CheckRemoteTxMiniFuture.class) { + CheckRemoteTxMiniFuture f = (CheckRemoteTxMiniFuture)fut; + + if (f.futureId().equals(res.miniId())) + f.onDhtFinishResponse(nodeId); + } } } @@ -204,9 +231,6 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu boolean marked = tx.setRollbackOnly(); - if (err instanceof NodeStoppingException) - return super.onDone(null, err); - if (err instanceof IgniteTxRollbackCheckedException) { if (marked) { try { @@ -289,11 +313,13 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu } /** - * @param f Future. + * @param fut Future. * @return {@code True} if mini-future. */ - private boolean isMini(IgniteInternalFuture<?> f) { - return f.getClass().equals(MiniFuture.class); + private boolean isMini(IgniteInternalFuture<?> fut) { + return fut.getClass() == FinishMiniFuture.class || + fut.getClass() == CheckBackupMiniFuture.class || + fut.getClass() == CheckRemoteTxMiniFuture.class; } /** @@ -393,7 +419,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu ClusterNode backup = cctx.discovery().node(backupId); - MiniFuture mini = new MiniFuture(backup, mapping); + final CheckBackupMiniFuture mini = new CheckBackupMiniFuture(backup, mapping); add(mini); @@ -414,8 +440,25 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu readyNearMappingFromBackup(mapping); - if (committed) + if (committed) { + if (tx.syncCommit()) { + GridCacheVersion nearXidVer = tx.nearXidVersion(); + + assert nearXidVer != null : tx; + + IgniteInternalFuture<?> fut = cctx.tm().remoteTxFinishFuture(nearXidVer); + + fut.listen(new CI1<IgniteInternalFuture<?>>() { + @Override public void apply(IgniteInternalFuture<?> fut) { + mini.onDone(tx); + } + }); + + return; + } + mini.onDone(tx); + } else { ClusterTopologyCheckedException cause = new ClusterTopologyCheckedException("Primary node left grid: " + nodeId); @@ -427,46 +470,26 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu } } else { - GridDhtTxFinishRequest finishReq = new GridDhtTxFinishRequest( - cctx.localNodeId(), - futureId(), - mini.futureId(), - tx.topologyVersion(), - tx.xidVersion(), - tx.commitVersion(), - tx.threadId(), - tx.isolation(), - true, - false, - tx.system(), - tx.ioPolicy(), - false, - true, - true, - null, - null, - null, - null, - 0, - null, - 0, - tx.activeCachesDeploymentEnabled()); - - finishReq.checkCommitted(true); + GridDhtTxFinishRequest finishReq = checkCommittedRequest(mini.futureId()); + + // Preserve old behavior, otherwise response is not sent. + if (WAIT_REMOTE_TXS_SINCE.compareTo(backup.version()) > 0) + finishReq.syncCommit(true); try { if (FINISH_NEAR_ONE_PHASE_SINCE.compareTo(backup.version()) <= 0) cctx.io().send(backup, finishReq, tx.ioPolicy()); - else + else { mini.onDone(new IgniteTxHeuristicCheckedException("Failed to check for tx commit on " + "the backup node (node has an old Ignite version) [rmtNodeId=" + backup.id() + ", ver=" + backup.version() + ']')); + } } catch (ClusterTopologyCheckedException e) { - mini.onResult(e); + mini.onNodeLeft(backupId); } catch (IgniteCheckedException e) { - mini.onResult(e); + mini.onDone(e); } } } @@ -476,7 +499,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu } /** - * + * @return {@code True} if need to send finish request for one phase commit transaction. */ private boolean needFinishOnePhase() { if (tx.mappings().empty()) @@ -584,7 +607,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu add(fut); } else { - MiniFuture fut = new MiniFuture(m); + FinishMiniFuture fut = new FinishMiniFuture(m); req.miniId(fut.futureId()); @@ -604,11 +627,11 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu // Remove previous mapping. mappings.remove(m.node().id()); - fut.onResult(e); + fut.onNodeLeft(n.id()); } catch (IgniteCheckedException e) { // Fail the whole thing. - fut.onResult(e); + fut.onDone(e); } } } @@ -618,10 +641,24 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu Collection<String> futs = F.viewReadOnly(futures(), new C1<IgniteInternalFuture<?>, String>() { @SuppressWarnings("unchecked") @Override public String apply(IgniteInternalFuture<?> f) { - if (isMini(f)) { - MiniFuture m = (MiniFuture)f; + if (f.getClass() == FinishMiniFuture.class) { + FinishMiniFuture fut = (FinishMiniFuture)f; + + return "FinishFuture[node=" + fut.node().id() + + ", loc=" + fut.node().isLocal() + + ", done=" + fut.isDone() + "]"; + } + else if (f.getClass() == CheckBackupMiniFuture.class) { + CheckBackupMiniFuture fut = (CheckBackupMiniFuture)f; - return "[node=" + m.node().id() + ", loc=" + m.node().isLocal() + ", done=" + f.isDone() + "]"; + return "CheckBackupFuture[node=" + fut.node().id() + + ", loc=" + fut.node().isLocal() + + ", done=" + f.isDone() + "]"; + } + else if (f.getClass() == CheckRemoteTxMiniFuture.class) { + CheckRemoteTxMiniFuture fut = (CheckRemoteTxMiniFuture)f; + + return "CheckRemoteTxMiniFuture[nodes=" + fut.nodes() + ", done=" + f.isDone() + "]"; } else return "[loc=true, done=" + f.isDone() + "]"; @@ -634,108 +671,217 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu } /** + * @param miniId Mini future ID. + * @return Finish request. + */ + private GridDhtTxFinishRequest checkCommittedRequest(IgniteUuid miniId) { + GridDhtTxFinishRequest finishReq = new GridDhtTxFinishRequest( + cctx.localNodeId(), + futureId(), + miniId, + tx.topologyVersion(), + tx.xidVersion(), + tx.commitVersion(), + tx.threadId(), + tx.isolation(), + true, + false, + tx.system(), + tx.ioPolicy(), + false, + tx.syncCommit(), + tx.syncRollback(), + null, + null, + null, + null, + 0, + null, + 0, + tx.activeCachesDeploymentEnabled()); + + finishReq.checkCommitted(true); + + return finishReq; + } + + /** + * + */ + private abstract class MinFuture extends GridFutureAdapter<IgniteInternalTx> { + /** */ + private final IgniteUuid futId = IgniteUuid.randomUuid(); + + /** + * @param nodeId Node ID. + * @return {@code True} if future processed node failure. + */ + abstract boolean onNodeLeft(UUID nodeId); + + /** + * @return Future ID. + */ + final IgniteUuid futureId() { + return futId; + } + } + + /** * Mini-future for get operations. Mini-futures are only waiting on a single * node as opposed to multiple nodes. */ - private class MiniFuture extends GridFutureAdapter<IgniteInternalTx> { + private class FinishMiniFuture extends MinFuture { /** */ private static final long serialVersionUID = 0L; - /** */ - private final IgniteUuid futId = IgniteUuid.randomUuid(); - /** Keys. */ @GridToStringInclude private GridDistributedTxMapping m; - /** Backup check flag. */ - private ClusterNode backup; - /** * @param m Mapping. */ - MiniFuture(GridDistributedTxMapping m) { + FinishMiniFuture(GridDistributedTxMapping m) { this.m = m; } /** - * @param backup Backup to check. - * @param m Mapping associated with the backup. + * @return Node ID. */ - MiniFuture(ClusterNode backup, GridDistributedTxMapping m) { - this.backup = backup; - this.m = m; + ClusterNode node() { + return m.node(); } /** - * @return Future ID. + * @return Keys. */ - IgniteUuid futureId() { - return futId; + public GridDistributedTxMapping mapping() { + return m; } /** - * @return Node ID. + * @param nodeId Failed node ID. */ - public ClusterNode node() { - assert m != null || backup != null; + boolean onNodeLeft(UUID nodeId) { + if (nodeId.equals(m.node().id())) { + if (log.isDebugEnabled()) + log.debug("Remote node left grid while sending or waiting for reply: " + this); + + if (isSync()) { + Map<UUID, Collection<UUID>> txNodes = tx.transactionNodes(); + + if (txNodes != null) { + Collection<UUID> backups = txNodes.get(nodeId); + + if (!F.isEmpty(backups)) { + final CheckRemoteTxMiniFuture mini = new CheckRemoteTxMiniFuture(new HashSet<>(backups)); + + add(mini); + + GridDhtTxFinishRequest req = checkCommittedRequest(mini.futureId()); + + req.waitRemoteTransactions(true); + + for (UUID backupId : backups) { + ClusterNode backup = cctx.discovery().node(backupId); + + if (backup != null && WAIT_REMOTE_TXS_SINCE.compareTo(backup.version()) <= 0) { + if (backup.isLocal()) { + IgniteInternalFuture<?> fut = cctx.tm().remoteTxFinishFuture(tx.nearXidVersion()); + + fut.listen(new CI1<IgniteInternalFuture<?>>() { + @Override public void apply(IgniteInternalFuture<?> fut) { + mini.onDhtFinishResponse(cctx.localNodeId()); + } + }); + } + else { + try { + cctx.io().send(backup, req, tx.ioPolicy()); + } + catch (ClusterTopologyCheckedException e) { + mini.onNodeLeft(backupId); + } + catch (IgniteCheckedException e) { + mini.onDone(e); + } + } + } + else + mini.onDhtFinishResponse(backupId); + } + } + } + } - return backup != null ? backup : m.node(); + onDone(tx); + + return true; + } + + return false; } /** - * @return Keys. + * @param res Result callback. */ - public GridDistributedTxMapping mapping() { - return m; + void onNearFinishResponse(GridNearTxFinishResponse res) { + if (res.error() != null) + onDone(res.error()); + else + onDone(tx); } + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(FinishMiniFuture.class, this, "done", isDone(), "cancelled", isCancelled(), "err", error()); + } + } + + /** + * + */ + private class CheckBackupMiniFuture extends MinFuture { + /** Keys. */ + @GridToStringInclude + private GridDistributedTxMapping m; + + /** Backup node to check. */ + private ClusterNode backup; + /** - * @param e Error. + * @param backup Backup to check. + * @param m Mapping associated with the backup. */ - void onResult(Throwable e) { - if (log.isDebugEnabled()) - log.debug("Failed to get future result [fut=" + this + ", err=" + e + ']'); - - // Fail. - onDone(e); + CheckBackupMiniFuture(ClusterNode backup, GridDistributedTxMapping m) { + this.backup = backup; + this.m = m; } /** - * @param e Node failure. + * @return Node ID. */ - void onResult(ClusterTopologyCheckedException e) { - if (log.isDebugEnabled()) - log.debug("Remote node left grid while sending or waiting for reply (will fail): " + this); + public ClusterNode node() { + return backup; + } - if (backup != null) { + /** {@inheritDoc} */ + @Override boolean onNodeLeft(UUID nodeId) { + if (nodeId.equals(backup.id())) { readyNearMappingFromBackup(m); - onDone(e); - } - else - // Complete future with tx. - onDone(tx); - } + onDone(new ClusterTopologyCheckedException("Remote node left grid: " + nodeId)); - /** - * @param res Result callback. - */ - void onResult(GridNearTxFinishResponse res) { - assert backup == null; + return true; + } - if (res.error() != null) - onDone(res.error()); - else - onDone(tx); + return false; } /** * @param res Response. */ - void onResult(GridDhtTxFinishResponse res) { - assert backup != null; - + void onDhtFinishResponse(GridDhtTxFinishResponse res) { readyNearMappingFromBackup(m); Throwable err = res.checkCommittedError(); @@ -755,9 +901,67 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu onDone(tx); } + } + + /** + * + */ + private class CheckRemoteTxMiniFuture extends MinFuture { + /** */ + private Set<UUID> nodes; + + /** + * @param nodes Backup nodes. + */ + public CheckRemoteTxMiniFuture(Set<UUID> nodes) { + this.nodes = nodes; + } + + /** + * @return Backup nodes. + */ + Set<UUID> nodes() { + synchronized (this) { + return new HashSet<>(nodes); + } + } + + /** {@inheritDoc} */ + @Override boolean onNodeLeft(UUID nodeId) { + return onResponse(nodeId); + } + + /** + * @param nodeId Node ID. + */ + void onDhtFinishResponse(UUID nodeId) { + onResponse(nodeId); + } + + /** + * @param nodeId Node ID. + * @return {@code True} if processed node response. + */ + private boolean onResponse(UUID nodeId) { + boolean done; + + boolean ret; + + synchronized (this) { + ret = nodes.remove(nodeId); + + done = nodes.isEmpty(); + } + + if (done) + onDone(tx); + + return ret; + } + /** {@inheritDoc} */ @Override public String toString() { - return S.toString(MiniFuture.class, this, "done", isDone(), "cancelled", isCancelled(), "err", error()); + return S.toString(CheckRemoteTxMiniFuture.class, this); } } }
http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java index 3e5e28f..65eac63 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java @@ -70,6 +70,9 @@ public class GridNearTxFinishRequest extends GridDistributedTxFinishRequest { * @param commit Commit flag. * @param invalidate Invalidate flag. * @param sys System flag. + * @param plc IO policy. + * @param syncCommit Sync commit flag. + * @param syncRollback Sync rollback flag. * @param explicitLock Explicit lock flag. * @param storeEnabled Store enabled flag. * @param topVer Topology version. @@ -77,6 +80,8 @@ public class GridNearTxFinishRequest extends GridDistributedTxFinishRequest { * @param committedVers Committed versions. * @param rolledbackVers Rolled back versions. * @param txSize Expected transaction size. + * @param subjId Subject ID. + * @param taskNameHash Task name hash. * @param addDepInfo Deployment info flag. */ public GridNearTxFinishRequest( http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishResponse.java index 4904ad8..b84d2fd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishResponse.java @@ -99,7 +99,7 @@ public class GridNearTxFinishResponse extends GridDistributedTxFinishResponse { @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException { super.prepareMarshal(ctx); - if (err != null) + if (err != null && errBytes == null) errBytes = ctx.marshaller().marshal(err); } @@ -107,7 +107,7 @@ public class GridNearTxFinishResponse extends GridDistributedTxFinishResponse { @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException { super.finishUnmarshal(ctx, ldr); - if (errBytes != null) + if (errBytes != null && err == null) err = ctx.marshaller().unmarshal(errBytes, ldr); } http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java index aa4e929f..b7b480e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java @@ -202,7 +202,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { } /** {@inheritDoc} */ - @Nullable @Override public GridCacheVersion nearXidVersion() { + @Override public GridCacheVersion nearXidVersion() { return xidVer; } http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java index d886243..8812709 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java @@ -264,7 +264,7 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException { super.prepareMarshal(ctx); - if (ownedVals != null) { + if (ownedVals != null && ownedValKeys == null) { ownedValKeys = ownedVals.keySet(); ownedValVals = ownedVals.values(); @@ -287,7 +287,7 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse } if (filterFailedKeys != null) { - for (IgniteTxKey key :filterFailedKeys) { + for (IgniteTxKey key : filterFailedKeys) { GridCacheContext cctx = ctx.cacheContext(key.cacheId()); key.prepareMarshal(cctx); http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java index 59d8b5b..dc98eda 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java @@ -280,28 +280,28 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache GridCacheContext cctx = ctx.cacheContext(cacheId); - if (keyValFilter != null) { + if (keyValFilter != null && keyValFilterBytes == null) { if (addDepInfo) prepareObject(keyValFilter, cctx); keyValFilterBytes = CU.marshal(cctx, keyValFilter); } - if (rdc != null) { + if (rdc != null && rdcBytes == null) { if (addDepInfo) prepareObject(rdc, cctx); rdcBytes = CU.marshal(cctx, rdc); } - if (trans != null) { + if (trans != null && transBytes == null) { if (addDepInfo) prepareObject(trans, cctx); transBytes = CU.marshal(cctx, trans); } - if (!F.isEmpty(args)) { + if (!F.isEmpty(args) && argsBytes == null) { if (addDepInfo) { for (Object arg : args) prepareObject(arg, cctx); @@ -317,16 +317,16 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache Marshaller mrsh = ctx.marshaller(); - if (keyValFilterBytes != null) + if (keyValFilterBytes != null && keyValFilter == null) keyValFilter = mrsh.unmarshal(keyValFilterBytes, ldr); - if (rdcBytes != null) + if (rdcBytes != null && rdc == null) rdc = mrsh.unmarshal(rdcBytes, ldr); - if (transBytes != null) + if (transBytes != null && trans == null) trans = mrsh.unmarshal(transBytes, ldr); - if (argsBytes != null) + if (argsBytes != null && args == null) args = mrsh.unmarshal(argsBytes, ldr); } http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryResponse.java index cce465b..ab882d4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryResponse.java @@ -122,11 +122,14 @@ public class GridCacheQueryResponse extends GridCacheMessage implements GridCach GridCacheContext cctx = ctx.cacheContext(cacheId); - if (err != null) + if (err != null && errBytes == null) errBytes = ctx.marshaller().marshal(err); - metaDataBytes = marshalCollection(metadata, cctx); - dataBytes = marshalCollection(data, cctx); + if (metaDataBytes == null) + metaDataBytes = marshalCollection(metadata, cctx); + + if (dataBytes == null) + dataBytes = marshalCollection(data, cctx); if (addDepInfo && !F.isEmpty(data)) { for (Object o : data) { @@ -144,11 +147,14 @@ public class GridCacheQueryResponse extends GridCacheMessage implements GridCach @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException { super.finishUnmarshal(ctx, ldr); - if (errBytes != null) + if (errBytes != null && err == null) err = ctx.marshaller().unmarshal(errBytes, ldr); - metadata = unmarshalCollection(metaDataBytes, ctx, ldr); - data = unmarshalCollection(dataBytes, ctx, ldr); + if (metadata == null) + metadata = unmarshalCollection(metaDataBytes, ctx, ldr); + + if (data == null) + data = unmarshalCollection(dataBytes, ctx, ldr); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java index f5f99f5..914b4ff 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java @@ -160,6 +160,12 @@ public interface IgniteInternalTx extends AutoCloseable, GridTimeoutObject { public long timeout(long timeout); /** + * Changes transaction state from COMMITTING to MARKED_ROLLBACK. + * Must be called only from thread committing transaction. + */ + public void errorWhenCommitting(); + + /** * Modify the transaction associated with the current thread such that the * only possible outcome of the transaction is to roll back the * transaction. http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java index 22e27c3..ed44c49 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java @@ -804,6 +804,22 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement } /** {@inheritDoc} */ + public final void errorWhenCommitting() { + synchronized (this) { + TransactionState prev = state; + + assert prev == COMMITTING : prev; + + state = MARKED_ROLLBACK; + + if (log.isDebugEnabled()) + log.debug("Changed transaction state [prev=" + prev + ", new=" + this.state + ", tx=" + this + ']'); + + notifyAll(); + } + } + + /** {@inheritDoc} */ @Override public boolean setRollbackOnly() { return state(MARKED_ROLLBACK); } @@ -1083,7 +1099,7 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement } case MARKED_ROLLBACK: { - valid = prev == ACTIVE || prev == PREPARING || prev == PREPARED || prev == COMMITTING; + valid = prev == ACTIVE || prev == PREPARING || prev == PREPARED; break; } @@ -1705,6 +1721,11 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement } /** {@inheritDoc} */ + @Override public void errorWhenCommitting() { + throw new IllegalStateException("Deserialized transaction can only be used as read-only."); + } + + /** {@inheritDoc} */ @Override public void commit() { throw new IllegalStateException("Deserialized transaction can only be used as read-only."); } http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java index c42bc7f..f731975 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java @@ -828,7 +828,12 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message { val.marshal(ctx, context()); - expiryPlcBytes = transferExpiryPlc ? CU.marshal(this.ctx, new IgniteExternalizableExpiryPolicy(expiryPlc)) : null; + if (transferExpiryPlc) { + if (expiryPlcBytes == null) + expiryPlcBytes = CU.marshal(this.ctx, new IgniteExternalizableExpiryPolicy(expiryPlc)); + } + else + expiryPlcBytes = null; } /** @@ -871,8 +876,8 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message { val.unmarshal(this.ctx, clsLdr); - if (expiryPlcBytes != null) - expiryPlc = ctx.marshaller().unmarshal(expiryPlcBytes, clsLdr); + if (expiryPlcBytes != null && expiryPlc == null) + expiryPlc = ctx.marshaller().unmarshal(expiryPlcBytes, clsLdr); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java index b25baf8..547c018 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java @@ -721,14 +721,12 @@ public class IgniteTxHandler { IgniteInternalFuture<IgniteInternalTx> res = null; - if (tx != null) { - IgniteInternalFuture<IgniteInternalTx> rollbackFut = tx.rollbackAsync(); + IgniteInternalFuture<IgniteInternalTx> rollbackFut = tx.rollbackAsync(); - // Only for error logging. - rollbackFut.listen(CU.errorLogger(log)); + // Only for error logging. + rollbackFut.listen(CU.errorLogger(log)); - res = rollbackFut; - } + res = rollbackFut; if (e instanceof Error) throw (Error)e; @@ -875,7 +873,19 @@ public class IgniteTxHandler { log.debug("Processing dht tx finish request [nodeId=" + nodeId + ", req=" + req + ']'); if (req.checkCommitted()) { - sendReply(nodeId, req, !ctx.tm().addRolledbackTx(null, req.version())); + boolean committed = req.waitRemoteTransactions() || !ctx.tm().addRolledbackTx(null, req.version()); + + if (!committed || !req.syncCommit()) + sendReply(nodeId, req, committed); + else { + IgniteInternalFuture<?> fut = ctx.tm().remoteTxFinishFuture(req.version()); + + fut.listen(new CI1<IgniteInternalFuture<?>>() { + @Override public void apply(IgniteInternalFuture<?> fut) { + sendReply(nodeId, req, true); + } + }); + } return; } @@ -1044,7 +1054,7 @@ public class IgniteTxHandler { * @param committed {@code True} if transaction committed on this node. */ protected void sendReply(UUID nodeId, GridDhtTxFinishRequest req, boolean committed) { - if (req.replyRequired()) { + if (req.replyRequired() || req.checkCommitted()) { GridDhtTxFinishResponse res = new GridDhtTxFinishResponse(req.version(), req.futureId(), req.miniId()); if (req.checkCommitted()) { http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java index 21ff0cf..926eaf2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java @@ -809,7 +809,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig catch (IgniteCheckedException ex) { commitError(ex); - setRollbackOnly(); + errorWhenCommitting(); // Safe to remove transaction from committed tx list because nothing was committed yet. cctx.tm().removeCommittedTx(this); @@ -819,7 +819,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig catch (Throwable ex) { commitError(ex); - setRollbackOnly(); + errorWhenCommitting(); // Safe to remove transaction from committed tx list because nothing was committed yet. cctx.tm().removeCommittedTx(this); @@ -1161,7 +1161,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig // Set operation to NOOP. txEntry.op(NOOP); - setRollbackOnly(); + errorWhenCommitting(); throw ex; } http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java index d384e4e..ca15e20 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java @@ -1063,6 +1063,8 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { if (!((committed != null && committed) || tx.writeSet().isEmpty() || tx.isSystemInvalidate())) { uncommitTx(tx); + tx.errorWhenCommitting(); + throw new IgniteException("Missing commit version (consider increasing " + IGNITE_MAX_COMPLETED_TX_COUNT + " system property) [ver=" + tx.xidVersion() + ", tx=" + tx.getClass().getSimpleName() + ']'); @@ -1616,6 +1618,24 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { } /** + * @param nearVer Near version. + * @return Finish future for related remote transactions. + */ + @SuppressWarnings("unchecked") + public IgniteInternalFuture<?> remoteTxFinishFuture(GridCacheVersion nearVer) { + GridCompoundFuture<Void, Void> fut = new GridCompoundFuture<>(); + + for (final IgniteInternalTx tx : txs()) { + if (!tx.local() && nearVer.equals(tx.nearXidVersion())) + fut.add((IgniteInternalFuture) tx.finishFuture()); + } + + fut.markInitialized(); + + return fut; + } + + /** * @param nearVer Near version ID. * @param txNum Number of transactions. * @param fut Result future. http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerRequest.java index 3d65304..77c802d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerRequest.java @@ -105,6 +105,7 @@ public class DataStreamerRequest implements Message { * @param entries Entries to put. * @param ignoreDepOwnership Ignore ownership. * @param skipStore Skip store flag. + * @param keepBinary Keep binary flag. * @param depMode Deployment mode. * @param sampleClsName Sample class name. * @param userVer User version. http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java index cd783e4..98848ee 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java @@ -926,8 +926,15 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { CacheConfiguration newCfg = cacheConfiguration(cfg, cacheName); - if (ctx.cache().cache(cacheName) == null) - ctx.cache().dynamicStartCache(newCfg, cacheName, null, CacheType.INTERNAL, false, true).get(); + if (ctx.cache().cache(cacheName) == null) { + ctx.cache().dynamicStartCache(newCfg, + cacheName, + null, + CacheType.INTERNAL, + false, + true, + true).get(); + } assert ctx.cache().cache(cacheName) != null : cacheName; http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAckMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAckMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAckMessage.java index f4a8fad..ecb892e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAckMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAckMessage.java @@ -90,7 +90,7 @@ public class IgfsAckMessage extends IgfsCommunicationMessage { @Override public void prepareMarshal(Marshaller marsh) throws IgniteCheckedException { super.prepareMarshal(marsh); - if (err != null) + if (err != null && errBytes == null) errBytes = marsh.marshal(err); } @@ -98,7 +98,7 @@ public class IgfsAckMessage extends IgfsCommunicationMessage { @Override public void finishUnmarshal(Marshaller marsh, @Nullable ClassLoader ldr) throws IgniteCheckedException { super.finishUnmarshal(marsh, ldr); - if (errBytes != null) + if (errBytes != null && err == null) err = marsh.unmarshal(errBytes, ldr); } http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheCommandHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheCommandHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheCommandHandler.java index 65dca08..a89913f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheCommandHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheCommandHandler.java @@ -357,7 +357,8 @@ public class GridCacheCommandHandler extends GridRestCommandHandlerAdapter { switch (cmd) { case DESTROY_CACHE: { - fut = ((IgniteKernal)ctx.grid()).destroyCacheAsync(cacheName).chain( + // Do not check thread tx here since there can be active system cache txs. + fut = ((IgniteKernal)ctx.grid()).destroyCacheAsync(cacheName, false).chain( new CX1<IgniteInternalFuture<?>, GridRestResponse>() { @Override public GridRestResponse applyx(IgniteInternalFuture<?> f) throws IgniteCheckedException { @@ -369,7 +370,8 @@ public class GridCacheCommandHandler extends GridRestCommandHandlerAdapter { } case GET_OR_CREATE_CACHE: { - fut = ((IgniteKernal)ctx.grid()).getOrCreateCacheAsync(cacheName).chain( + // Do not check thread tx here since there can be active system cache txs. + fut = ((IgniteKernal)ctx.grid()).getOrCreateCacheAsync(cacheName, false).chain( new CX1<IgniteInternalFuture<?>, GridRestResponse>() { @Override public GridRestResponse applyx(IgniteInternalFuture<?> f) throws IgniteCheckedException { http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java index 1ea5014..8c23d92 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java @@ -104,6 +104,7 @@ public interface DiscoverySpi extends IgniteSpi { * Sets a handler for initial data exchange between Ignite nodes. * * @param exchange Discovery data exchange handler. + * @return {@code this} for chaining. */ public TcpDiscoverySpi setDataExchange(DiscoverySpiDataExchange exchange); @@ -113,6 +114,7 @@ public interface DiscoverySpi extends IgniteSpi { * dynamic metrics between nodes. * * @param metricsProvider Provider of metrics data. + * @return {@code this} for chaining. */ public TcpDiscoverySpi setMetricsProvider(DiscoveryMetricsProvider metricsProvider); http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java b/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java index 066a5fd..21204c7 100644 --- a/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java +++ b/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java @@ -204,7 +204,8 @@ public class SocketStreamer<T, K, V> extends StreamAdapter<T, K, V> { * Stops streamer. */ public void stop() { - srv.stop(); + if (srv != null) + srv.stop(); if (log.isDebugEnabled()) log.debug("Socket streaming server stopped"); http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiMultithreadedSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiMultithreadedSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiMultithreadedSelfTest.java index 95ca9b5..9908b87 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiMultithreadedSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiMultithreadedSelfTest.java @@ -324,10 +324,14 @@ public abstract class GridCacheAbstractFullApiMultithreadedSelfTest extends Grid @Override public void applyx(IgniteCache<String, Integer> cache) { int rnd = random(); + Set<Integer> ids = new HashSet<>(set); + cache.removeAll(rangeKeys(0, rnd)); - for (int i = 0; i < rnd; i++) - assert cache.localPeek("key" + i, CachePeekMode.ONHEAP) == null; + for (int i = 0; i < rnd; i++) { + if (ids.contains(i)) + assertNull(cache.localPeek("key" + i)); + } } }); } @@ -350,7 +354,7 @@ public abstract class GridCacheAbstractFullApiMultithreadedSelfTest extends Grid for (int i = 0; i < rnd; i++) { if (ids.contains(i)) - assert cache.localPeek("key" + i, CachePeekMode.ONHEAP) == null; + assertNull(cache.localPeek("key" + i)); } } }); @@ -359,6 +363,7 @@ public abstract class GridCacheAbstractFullApiMultithreadedSelfTest extends Grid /** * @param cache Cache. * @param key Key. + * @return Removed value. */ private <K, V> V removeAsync(IgniteCache<K, V> cache, K key) { IgniteCache<K, V> cacheAsync = cache.withAsync(); @@ -371,6 +376,8 @@ public abstract class GridCacheAbstractFullApiMultithreadedSelfTest extends Grid /** * @param cache Cache. * @param key Key. + * @param val Value. + * @return Remove result. */ private <K, V> boolean removeAsync(IgniteCache<K, V> cache, K key, V val) { IgniteCache<K, V> cacheAsync = cache.withAsync(); http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java index 0d9c541..1e0071e 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java @@ -4337,7 +4337,7 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract log.info("Set iterators not cleared, will wait"); - Thread.sleep(500); + Thread.sleep(1000); } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheStopSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheStopSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheStopSelfTest.java index a34857f..e70c97b 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheStopSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheStopSelfTest.java @@ -299,7 +299,7 @@ public class GridCacheStopSelfTest extends GridCommonAbstractTest { return null; } - })); + }, "cache-thread")); } readyLatch.await(); http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java index 8a5dfd4..c9cd750 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java @@ -158,7 +158,12 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest { ccfg.setName(DYNAMIC_CACHE_NAME); - futs.add(kernal.context().cache().dynamicStartCache(ccfg, ccfg.getName(), null, true, true)); + futs.add(kernal.context().cache().dynamicStartCache(ccfg, + ccfg.getName(), + null, + true, + true, + true)); return null; } @@ -190,7 +195,7 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest { GridTestUtils.runMultiThreaded(new Callable<Object>() { @Override public Object call() throws Exception { - futs.add(kernal.context().cache().dynamicDestroyCache(DYNAMIC_CACHE_NAME)); + futs.add(kernal.context().cache().dynamicDestroyCache(DYNAMIC_CACHE_NAME, true)); return null; } @@ -218,7 +223,12 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest { IgniteEx kernal = grid(ThreadLocalRandom.current().nextInt(nodeCount())); - futs.add(kernal.context().cache().dynamicStartCache(ccfg, ccfg.getName(), null, true, true)); + futs.add(kernal.context().cache().dynamicStartCache(ccfg, + ccfg.getName(), + null, + true, + true, + true)); return null; } @@ -252,7 +262,7 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest { @Override public Object call() throws Exception { IgniteEx kernal = grid(ThreadLocalRandom.current().nextInt(nodeCount())); - futs.add(kernal.context().cache().dynamicDestroyCache(DYNAMIC_CACHE_NAME)); + futs.add(kernal.context().cache().dynamicDestroyCache(DYNAMIC_CACHE_NAME, true)); return null; } @@ -315,7 +325,7 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest { for (int g = 0; g < nodeCount(); g++) caches[g] = grid(g).cache(DYNAMIC_CACHE_NAME); - kernal.context().cache().dynamicDestroyCache(DYNAMIC_CACHE_NAME).get(); + kernal.context().cache().dynamicDestroyCache(DYNAMIC_CACHE_NAME, true).get(); for (int g = 0; g < nodeCount(); g++) { final IgniteKernal kernal0 = (IgniteKernal) grid(g); @@ -368,7 +378,7 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest { } // Undeploy cache. - kernal.context().cache().dynamicDestroyCache(DYNAMIC_CACHE_NAME).get(); + kernal.context().cache().dynamicDestroyCache(DYNAMIC_CACHE_NAME, true).get(); startGrid(nodeCount() + 1); @@ -445,7 +455,7 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest { }, IllegalArgumentException.class, null); } - kernal.context().cache().dynamicDestroyCache(DYNAMIC_CACHE_NAME).get(); + kernal.context().cache().dynamicDestroyCache(DYNAMIC_CACHE_NAME, true).get(); stopGrid(nodeCount() + 1); stopGrid(nodeCount()); @@ -512,7 +522,7 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest { for (int g = 0; g < nodeCount() + 1; g++) assertEquals("1", ignite(g).cache(DYNAMIC_CACHE_NAME).get("1")); - kernal.context().cache().dynamicDestroyCache(DYNAMIC_CACHE_NAME).get(); + kernal.context().cache().dynamicDestroyCache(DYNAMIC_CACHE_NAME, true).get(); } finally { stopGrid(nodeCount()); @@ -554,7 +564,7 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest { for (int g = 0; g < nodeCount() + 1; g++) assertEquals("1", ignite(g).cache(DYNAMIC_CACHE_NAME).get("1")); - kernal.context().cache().dynamicDestroyCache(DYNAMIC_CACHE_NAME).get(); + kernal.context().cache().dynamicDestroyCache(DYNAMIC_CACHE_NAME, true).get(); } finally { stopGrid(nodeCount()); @@ -600,7 +610,7 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest { for (int g = 0; g < nodeCount() + 1; g++) assertEquals("1", ignite(g).cache(DYNAMIC_CACHE_NAME).get("1")); - kernal.context().cache().dynamicDestroyCache(DYNAMIC_CACHE_NAME).get(); + kernal.context().cache().dynamicDestroyCache(DYNAMIC_CACHE_NAME, true).get(); } finally { stopGrid(nodeCount()); http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteClientDataStructuresAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteClientDataStructuresAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteClientDataStructuresAbstractTest.java index bf6dcda..34e7080 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteClientDataStructuresAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteClientDataStructuresAbstractTest.java @@ -30,6 +30,7 @@ import org.apache.ignite.configuration.CollectionConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; @@ -59,6 +60,8 @@ public abstract class IgniteClientDataStructuresAbstractTest extends GridCommonA ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder); + ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setSharedMemoryPort(-1); + return cfg; } http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheTxNodeFailureSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheTxNodeFailureSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheTxNodeFailureSelfTest.java index 84838db..a08d080 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheTxNodeFailureSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheTxNodeFailureSelfTest.java @@ -188,6 +188,9 @@ public class GridCacheTxNodeFailureSelfTest extends GridCommonAbstractTest { } /** + * @param conc Transaction concurrency. + * @param backup Check backup flag. + * @param commit Check commit flag. * @throws Exception If failed. */ private void checkPrimaryNodeFailureBackupCommit( @@ -197,6 +200,7 @@ public class GridCacheTxNodeFailureSelfTest extends GridCommonAbstractTest { ) throws Exception { try { startGrids(gridCount()); + awaitPartitionMapExchange(); for (int i = 0; i < gridCount(); i++) @@ -290,7 +294,7 @@ public class GridCacheTxNodeFailureSelfTest extends GridCommonAbstractTest { return null; } - }); + }, "tx-thread"); commitLatch.await(); @@ -366,6 +370,7 @@ public class GridCacheTxNodeFailureSelfTest extends GridCommonAbstractTest { /** * @param ignite Ignite instance to generate key. + * @param backup Backup key flag. * @return Generated key that is not primary nor backup for {@code ignite(0)} and primary for * {@code ignite(1)}. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCacheCommitDelayTxRecoveryTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCacheCommitDelayTxRecoveryTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCacheCommitDelayTxRecoveryTest.java new file mode 100644 index 0000000..c47401c --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCacheCommitDelayTxRecoveryTest.java @@ -0,0 +1,376 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.dht; + +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import javax.cache.Cache; +import javax.cache.configuration.Factory; +import javax.cache.integration.CacheLoaderException; +import javax.cache.integration.CacheWriterException; +import javax.cache.processor.MutableEntry; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.CacheEntryProcessor; +import org.apache.ignite.cache.store.CacheStore; +import org.apache.ignite.cache.store.CacheStoreAdapter; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.util.typedef.G; +import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.apache.ignite.transactions.Transaction; + +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; +import static org.apache.ignite.cache.CacheRebalanceMode.SYNC; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; +import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC; +import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ; + +/** + * + */ +public class IgniteCacheCommitDelayTxRecoveryTest extends GridCommonAbstractTest { + /** */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** */ + private static final int SRVS = 4; + + /** */ + private static volatile boolean commit; + + /** */ + private static volatile CountDownLatch commitStartedLatch; + + /** */ + private static volatile CountDownLatch commitFinishLatch; + + /** */ + private boolean client; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER); + + ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setSharedMemoryPort(-1); + + cfg.setClientMode(client); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopAllGrids(); + + super.afterTestsStopped(); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + + super.afterTest(); + } + + /** + * @throws Exception If failed. + */ + public void testRecovery1() throws Exception { + checkRecovery(1, false); + } + + /** + * @throws Exception If failed. + */ + public void testRecovery2() throws Exception { + checkRecovery(2, false); + } + + /** + * @throws Exception If failed. + */ + public void testRecoveryStoreEnabled1() throws Exception { + checkRecovery(1, true); + } + + /** + * @throws Exception If failed. + */ + public void testRecoveryStoreEnabled2() throws Exception { + checkRecovery(2, true); + } + + /** + * @param backups Number of cache backups. + * @param useStore If {@code true} tests cache with store configured. + * @throws Exception If failed. + */ + private void checkRecovery(int backups, boolean useStore) throws Exception { + startGridsMultiThreaded(SRVS, false); + + client = true; + + Ignite clientNode = startGrid(SRVS); + + assertTrue(clientNode.configuration().isClientMode()); + + client = false; + + clientNode.createCache(cacheConfiguration(backups, useStore)); + + awaitPartitionMapExchange(); + + Ignite srv = ignite(0); + + assertFalse(srv.configuration().isClientMode()); + + for (Boolean pessimistic : Arrays.asList(false, true)) { + checkRecovery(backupKey(srv.cache(null)), srv, pessimistic, useStore); + + checkRecovery(nearKey(srv.cache(null)), srv, pessimistic, useStore); + + checkRecovery(nearKey(clientNode.cache(null)), clientNode, pessimistic, useStore); + + srv = ignite(0); + + assertFalse(srv.configuration().isClientMode()); + } + } + + /** + * @param key Key. + * @param ignite Node executing update. + * @param pessimistic If {@code true} uses pessimistic transaction. + * @param useStore {@code True} if store is used. + * @throws Exception If failed. + */ + private void checkRecovery(final Integer key, + final Ignite ignite, + final boolean pessimistic, + final boolean useStore) throws Exception { + Ignite primary = primaryNode(key, null); + + assertNotSame(ignite, primary); + + List<Ignite> backups = backupNodes(key, null); + + assertFalse(backups.isEmpty()); + + final Set<String> backupNames = new HashSet<>(); + + for (Ignite node : backups) + backupNames.add(node.name()); + + log.info("Check recovery [key=" + key + + ", pessimistic=" + pessimistic + + ", primary=" + primary.name() + + ", backups=" + backupNames + + ", node=" + ignite.name() + ']'); + + final IgniteCache<Integer, Integer> cache = ignite.cache(null); + + cache.put(key, 0); + + commitStartedLatch = new CountDownLatch(backupNames.size()); + commitFinishLatch = new CountDownLatch(1); + + commit = false; + + TestEntryProcessor.skipFirst = useStore ? ignite.name() : null; + + IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + log.info("Start update."); + + if (pessimistic) { + try (Transaction tx = ignite.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { + cache.invoke(key, new TestEntryProcessor(backupNames)); + + commit = true; + + log.info("Start commit."); + + assertEquals(backupNames.size(), commitStartedLatch.getCount()); + + tx.commit(); + } + } + else { + commit = true; + + cache.invoke(key, new TestEntryProcessor(backupNames)); + } + + log.info("End update, execute get."); + + Integer val = cache.get(key); + + log.info("Get value: " + val); + + assertEquals(1, (Object)val); + + return null; + } + }, "update-thread"); + + assertTrue(commitStartedLatch.await(30, SECONDS)); + + log.info("Stop node: " + primary.name()); + + primary.close(); + + commitFinishLatch.countDown(); + + fut.get(); + + for (Ignite node : G.allGrids()) + assertEquals(1, node.cache(null).get(key)); + + cache.put(key, 2); + + for (Ignite node : G.allGrids()) + assertEquals(2, node.cache(null).get(key)); + + startGrid(primary.name()); + + for (Ignite node : G.allGrids()) + assertEquals(2, node.cache(null).get(key)); + + cache.put(key, 3); + + for (Ignite node : G.allGrids()) + assertEquals(3, node.cache(null).get(key)); + + awaitPartitionMapExchange(); + } + + /** + * + */ + static class TestEntryProcessor implements CacheEntryProcessor<Integer, Integer, Void> { + /** */ + private Set<String> nodeNames; + + /** Skips first call for given node (used to skip call for store update). */ + private static String skipFirst; + + /** + * @param nodeNames Node names where sleep will be called. + */ + public TestEntryProcessor(Set<String> nodeNames) { + this.nodeNames = nodeNames; + } + + /** {@inheritDoc} */ + @Override public Void process(MutableEntry<Integer, Integer> entry, Object... args) { + Ignite ignite = entry.unwrap(Ignite.class); + + System.out.println(Thread.currentThread().getName() + " process [node=" + ignite.name() + + ", commit=" + commit + ", skipFirst=" + skipFirst + ']'); + + boolean skip = false; + + if (commit && ignite.name().equals(skipFirst)) { + skipFirst = null; + + skip = true; + } + + if (!skip && commit && nodeNames.contains(ignite.name())) { + try { + System.out.println(Thread.currentThread().getName() + " start process invoke."); + + assertTrue(commitStartedLatch != null && commitStartedLatch.getCount() > 0); + + commitStartedLatch.countDown(); + + assertTrue(commitFinishLatch.await(10, SECONDS)); + + System.out.println(Thread.currentThread().getName() + " end process invoke."); + } + catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + else + System.out.println(Thread.currentThread().getName() + " invoke set value."); + + entry.setValue(1); + + return null; + } + } + + /** + * @param backups Number of backups. + * @param useStore If {@code true} adds cache store. + * @return Cache configuration. + */ + private CacheConfiguration<Object, Object> cacheConfiguration(int backups, boolean useStore) { + CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>(); + + ccfg.setAtomicityMode(TRANSACTIONAL); + ccfg.setBackups(backups); + ccfg.setWriteSynchronizationMode(FULL_SYNC); + ccfg.setRebalanceMode(SYNC); + + if (useStore) { + ccfg.setCacheStoreFactory(new TestStoreFactory()); + + ccfg.setWriteThrough(true); + } + + return ccfg; + } + + /** + * + */ + private static class TestStoreFactory implements Factory<CacheStore<Object, Object>> { + /** {@inheritDoc} */ + @Override public CacheStore<Object, Object> create() { + return new CacheStoreAdapter<Object, Object>() { + @Override public Object load(Object key) throws CacheLoaderException { + return null; + } + + @Override public void write(Cache.Entry entry) throws CacheWriterException { + // No-op. + } + + @Override public void delete(Object key) throws CacheWriterException { + // No-op. + } + }; + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java index 4eb8a6b..7532354 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java @@ -30,6 +30,8 @@ import javax.cache.integration.CacheLoaderException; import javax.cache.integration.CacheWriterException; import javax.cache.processor.EntryProcessorResult; import javax.cache.processor.MutableEntry; + +import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; import org.apache.ignite.cache.CacheAtomicWriteOrderMode; import org.apache.ignite.cache.CacheAtomicityMode; @@ -238,7 +240,7 @@ public abstract class IgniteCachePutRetryAbstractSelfTest extends GridCommonAbst * @param store If {@code true} uses cache with store. * @throws Exception If failed. */ - private void checkRetry(Test test, TestMemoryMode memMode, boolean store) throws Exception { + protected final void checkRetry(Test test, TestMemoryMode memMode, boolean store) throws Exception { ignite(0).createCache(cacheConfiguration(memMode, store)); final AtomicBoolean finished = new AtomicBoolean(); @@ -259,7 +261,7 @@ public abstract class IgniteCachePutRetryAbstractSelfTest extends GridCommonAbst } }); - IgniteCache<Integer, Integer> cache = ignite(0).cache(null); + final IgniteCache<Integer, Integer> cache = ignite(0).cache(null); int iter = 0; @@ -309,6 +311,31 @@ public abstract class IgniteCachePutRetryAbstractSelfTest extends GridCommonAbst break; } + case TX_PUT: { + while (System.currentTimeMillis() < stopTime) { + final Integer val = ++iter; + + Ignite ignite = ignite(0); + + for (int i = 0; i < keysCnt; i++) { + final Integer key = i; + + doInTransaction(ignite, new Callable<Void>() { + @Override public Void call() throws Exception { + cache.put(key, val); + + return null; + } + }); + } + + for (int i = 0; i < keysCnt; i++) + assertEquals(val, cache.get(i)); + } + + break; + } + case PUT_ALL: { while (System.currentTimeMillis() < stopTime) { Integer val = ++iter; @@ -541,7 +568,10 @@ public abstract class IgniteCachePutRetryAbstractSelfTest extends GridCommonAbst INVOKE, /** */ - INVOKE_ALL + INVOKE_ALL, + + /** */ + TX_PUT } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java index 7655464..9204bc8 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java @@ -101,6 +101,20 @@ public class IgniteCachePutRetryTransactionalSelfTest extends IgniteCachePutRetr /** * @throws Exception If failed. */ + public void testExplicitTransactionRetriesSingleValue() throws Exception { + checkRetry(Test.TX_PUT, TestMemoryMode.HEAP, false); + } + + /** + * @throws Exception If failed. + */ + public void testExplicitTransactionRetriesSingleValueStoreEnabled() throws Exception { + checkRetry(Test.TX_PUT, TestMemoryMode.HEAP, true); + } + + /** + * @throws Exception If failed. + */ public void testExplicitTransactionRetries() throws Exception { explicitTransactionRetries(TestMemoryMode.HEAP, false); } @@ -108,6 +122,13 @@ public class IgniteCachePutRetryTransactionalSelfTest extends IgniteCachePutRetr /** * @throws Exception If failed. */ + public void testExplicitTransactionRetriesSingleOperation() throws Exception { + explicitTransactionRetries(TestMemoryMode.HEAP, false); + } + + /** + * @throws Exception If failed. + */ public void testExplicitTransactionRetriesStoreEnabled() throws Exception { explicitTransactionRetries(TestMemoryMode.HEAP, true); } http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/test/java/org/apache/ignite/internal/processors/continuous/GridEventConsumeSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/continuous/GridEventConsumeSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/continuous/GridEventConsumeSelfTest.java index d239ea8..91eecbb 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/continuous/GridEventConsumeSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/continuous/GridEventConsumeSelfTest.java @@ -48,6 +48,7 @@ import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiTuple; import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.resources.IgniteInstanceResource; +import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; @@ -104,6 +105,8 @@ public class GridEventConsumeSelfTest extends GridCommonAbstractTest { cfg.setDiscoverySpi(disc); + ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setSharedMemoryPort(-1); + if (include) cfg.setUserAttributes(F.asMap("include", true)); http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/test/java/org/apache/ignite/internal/util/nio/GridNioSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/nio/GridNioSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/nio/GridNioSelfTest.java index 6089795..552dd28 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/util/nio/GridNioSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/util/nio/GridNioSelfTest.java @@ -569,9 +569,9 @@ public class GridNioSelfTest extends GridCommonAbstractTest { GridNioServerListener lsnr, @Nullable Integer queueLimit) throws Exception { for (int i = 0; i < 10; i++) { - try { - int srvPort = port++; + int srvPort = port++; + try { GridNioServer.Builder<?> builder = serverBuilder(srvPort, parser, lsnr); if (queueLimit != null) @@ -584,8 +584,11 @@ public class GridNioSelfTest extends GridCommonAbstractTest { return srvr; } catch (IgniteCheckedException e) { - if (i < 9 && e.hasCause(BindException.class)) - log.error("Failed to start server, will try another port: " + e); + if (i < 9 && e.hasCause(BindException.class)) { + log.error("Failed to start server, will try another port [err=" + e + ", port=" + srvPort + ']'); + + U.sleep(5000); + } else throw e; }