ignite-2407 Fixed 'primary_sync' mode for transactional cache
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f1af2c7b Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f1af2c7b Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f1af2c7b Branch: refs/heads/ignite-2004 Commit: f1af2c7b077b5483ff2b68a6e76775516518c598 Parents: e7e223f Author: sboikov <[email protected]> Authored: Tue Apr 5 15:26:13 2016 +0300 Committer: sboikov <[email protected]> Committed: Tue Apr 5 15:26:13 2016 +0300 ---------------------------------------------------------------------- .../ClientAbstractMultiNodeSelfTest.java | 8 +- .../GridDistributedTxFinishRequest.java | 2 + .../dht/GridDhtTransactionalCacheAdapter.java | 4 +- .../distributed/dht/GridDhtTxFinishFuture.java | 11 +- .../cache/distributed/dht/GridDhtTxLocal.java | 204 ++-- .../distributed/dht/GridDhtTxPrepareFuture.java | 6 +- .../cache/distributed/dht/GridDhtTxRemote.java | 5 - .../colocated/GridDhtColocatedLockFuture.java | 17 +- .../distributed/near/GridNearLockFuture.java | 10 +- .../near/GridNearTxFinishFuture.java | 81 +- .../near/GridNearTxFinishRequest.java | 46 +- .../cache/distributed/near/GridNearTxLocal.java | 24 - .../distributed/near/GridNearTxRemote.java | 5 - .../cache/transactions/IgniteInternalTx.java | 30 +- .../cache/transactions/IgniteTxAdapter.java | 68 +- .../cache/transactions/IgniteTxHandler.java | 17 +- .../IgniteTxImplicitSingleStateImpl.java | 7 +- .../IgniteTxRemoteStateAdapter.java | 7 +- .../cache/transactions/IgniteTxState.java | 5 +- .../cache/transactions/IgniteTxStateImpl.java | 28 +- .../internal/TestRecordingCommunicationSpi.java | 63 +- .../cache/IgniteCacheNearLockValueSelfTest.java | 2 +- .../distributed/IgniteCacheGetRestartTest.java | 4 + .../distributed/IgniteCachePrimarySyncTest.java | 45 +- .../IgniteCacheReadFromBackupTest.java | 12 +- .../IgniteCacheSingleGetMessageTest.java | 8 +- .../IgniteTxCachePrimarySyncTest.java | 1114 ++++++++++++++++++ ...teSynchronizationModesMultithreadedTest.java | 422 +++++++ .../GridCacheDhtPreloadMessageCountTest.java | 6 +- .../dht/GridCacheTxNodeFailureSelfTest.java | 10 +- .../testsuites/IgniteCacheTestSuite4.java | 6 + 31 files changed, 1910 insertions(+), 367 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/f1af2c7b/modules/clients/src/test/java/org/apache/ignite/internal/client/integration/ClientAbstractMultiNodeSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/client/integration/ClientAbstractMultiNodeSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/internal/client/integration/ClientAbstractMultiNodeSelfTest.java index 5e5a68d..80e7baa 100644 --- a/modules/clients/src/test/java/org/apache/ignite/internal/client/integration/ClientAbstractMultiNodeSelfTest.java +++ b/modules/clients/src/test/java/org/apache/ignite/internal/client/integration/ClientAbstractMultiNodeSelfTest.java @@ -518,13 +518,13 @@ public abstract class ClientAbstractMultiNodeSelfTest extends GridCommonAbstract IgniteInternalTx t = tm.tx(v); if (t.hasWriteKey(cacheCtx.txKey(cacheCtx.toCacheKeyObject("x1")))) - assertFalse("Invalid tx flags: " + t, t.syncCommit()); + assertEquals("Invalid tx flags: " + t, FULL_ASYNC, t.syncMode()); else if (t.hasWriteKey(cacheCtx.txKey(cacheCtx.toCacheKeyObject("x2")))) - assertTrue("Invalid tx flags: " + t, t.syncCommit()); + assertEquals("Invalid tx flags: " + t, FULL_SYNC, t.syncMode()); else if (t.hasWriteKey(cacheCtx.txKey(cacheCtx.toCacheKeyObject("x3")))) - assertFalse("Invalid tx flags: " + t, t.syncCommit()); + assertEquals("Invalid tx flags: " + t, FULL_ASYNC, t.syncMode()); else if (t.hasWriteKey(cacheCtx.txKey(cacheCtx.toCacheKeyObject("x4")))) - assertTrue("Invalid tx flags: " + t, t.syncCommit()); + assertEquals("Invalid tx flags: " + t, FULL_SYNC, t.syncMode()); } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/f1af2c7b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java index a761fec..ad69d14 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java @@ -50,9 +50,11 @@ public class GridDistributedTxFinishRequest extends GridDistributedBaseMessage { private boolean commit; /** Sync commit flag. */ + @Deprecated private boolean syncCommit; /** Sync commit flag. */ + @Deprecated private boolean syncRollback; /** Min version used as base for completed versions. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/f1af2c7b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java index b6639f6..f19980b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java @@ -76,6 +76,7 @@ import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.transactions.TransactionIsolation; import org.jetbrains.annotations.Nullable; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; import static org.apache.ignite.internal.processors.cache.GridCacheOperation.NOOP; import static org.apache.ignite.internal.processors.cache.GridCacheUtils.isNearEnabled; import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC; @@ -889,7 +890,8 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach req.subjectId(), req.taskNameHash()); - tx.syncCommit(req.syncCommit()); + if (req.syncCommit()) + tx.syncMode(FULL_SYNC); tx = ctx.tm().onCreated(null, tx); http://git-wip-us.apache.org/repos/asf/ignite/blob/f1af2c7b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java index b9afbed..ebda52c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java @@ -23,7 +23,6 @@ import java.util.Map; import java.util.UUID; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; - import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.cluster.ClusterNode; @@ -46,6 +45,8 @@ import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteUuid; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC; import static org.apache.ignite.transactions.TransactionState.COMMITTING; /** @@ -217,8 +218,8 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur if (finishErr == null) finishErr = this.tx.commitError(); - // Always send finish reply. - this.tx.sendFinishReply(commit, finishErr); + if (tx.syncMode() != PRIMARY_SYNC) + this.tx.sendFinishReply(commit, finishErr); // Don't forget to clean up. cctx.mvcc().removeFuture(futId); @@ -277,7 +278,7 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur if (tx.onePhaseCommit()) return false; - boolean sync = commit ? tx.syncCommit() : tx.syncRollback(); + boolean sync = tx.syncMode() == FULL_SYNC; if (tx.explicitLock()) sync = true; @@ -346,7 +347,7 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur if (tx.onePhaseCommit()) return false; - boolean sync = commit ? tx.syncCommit() : tx.syncRollback(); + boolean sync = tx.syncMode() == FULL_SYNC; if (tx.explicitLock()) sync = true; http://git-wip-us.apache.org/repos/asf/ignite/blob/f1af2c7b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java index acd5017..d1f88d7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java @@ -23,7 +23,6 @@ import java.util.Collections; import java.util.Map; import java.util.UUID; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; - import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; @@ -54,6 +53,7 @@ import org.apache.ignite.transactions.TransactionConcurrency; import org.apache.ignite.transactions.TransactionIsolation; import org.jetbrains.annotations.Nullable; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC; import static org.apache.ignite.internal.processors.cache.GridCacheUtils.isNearEnabled; import static org.apache.ignite.transactions.TransactionState.PREPARED; import static org.apache.ignite.transactions.TransactionState.PREPARING; @@ -488,6 +488,51 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa return chainOnePhasePrepare(fut); } + /** + * @param prepFut Prepare future. + * @param fut Finish future. + */ + private void finishCommit(@Nullable IgniteInternalFuture prepFut, GridDhtTxFinishFuture fut) { + boolean primarySync = syncMode() == PRIMARY_SYNC; + + IgniteCheckedException err = null; + + try { + if (prepFut != null) + prepFut.get(); // Check for errors. + + if (finish(true)) { + if (primarySync) + sendFinishReply(true, null); + + fut.finish(); + } + else { + err = new IgniteCheckedException("Failed to commit transaction: " + CU.txString(this)); + + fut.onError(err); + } + } + catch (IgniteTxOptimisticCheckedException e) { + if (log.isDebugEnabled()) + log.debug("Failed to optimistically prepare transaction [tx=" + this + ", e=" + e + ']'); + + err = e; + + fut.onError(e); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to prepare transaction: " + this, e); + + err = e; + + fut.onError(e); + } + + if (primarySync && err != null) + sendFinishReply(true, err); + } + /** {@inheritDoc} */ @SuppressWarnings({"ThrowableInstanceNeverThrown"}) @Override public IgniteInternalFuture<IgniteInternalTx> commitAsync() { @@ -505,73 +550,20 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa GridDhtTxPrepareFuture prep = prepFut; if (prep != null) { - if (prep.isDone()) { - try { - prep.get(); // Check for errors of a parent future. - - if (finish(true)) - fut.finish(); - else - fut.onError(new IgniteCheckedException("Failed to commit transaction: " + CU.txString(this))); - } - catch (IgniteTxOptimisticCheckedException e) { - if (log.isDebugEnabled()) - log.debug("Failed to optimistically prepare transaction [tx=" + this + ", e=" + e + ']'); - - fut.onError(e); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to prepare transaction: " + this, e); - - fut.onError(e); - } - } - else + if (prep.isDone()) + finishCommit(prep, fut); + else { prep.listen(new CI1<IgniteInternalFuture<?>>() { @Override public void apply(IgniteInternalFuture<?> f) { - try { - f.get(); // Check for errors of a parent future. - - if (finish(true)) - fut.finish(); - else - fut.onError(new IgniteCheckedException("Failed to commit transaction: " + - CU.txString(GridDhtTxLocal.this))); - } - catch (IgniteTxOptimisticCheckedException e) { - if (log.isDebugEnabled()) - log.debug("Failed optimistically to prepare transaction [tx=" + this + ", e=" + e + ']'); - - fut.onError(e); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to prepare transaction: " + this, e); - - fut.onError(e); - } + finishCommit(f, fut); } }); + } } else { assert optimistic(); - try { - if (finish(true)) - fut.finish(); - else - fut.onError(new IgniteCheckedException("Failed to commit transaction: " + CU.txString(this))); - } - catch (IgniteTxOptimisticCheckedException e) { - if (log.isDebugEnabled()) - log.debug("Failed optimistically to prepare transaction [tx=" + this + ", e=" + e + ']'); - - fut.onError(e); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to commit transaction: " + this, e); - - fut.onError(e); - } + finishCommit(null, fut); } return fut; @@ -584,64 +576,70 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa PREP_FUT_UPD.compareAndSet(this, fut, null); } - /** {@inheritDoc} */ - @Override public IgniteInternalFuture<IgniteInternalTx> rollbackAsync() { - GridDhtTxPrepareFuture prepFut = this.prepFut; + /** + * @param prepFut Prepare future. + * @param fut Finish future. + */ + private void finishRollback(@Nullable IgniteInternalFuture prepFut, GridDhtTxFinishFuture fut) { + try { + if (prepFut != null) + prepFut.get(); + } + catch (IgniteCheckedException e) { + if (log.isDebugEnabled()) + log.debug("Failed to prepare or rollback transaction [tx=" + this + ", e=" + e + ']'); + } - final GridDhtTxFinishFuture fut = new GridDhtTxFinishFuture<>(cctx, this, /*rollback*/false); + boolean primarySync = syncMode() == PRIMARY_SYNC; - cctx.mvcc().addFuture(fut, fut.futureId()); + IgniteCheckedException err = null; - if (prepFut == null) { - try { - if (finish(false) || state() == UNKNOWN) - fut.finish(); - else - fut.onError(new IgniteCheckedException("Failed to rollback transaction: " + CU.txString(this))); - } - catch (IgniteTxOptimisticCheckedException e) { - if (log.isDebugEnabled()) - log.debug("Failed optimistically to prepare transaction [tx=" + this + ", e=" + e + ']'); + try { + if (finish(false) || state() == UNKNOWN) { + if (primarySync) + sendFinishReply(false, null); - fut.onError(e); + fut.finish(); } - catch (IgniteCheckedException e) { - U.error(log, "Failed to rollback transaction (will make the best effort to rollback remote nodes): " + - this, e); + else { + err = new IgniteCheckedException("Failed to rollback transaction: " + + CU.txString(GridDhtTxLocal.this)); - fut.onError(e); + fut.onError(err); } } - else { - prepFut.complete(); + catch (IgniteCheckedException e) { + U.error(log, "Failed to gracefully rollback transaction: " + CU.txString(GridDhtTxLocal.this), + e); - prepFut.listen(new CI1<IgniteInternalFuture<?>>() { - @Override public void apply(IgniteInternalFuture<?> f) { - try { - f.get(); // Check for errors of a parent future. - } - catch (IgniteCheckedException e) { - if (log.isDebugEnabled()) - log.debug("Failed to prepare or rollback transaction [tx=" + this + ", e=" + e + ']'); - } + err = e; - try { - if (finish(false) || state() == UNKNOWN) - fut.finish(); - else - fut.onError(new IgniteCheckedException("Failed to rollback transaction: " + - CU.txString(GridDhtTxLocal.this))); + fut.onError(e); + } - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to gracefully rollback transaction: " + CU.txString(GridDhtTxLocal.this), - e); + if (primarySync && err != null) + sendFinishReply(false, err); + } - fut.onError(e); - } + /** {@inheritDoc} */ + @Override public IgniteInternalFuture<IgniteInternalTx> rollbackAsync() { + GridDhtTxPrepareFuture prepFut = this.prepFut; + + final GridDhtTxFinishFuture fut = new GridDhtTxFinishFuture<>(cctx, this, /*rollback*/false); + + cctx.mvcc().addFuture(fut, fut.futureId()); + + if (prepFut != null) { + prepFut.complete(); + + prepFut.listen(new CI1<IgniteInternalFuture<?>>() { + @Override public void apply(IgniteInternalFuture<?> f) { + finishRollback(f, fut); } }); } + else + finishRollback(null, fut); return fut; } http://git-wip-us.apache.org/repos/asf/ignite/blob/f1af2c7b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java index 0541c8a..df0068a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java @@ -647,7 +647,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter if (tx.markFinalizing(IgniteInternalTx.FinalizationStatus.USER_FINISH)) { IgniteInternalFuture<IgniteInternalTx> fut = null; - CIX1<IgniteInternalFuture<IgniteInternalTx>> responseClo = + CIX1<IgniteInternalFuture<IgniteInternalTx>> resClo = new CIX1<IgniteInternalFuture<IgniteInternalTx>>() { @Override public void applyx(IgniteInternalFuture<IgniteInternalTx> fut) { try { @@ -674,7 +674,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter fut = tx.rollbackAsync(); - fut.listen(responseClo); + fut.listen(resClo); throw e; } @@ -684,7 +684,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter fut = tx.rollbackAsync(); if (fut != null) - fut.listen(responseClo); + fut.listen(resClo); } } else { http://git-wip-us.apache.org/repos/asf/ignite/blob/f1af2c7b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java index f509e27..dc27eb1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java @@ -247,11 +247,6 @@ public class GridDhtTxRemote extends GridDistributedTxRemoteAdapter { } /** {@inheritDoc} */ - @Override public boolean enforceSerializable() { - return false; // Serializable will be enforced on primary mode. - } - - /** {@inheritDoc} */ @Override public GridCacheVersion nearXidVersion() { return nearXidVer; } http://git-wip-us.apache.org/repos/asf/ignite/blob/f1af2c7b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java index e4c6b71..5810028 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java @@ -70,6 +70,7 @@ import org.apache.ignite.transactions.TransactionIsolation; import org.jetbrains.annotations.Nullable; import org.jsr166.ConcurrentHashMap8; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_READ; /** @@ -253,20 +254,6 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture } /** - * @return {@code True} if commit is synchronous. - */ - private boolean syncCommit() { - return tx != null && tx.syncCommit(); - } - - /** - * @return {@code True} if rollback is synchronous. - */ - private boolean syncRollback() { - return tx != null && tx.syncRollback(); - } - - /** * @return Transaction isolation or {@code null} if no transaction. */ @Nullable private TransactionIsolation isolation() { @@ -897,7 +884,7 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture timeout, mappedKeys.size(), inTx() ? tx.size() : mappedKeys.size(), - inTx() && tx.syncCommit(), + inTx() && tx.syncMode() == FULL_SYNC, inTx() ? tx.subjectId() : null, inTx() ? tx.taskNameHash() : 0, read ? accessTtl : -1L, http://git-wip-us.apache.org/repos/asf/ignite/blob/f1af2c7b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java index 5d4fc01..0d17bd8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java @@ -68,6 +68,7 @@ import org.apache.ignite.transactions.TransactionIsolation; import org.jetbrains.annotations.Nullable; import org.jsr166.ConcurrentHashMap8; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_READ; /** @@ -265,13 +266,6 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean } /** - * @return {@code True} if rollback is synchronous. - */ - private boolean syncRollback() { - return tx != null && tx.syncRollback(); - } - - /** * @return Transaction isolation or {@code null} if no transaction. */ @Nullable private TransactionIsolation isolation() { @@ -1013,7 +1007,7 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean timeout, mappedKeys.size(), inTx() ? tx.size() : mappedKeys.size(), - inTx() && tx.syncCommit(), + inTx() && tx.syncMode() == FULL_SYNC, inTx() ? tx.subjectId() : null, inTx() ? tx.taskNameHash() : 0, read ? accessTtl : -1L, http://git-wip-us.apache.org/repos/asf/ignite/blob/f1af2c7b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java index 5c4aca0..fe6290d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java @@ -26,6 +26,7 @@ import java.util.UUID; import java.util.concurrent.atomic.AtomicReference; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; +import org.apache.ignite.cache.CacheWriteSynchronizationMode; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; @@ -55,6 +56,8 @@ import org.apache.ignite.lang.IgniteProductVersion; import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.transactions.TransactionRollbackException; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC; import static org.apache.ignite.internal.processors.cache.GridCacheOperation.NOOP; import static org.apache.ignite.transactions.TransactionState.UNKNOWN; @@ -70,6 +73,9 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu public static final IgniteProductVersion WAIT_REMOTE_TXS_SINCE = IgniteProductVersion.fromString("1.5.1"); /** */ + public static final IgniteProductVersion PRIMARY_SYNC_TXS_SINCE = IgniteProductVersion.fromString("1.6.0"); + + /** */ private static final long serialVersionUID = 0L; /** Logger reference. */ @@ -120,6 +126,15 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu if (log == null) log = U.logger(cctx.kernalContext(), logRef, GridNearTxFinishFuture.class); + + CacheWriteSynchronizationMode syncMode; + + if (tx.explicitLock()) + syncMode = FULL_SYNC; + else + syncMode = tx.syncMode(); + + tx.syncMode(syncMode); } /** {@inheritDoc} */ @@ -322,20 +337,6 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu } /** - * Completeness callback. - */ - private void onComplete() { - onDone(tx); - } - - /** - * @return Synchronous flag. - */ - private boolean isSync() { - return tx.explicitLock() || (commit ? tx.syncCommit() : tx.syncRollback()); - } - - /** * Initializes future. */ @SuppressWarnings("ForLoopReplaceableByForEach") @@ -366,26 +367,6 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu } markInitialized(); - - if (!isSync() && !isDone()) { - boolean complete = true; - - synchronized (futs) { - // Avoid collection copy and iterator creation. - for (int i = 0; i < futs.size(); i++) { - IgniteInternalFuture<IgniteInternalTx> f = futs.get(i); - - if (isMini(f) && !f.isDone()) { - complete = false; - - break; - } - } - } - - if (complete) - onComplete(); - } } else onDone(new IgniteCheckedException("Failed to commit transaction: " + CU.txString(tx))); @@ -441,7 +422,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu readyNearMappingFromBackup(mapping); if (committed) { - if (tx.syncCommit()) { + if (tx.syncMode() == FULL_SYNC) { GridCacheVersion nearXidVer = tx.nearXidVersion(); assert nearXidVer != null : tx; @@ -511,6 +492,8 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu if (finish) { GridDistributedTxMapping mapping = tx.mappings().singleMapping(); + assert mapping != null : tx; + if (FINISH_NEAR_ONE_PHASE_SINCE.compareTo(mapping.node().version()) > 0) finish = false; } @@ -575,6 +558,11 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu assert !m.empty(); + CacheWriteSynchronizationMode syncMode = tx.syncMode(); + + if (m.explicitLock()) + syncMode = FULL_SYNC; + GridNearTxFinishRequest req = new GridNearTxFinishRequest( futId, tx.xidVersion(), @@ -583,8 +571,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu tx.isInvalidate(), tx.system(), tx.ioPolicy(), - tx.syncCommit(), - tx.syncRollback(), + syncMode, m.explicitLock(), tx.storeEnabled(), tx.topologyVersion(), @@ -604,7 +591,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu IgniteInternalFuture<IgniteInternalTx> fut = cctx.tm().txHandler().finish(n.id(), tx, req); // Add new future. - if (fut != null) + if (fut != null && syncMode == FULL_SYNC) add(fut); } else { @@ -620,8 +607,15 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu try { cctx.io().send(n, req, tx.ioPolicy()); + boolean wait; + + if (syncMode == PRIMARY_SYNC) + wait = n.version().compareToIgnoreTimestamp(PRIMARY_SYNC_TXS_SINCE) >= 0; + else + wait = syncMode == FULL_SYNC; + // If we don't wait for result, then mark future as done. - if (!isSync() && !m.explicitLock()) + if (!wait) fut.onDone(); } catch (ClusterTopologyCheckedException e) { @@ -665,9 +659,8 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu ", loc=" + node.isLocal() + ", done=" + f.isDone() + "]"; } - else { + else return "CheckBackupFuture[node=null, done=" + f.isDone() + "]"; - } } else if (f.getClass() == CheckRemoteTxMiniFuture.class) { CheckRemoteTxMiniFuture fut = (CheckRemoteTxMiniFuture)f; @@ -703,8 +696,8 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu tx.system(), tx.ioPolicy(), false, - tx.syncCommit(), - tx.syncRollback(), + tx.syncMode() == FULL_SYNC, + tx.syncMode() == FULL_SYNC, null, null, null, @@ -780,7 +773,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu if (log.isDebugEnabled()) log.debug("Remote node left grid while sending or waiting for reply: " + this); - if (isSync()) { + if (tx.syncMode() == FULL_SYNC) { Map<UUID, Collection<UUID>> txNodes = tx.transactionNodes(); if (txNodes != null) { http://git-wip-us.apache.org/repos/asf/ignite/blob/f1af2c7b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java index 65eac63..dfbbe18 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java @@ -21,6 +21,7 @@ import java.io.Externalizable; import java.nio.ByteBuffer; import java.util.Collection; import java.util.UUID; +import org.apache.ignite.cache.CacheWriteSynchronizationMode; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxFinishRequest; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; @@ -56,6 +57,9 @@ public class GridNearTxFinishRequest extends GridDistributedTxFinishRequest { /** Task name hash. */ private int taskNameHash; + /** Write synchronization mode. */ + private CacheWriteSynchronizationMode syncMode; + /** * Empty constructor required for {@link Externalizable}. */ @@ -71,8 +75,7 @@ public class GridNearTxFinishRequest extends GridDistributedTxFinishRequest { * @param invalidate Invalidate flag. * @param sys System flag. * @param plc IO policy. - * @param syncCommit Sync commit flag. - * @param syncRollback Sync rollback flag. + * @param syncMode Write synchronization mode. * @param explicitLock Explicit lock flag. * @param storeEnabled Store enabled flag. * @param topVer Topology version. @@ -92,8 +95,7 @@ public class GridNearTxFinishRequest extends GridDistributedTxFinishRequest { boolean invalidate, boolean sys, byte plc, - boolean syncCommit, - boolean syncRollback, + CacheWriteSynchronizationMode syncMode, boolean explicitLock, boolean storeEnabled, @NotNull AffinityTopologyVersion topVer, @@ -113,8 +115,8 @@ public class GridNearTxFinishRequest extends GridDistributedTxFinishRequest { invalidate, sys, plc, - syncCommit, - syncRollback, + syncMode == CacheWriteSynchronizationMode.FULL_SYNC, + syncMode == CacheWriteSynchronizationMode.FULL_SYNC, baseVer, committedVers, rolledbackVers, @@ -122,6 +124,7 @@ public class GridNearTxFinishRequest extends GridDistributedTxFinishRequest { addDepInfo ); + this.syncMode = syncMode; this.explicitLock = explicitLock; this.storeEnabled = storeEnabled; this.topVer = topVer; @@ -130,6 +133,13 @@ public class GridNearTxFinishRequest extends GridDistributedTxFinishRequest { } /** + * @return Transaction write synchronization mode (can be null is message sent from old nodes). + */ + @Nullable public CacheWriteSynchronizationMode syncMode() { + return syncMode; + } + + /** * @return Explicit lock flag. */ public boolean explicitLock() { @@ -218,12 +228,18 @@ public class GridNearTxFinishRequest extends GridDistributedTxFinishRequest { writer.incrementState(); case 22: - if (!writer.writeInt("taskNameHash", taskNameHash)) + if (!writer.writeByte("syncMode", syncMode != null ? (byte)syncMode.ordinal() : -1)) return false; writer.incrementState(); case 23: + if (!writer.writeInt("taskNameHash", taskNameHash)) + return false; + + writer.incrementState(); + + case 24: if (!writer.writeMessage("topVer", topVer)) return false; @@ -278,14 +294,26 @@ public class GridNearTxFinishRequest extends GridDistributedTxFinishRequest { reader.incrementState(); case 22: - taskNameHash = reader.readInt("taskNameHash"); + byte syncModeOrd; + + syncModeOrd = reader.readByte("syncMode"); if (!reader.isLastRead()) return false; + syncMode = CacheWriteSynchronizationMode.fromOrdinal(syncModeOrd); + reader.incrementState(); case 23: + taskNameHash = reader.readInt("taskNameHash"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 24: topVer = reader.readMessage("topVer"); if (!reader.isLastRead()) @@ -305,7 +333,7 @@ public class GridNearTxFinishRequest extends GridDistributedTxFinishRequest { /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 24; + return 25; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/f1af2c7b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java index f7c330e..4aee6ad 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java @@ -207,11 +207,6 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { } /** {@inheritDoc} */ - @Override public boolean enforceSerializable() { - return false; - } - - /** {@inheritDoc} */ @Override protected UUID nearNodeId() { return cctx.localNodeId(); } @@ -244,16 +239,6 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { PREP_FUT_UPD.compareAndSet(this, fut, null); } - /** {@inheritDoc} */ - @Override public boolean syncCommit() { - return sync(); - } - - /** {@inheritDoc} */ - @Override public boolean syncRollback() { - return sync(); - } - /** * Marks transaction to check if commit on backup. */ @@ -284,15 +269,6 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { } /** - * Checks if transaction is fully synchronous. - * - * @return {@code True} if transaction is fully synchronous. - */ - private boolean sync() { - return super.syncCommit() || txState().sync(cctx); - } - - /** * @return {@code True} if transaction contains at least one near cache key mapped to the local node. */ public boolean nearLocallyMapped() { http://git-wip-us.apache.org/repos/asf/ignite/blob/f1af2c7b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java index 6b17d5e..4f4be57 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java @@ -227,11 +227,6 @@ public class GridNearTxRemote extends GridDistributedTxRemoteAdapter { } /** {@inheritDoc} */ - @Override public boolean enforceSerializable() { - return false; // Serializable will be enforced on primary mode. - } - - /** {@inheritDoc} */ @Override public GridCacheVersion ownedVersion(IgniteTxKey key) { return owned == null ? null : owned.get(key); } http://git-wip-us.apache.org/repos/asf/ignite/blob/f1af2c7b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java index cdf2354..e08f9b0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java @@ -22,6 +22,7 @@ import java.util.Map; import java.util.Set; import java.util.UUID; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.cache.CacheWriteSynchronizationMode; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheEntryPredicate; @@ -386,15 +387,6 @@ public interface IgniteInternalTx extends AutoCloseable, GridTimeoutObject { public boolean ownsLockUnsafe(GridCacheEntryEx entry); /** - * For Partitioned caches, this flag is {@code false} for remote DHT and remote NEAR - * transactions because serializability of transaction is enforced on primary node. All - * other transaction types must enforce it. - * - * @return Enforce serializable flag. - */ - public boolean enforceSerializable(); - - /** * @return {@code True} if near transaction. */ public boolean near(); @@ -442,14 +434,9 @@ public interface IgniteInternalTx extends AutoCloseable, GridTimeoutObject { public boolean user(); /** - * @return {@code True} if transaction is configured with synchronous commit flag. - */ - public boolean syncCommit(); - - /** - * @return {@code True} if transaction is configured with synchronous rollback flag. + * @return Transaction write synchronization mode. */ - public boolean syncRollback(); + public CacheWriteSynchronizationMode syncMode(); /** * @param key Key to check. @@ -524,11 +511,6 @@ public interface IgniteInternalTx extends AutoCloseable, GridTimeoutObject { KeyCacheObject key) throws GridCacheFilterFailedException; /** - * @return Start version. - */ - public GridCacheVersion startVersion(); - - /** * @return Transaction version. */ public GridCacheVersion xidVersion(); @@ -544,12 +526,6 @@ public interface IgniteInternalTx extends AutoCloseable, GridTimeoutObject { public void commitVersion(GridCacheVersion commitVer); /** - * @return End version (a.k.a. <tt>'tnc'</tt> or <tt>'transaction number counter'</tt>) - * assigned to this transaction at the end of write phase. - */ - public GridCacheVersion endVersion(); - - /** * Prepare state. * * @throws IgniteCheckedException If failed. http://git-wip-us.apache.org/repos/asf/ignite/blob/f1af2c7b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java index 77f3765..50598c1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java @@ -36,6 +36,7 @@ import javax.cache.expiry.ExpiryPolicy; import javax.cache.processor.EntryProcessor; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; +import org.apache.ignite.cache.CacheWriteSynchronizationMode; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; @@ -184,10 +185,7 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement protected boolean onePhaseCommit; /** */ - protected boolean syncCommit; - - /** */ - protected boolean syncRollback; + protected CacheWriteSynchronizationMode syncMode; /** If this transaction contains transform entries. */ protected boolean transform; @@ -638,32 +636,18 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement } /** {@inheritDoc} */ - @Override public boolean enforceSerializable() { - return true; - } - - /** {@inheritDoc} */ - @Override public boolean syncCommit() { - return syncCommit; - } - - /** {@inheritDoc} */ - @Override public boolean syncRollback() { - return syncRollback; - } + @Override public CacheWriteSynchronizationMode syncMode() { + if (syncMode != null) + return syncMode; - /** - * @param syncCommit Synchronous commit flag. - */ - public void syncCommit(boolean syncCommit) { - this.syncCommit = syncCommit; + return txState().syncMode(cctx); } /** - * @param syncRollback Synchronous rollback flag. + * @param syncMode Write synchronization mode. */ - public void syncRollback(boolean syncRollback) { - this.syncRollback = syncRollback; + public void syncMode(CacheWriteSynchronizationMode syncMode) { + this.syncMode = syncMode; } /** {@inheritDoc} */ @@ -1154,16 +1138,6 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement } /** {@inheritDoc} */ - @Override public GridCacheVersion startVersion() { - return startVer; - } - - /** {@inheritDoc} */ - @Override public GridCacheVersion endVersion() { - return endVer; - } - - /** {@inheritDoc} */ @Override public void endVersion(GridCacheVersion endVer) { this.endVer = endVer; } @@ -1897,11 +1871,6 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement } /** {@inheritDoc} */ - @Override public boolean enforceSerializable() { - return false; - } - - /** {@inheritDoc} */ @Override public boolean near() { return false; } @@ -1942,13 +1911,8 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement } /** {@inheritDoc} */ - @Override public boolean syncCommit() { - return false; - } - - /** {@inheritDoc} */ - @Override public boolean syncRollback() { - return false; + @Override public CacheWriteSynchronizationMode syncMode() { + return null; } /** {@inheritDoc} */ @@ -2014,11 +1978,6 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement } /** {@inheritDoc} */ - @Override public GridCacheVersion startVersion() { - return null; - } - - /** {@inheritDoc} */ @Override public GridCacheVersion xidVersion() { return null; } @@ -2034,11 +1993,6 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement } /** {@inheritDoc} */ - @Override public GridCacheVersion endVersion() { - return null; - } - - /** {@inheritDoc} */ @Override public void prepare() throws IgniteCheckedException { // No-op. } http://git-wip-us.apache.org/repos/asf/ignite/blob/f1af2c7b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java index 41dc43f..a764d5d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java @@ -72,6 +72,8 @@ import org.apache.ignite.lang.IgniteFutureCancelledException; import org.apache.ignite.transactions.TransactionState; import org.jetbrains.annotations.Nullable; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_ASYNC; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL; import static org.apache.ignite.internal.managers.communication.GridIoPolicy.UTILITY_CACHE_POOL; import static org.apache.ignite.internal.processors.cache.GridCacheUtils.isNearEnabled; @@ -389,7 +391,7 @@ public class IgniteTxHandler { if (tx != null) { if (req.explicitLock()) - tx.explicitLock(req.explicitLock()); + tx.explicitLock(true); tx.transactionNodes(req.transactionNodes()); @@ -688,6 +690,14 @@ public class IgniteTxHandler { assert tx != null : "Transaction is null for near finish request [nodeId=" + nodeId + ", req=" + req + "]"; + if (req.syncMode() == null) { + boolean sync = req.commit() ? req.syncCommit() : req.syncRollback(); + + tx.syncMode(sync ? FULL_SYNC : FULL_ASYNC); + } + else + tx.syncMode(req.syncMode()); + if (req.commit()) { tx.storeEnabled(req.storeEnabled()); @@ -698,9 +708,6 @@ public class IgniteTxHandler { return null; } - if (!tx.syncCommit()) - tx.syncCommit(req.syncCommit()); - tx.nearFinishFutureId(req.futureId()); tx.nearFinishMiniId(req.miniId()); @@ -712,8 +719,6 @@ public class IgniteTxHandler { return commitFut; } else { - tx.syncRollback(req.syncRollback()); - tx.nearFinishFutureId(req.futureId()); tx.nearFinishMiniId(req.miniId()); http://git-wip-us.apache.org/repos/asf/ignite/blob/f1af2c7b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java index 2f1e16f..965ef2a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java @@ -24,6 +24,7 @@ import java.util.List; import java.util.Map; import java.util.Set; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.cache.CacheWriteSynchronizationMode; import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; @@ -36,7 +37,7 @@ import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.jetbrains.annotations.Nullable; -import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_ASYNC; /** * @@ -105,8 +106,8 @@ public class IgniteTxImplicitSingleStateImpl extends IgniteTxLocalStateAdapter { } /** {@inheritDoc} */ - @Override public boolean sync(GridCacheSharedContext cctx) { - return cacheCtx != null && cacheCtx.config().getWriteSynchronizationMode() == FULL_SYNC; + @Override public CacheWriteSynchronizationMode syncMode(GridCacheSharedContext cctx) { + return cacheCtx != null ? cacheCtx.config().getWriteSynchronizationMode() : FULL_ASYNC; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/f1af2c7b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteStateAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteStateAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteStateAdapter.java index 3e5034b..79b4a74 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteStateAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteStateAdapter.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.transactions; import java.util.Collection; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.cache.CacheWriteSynchronizationMode; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture; @@ -26,6 +27,8 @@ import org.apache.ignite.internal.processors.cache.store.CacheStoreManager; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.jetbrains.annotations.Nullable; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_ASYNC; + /** * */ @@ -55,10 +58,10 @@ public abstract class IgniteTxRemoteStateAdapter implements IgniteTxRemoteState } /** {@inheritDoc} */ - @Override public boolean sync(GridCacheSharedContext cctx) { + @Override public CacheWriteSynchronizationMode syncMode(GridCacheSharedContext cctx) { assert false; - return false; + return FULL_ASYNC; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/f1af2c7b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxState.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxState.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxState.java index 18fce8d..b133533 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxState.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxState.java @@ -21,6 +21,7 @@ import java.util.Collection; import java.util.Map; import java.util.Set; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.cache.CacheWriteSynchronizationMode; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture; @@ -63,9 +64,9 @@ public interface IgniteTxState { /** * @param cctx Context. - * @return {@code True} if transaction is fully synchronous. + * @return Write synchronization mode. */ - public boolean sync(GridCacheSharedContext cctx); + public CacheWriteSynchronizationMode syncMode(GridCacheSharedContext cctx); /** * @param cctx Context. http://git-wip-us.apache.org/repos/asf/ignite/blob/f1af2c7b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java index 1256aa6..c826de1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java @@ -24,6 +24,7 @@ import java.util.Map; import java.util.Set; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.cache.CacheInterceptor; +import org.apache.ignite.cache.CacheWriteSynchronizationMode; import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; @@ -39,7 +40,9 @@ import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.jetbrains.annotations.Nullable; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_ASYNC; import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC; /** * @@ -134,15 +137,32 @@ public class IgniteTxStateImpl extends IgniteTxLocalStateAdapter { } /** {@inheritDoc} */ - @Override public boolean sync(GridCacheSharedContext cctx) { + @Override public CacheWriteSynchronizationMode syncMode(GridCacheSharedContext cctx) { + CacheWriteSynchronizationMode syncMode = CacheWriteSynchronizationMode.FULL_ASYNC; + for (int i = 0; i < activeCacheIds.size(); i++) { int cacheId = (int)activeCacheIds.get(i); - if (cctx.cacheContext(cacheId).config().getWriteSynchronizationMode() == FULL_SYNC) - return true; + CacheWriteSynchronizationMode cacheSyncMode = + cctx.cacheContext(cacheId).config().getWriteSynchronizationMode(); + + switch (cacheSyncMode) { + case FULL_SYNC: + return FULL_SYNC; + + case PRIMARY_SYNC: { + if (syncMode == FULL_ASYNC) + syncMode = PRIMARY_SYNC; + + break; + } + + case FULL_ASYNC: + break; + } } - return false; + return syncMode; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/f1af2c7b/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java b/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java index 307a470..2aed459 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -33,7 +34,6 @@ import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.spi.IgniteSpiException; import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; -import org.jetbrains.annotations.Nullable; import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_GRID_NAME; @@ -42,7 +42,7 @@ import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_GRID_NAME; */ public class TestRecordingCommunicationSpi extends TcpCommunicationSpi { /** */ - private Class<?> recordCls; + private Set<Class<?>> recordClasses; /** */ private List<Object> recordedMsgs = new ArrayList<>(); @@ -65,7 +65,7 @@ public class TestRecordingCommunicationSpi extends TcpCommunicationSpi { Object msg0 = ioMsg.message(); synchronized (this) { - if (recordCls != null && msg0.getClass().equals(recordCls)) + if (recordClasses != null && recordClasses.contains(msg0.getClass())) recordedMsgs.add(msg0); boolean block = false; @@ -97,28 +97,46 @@ public class TestRecordingCommunicationSpi extends TcpCommunicationSpi { } /** - * @param recordCls Message class to record. + * @param recordClasses Message classes to record. */ - public void record(@Nullable Class<?> recordCls) { + public void record(Class<?>... recordClasses) { synchronized (this) { - this.recordCls = recordCls; + if (this.recordClasses == null) + this.recordClasses = new HashSet<>(); + + Collections.addAll(this.recordClasses, recordClasses); + + recordedMsgs = new ArrayList<>(); } } /** + * @param stopRecord Stop record flag. * @return Recorded messages. */ - public List<Object> recordedMessages() { + public List<Object> recordedMessages(boolean stopRecord) { synchronized (this) { List<Object> msgs = recordedMsgs; recordedMsgs = new ArrayList<>(); + if (stopRecord) + recordClasses = null; + return msgs; } } /** + * @return {@code True} if there are blocked messages. + */ + public boolean hasBlockedMessages() { + synchronized (this) { + return !blockedMsgs.isEmpty(); + } + } + + /** * @param blockP Message block predicate. */ public void blockMessages(IgnitePredicate<GridIoMessage> blockP) { @@ -146,22 +164,35 @@ public class TestRecordingCommunicationSpi extends TcpCommunicationSpi { } /** - * Stops block messages and sends all already blocked messages. + * Stops block messages and can sends all already blocked messages. */ public void stopBlock() { + stopBlock(true); + } + + /** + * Stops block messages and sends all already blocked messages if sndMsgs is 'true'. + * + * @param sndMsgs If {@code true} sends blocked messages. + */ + public void stopBlock(boolean sndMsgs) { synchronized (this) { + blockP = null; + blockCls.clear(); blockP = null; - for (T2<ClusterNode, GridIoMessage> msg : blockedMsgs) { - try { - ignite.log().info("Send blocked message [node=" + msg.get1().id() + - ", msg=" + msg.get2().message() + ']'); + if (sndMsgs) { + for (T2<ClusterNode, GridIoMessage> msg : blockedMsgs) { + try { + ignite.log().info("Send blocked message [node=" + msg.get1().id() + + ", msg=" + msg.get2().message() + ']'); - super.sendMessage(msg.get1(), msg.get2()); - } - catch (Throwable e) { - U.error(ignite.log(), "Failed to send blocked message: " + msg, e); + super.sendMessage(msg.get1(), msg.get2()); + } + catch (Throwable e) { + U.error(ignite.log(), "Failed to send blocked message: " + msg, e); + } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/f1af2c7b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheNearLockValueSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheNearLockValueSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheNearLockValueSelfTest.java index f106fec..a35d5a2 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheNearLockValueSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheNearLockValueSelfTest.java @@ -94,7 +94,7 @@ public class IgniteCacheNearLockValueSelfTest extends GridCommonAbstractTest { TestRecordingCommunicationSpi comm = (TestRecordingCommunicationSpi)ignite(0).configuration().getCommunicationSpi(); - Collection<GridNearLockRequest> reqs = (Collection)comm.recordedMessages(); + Collection<GridNearLockRequest> reqs = (Collection)comm.recordedMessages(false); assertEquals(1, reqs.size()); http://git-wip-us.apache.org/repos/asf/ignite/blob/f1af2c7b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheGetRestartTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheGetRestartTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheGetRestartTest.java index b14109b..71d1182 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheGetRestartTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheGetRestartTest.java @@ -42,6 +42,7 @@ import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import static org.apache.ignite.cache.CacheMode.PARTITIONED; import static org.apache.ignite.cache.CacheMode.REPLICATED; import static org.apache.ignite.cache.CacheRebalanceMode.ASYNC; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; /** * @@ -160,6 +161,8 @@ public class IgniteCacheGetRestartTest extends GridCommonAbstractTest { ignite(SRVS).createNearCache(ccfg.getName(), new NearCacheConfiguration<>()); try (IgniteDataStreamer<Object, Object> streamer = ignite(0).dataStreamer(ccfg.getName())) { + streamer.allowOverwrite(true); + for (int i = 0; i < KEYS; i++) streamer.addData(i, i); } @@ -274,6 +277,7 @@ public class IgniteCacheGetRestartTest extends GridCommonAbstractTest { ccfg.setNearConfiguration(new NearCacheConfiguration<>()); ccfg.setRebalanceMode(ASYNC); + ccfg.setWriteSynchronizationMode(FULL_SYNC); return ccfg; } http://git-wip-us.apache.org/repos/asf/ignite/blob/f1af2c7b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCachePrimarySyncTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCachePrimarySyncTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCachePrimarySyncTest.java index cef73fd..183d4bf 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCachePrimarySyncTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCachePrimarySyncTest.java @@ -21,16 +21,26 @@ import java.util.HashMap; import java.util.Map; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteTransactions; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.apache.ignite.transactions.Transaction; +import org.apache.ignite.transactions.TransactionConcurrency; +import org.apache.ignite.transactions.TransactionIsolation; +import org.jetbrains.annotations.Nullable; import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC; +import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC; +import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC; +import static org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED; +import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ; +import static org.apache.ignite.transactions.TransactionIsolation.SERIALIZABLE; /** * @@ -94,21 +104,36 @@ public class IgniteCachePrimarySyncTest extends GridCommonAbstractTest { * @throws Exception If failed. */ public void testPutGet() throws Exception { - checkPutGet(ignite(SRVS).cache("cache1")); + Ignite ignite = ignite(SRVS); - checkPutGet(ignite(SRVS).cache("cache2")); + checkPutGet(ignite.cache("cache1"), null, null, null); + + checkPutGet(ignite.cache("cache2"), null, null, null); + + checkPutGet(ignite.cache("cache2"), ignite.transactions(), OPTIMISTIC, REPEATABLE_READ); + + checkPutGet(ignite.cache("cache2"), ignite.transactions(), OPTIMISTIC, SERIALIZABLE); + + checkPutGet(ignite.cache("cache2"), ignite.transactions(), PESSIMISTIC, READ_COMMITTED); } /** * @param cache Cache. + * @param txs Transactions instance if explicit transaction should be used. + * @param concurrency Transaction concurrency. + * @param isolation Transaction isolation. */ - private void checkPutGet(IgniteCache<Object, Object> cache) { + private void checkPutGet(IgniteCache<Object, Object> cache, + @Nullable IgniteTransactions txs, + TransactionConcurrency concurrency, + TransactionIsolation isolation) { log.info("Check cache: " + cache.getName()); final int KEYS = 50; for (int iter = 0; iter < 100; iter++) { - log.info("Iteration: " + iter); + if (iter % 10 == 0) + log.info("Iteration: " + iter); for (int i = 0; i < KEYS; i++) cache.remove(i); @@ -118,12 +143,20 @@ public class IgniteCachePrimarySyncTest extends GridCommonAbstractTest { for (int i = 0; i < KEYS; i++) putBatch.put(i, iter); - cache.putAll(putBatch); + if (txs != null) { + try (Transaction tx = txs.txStart(concurrency, isolation)) { + cache.putAll(putBatch); + + tx.commit(); + } + } + else + cache.putAll(putBatch); Map<Object, Object> vals = cache.getAll(putBatch.keySet()); for (int i = 0; i < KEYS; i++) - assertNotNull(vals.get(i)); + assertEquals(iter, vals.get(i)); } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/f1af2c7b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheReadFromBackupTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheReadFromBackupTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheReadFromBackupTest.java index af018cc..2ccd950 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheReadFromBackupTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheReadFromBackupTest.java @@ -123,7 +123,7 @@ public class IgniteCacheReadFromBackupTest extends GridCommonAbstractTest { assertNull(cache.get(key)); - List<Object> msgs = spi.recordedMessages(); + List<Object> msgs = spi.recordedMessages(false); assertEquals(1, msgs.size()); } @@ -216,7 +216,7 @@ public class IgniteCacheReadFromBackupTest extends GridCommonAbstractTest { assertNull(cache.get(key)); - List<Object> msgs = newNodeSpi.recordedMessages(); + List<Object> msgs = newNodeSpi.recordedMessages(false); assertEquals(1, msgs.size()); @@ -234,7 +234,7 @@ public class IgniteCacheReadFromBackupTest extends GridCommonAbstractTest { TestRecordingCommunicationSpi spi = (TestRecordingCommunicationSpi)ignite.configuration().getCommunicationSpi(); - spi.stopBlock(); + spi.stopBlock(true); } awaitPartitionMapExchange(); @@ -304,7 +304,7 @@ public class IgniteCacheReadFromBackupTest extends GridCommonAbstractTest { TestRecordingCommunicationSpi spi = (TestRecordingCommunicationSpi)ignite.configuration().getCommunicationSpi(); - List<Object> msgs = spi.recordedMessages(); + List<Object> msgs = spi.recordedMessages(false); assertEquals(0, msgs.size()); } @@ -330,14 +330,14 @@ public class IgniteCacheReadFromBackupTest extends GridCommonAbstractTest { if (nearKey != null) { assertNull(cache.get(nearKey)); - msgs = spi.recordedMessages(); + msgs = spi.recordedMessages(false); assertEquals(1, msgs.size()); } assertNull(cache.get(backupKey)); - msgs = spi.recordedMessages(); + msgs = spi.recordedMessages(false); assertTrue(msgs.isEmpty()); } http://git-wip-us.apache.org/repos/asf/ignite/blob/f1af2c7b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheSingleGetMessageTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheSingleGetMessageTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheSingleGetMessageTest.java index 48fc961..08f44cc 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheSingleGetMessageTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheSingleGetMessageTest.java @@ -248,12 +248,12 @@ public class IgniteCacheSingleGetMessageTest extends GridCommonAbstractTest { * @param primarySpi Primary node SPI. */ private void checkMessages(TestRecordingCommunicationSpi spi, TestRecordingCommunicationSpi primarySpi) { - List<Object> msgs = spi.recordedMessages(); + List<Object> msgs = spi.recordedMessages(false); assertEquals(1, msgs.size()); assertTrue(msgs.get(0) instanceof GridNearSingleGetRequest); - msgs = primarySpi.recordedMessages(); + msgs = primarySpi.recordedMessages(false); assertEquals(1, msgs.size()); assertTrue(msgs.get(0) instanceof GridNearSingleGetResponse); @@ -264,10 +264,10 @@ public class IgniteCacheSingleGetMessageTest extends GridCommonAbstractTest { * @param primarySpi Primary node SPI. */ private void checkNoMessages(TestRecordingCommunicationSpi spi, TestRecordingCommunicationSpi primarySpi) { - List<Object> msgs = spi.recordedMessages(); + List<Object> msgs = spi.recordedMessages(false); assertEquals(0, msgs.size()); - msgs = primarySpi.recordedMessages(); + msgs = primarySpi.recordedMessages(false); assertEquals(0, msgs.size()); }
