Repository: incubator-ignite Updated Branches: refs/heads/ignite-80 580f05117 -> 108a3e6a0
IGNITE-80 - Fixed atomic cache op inside active transaction. Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/108a3e6a Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/108a3e6a Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/108a3e6a Branch: refs/heads/ignite-80 Commit: 108a3e6a02da114d23918a5d19e4978910eb5a9b Parents: 580f051 Author: Alexey Goncharuk <[email protected]> Authored: Fri Apr 3 19:32:09 2015 -0700 Committer: Alexey Goncharuk <[email protected]> Committed: Fri Apr 3 19:32:09 2015 -0700 ---------------------------------------------------------------------- .../processors/cache/GridCacheProcessor.java | 22 +- .../dht/atomic/GridDhtAtomicCache.java | 4 +- .../dht/atomic/GridNearAtomicUpdateFuture.java | 46 +- .../dht/atomic/GridNearAtomicUpdateRequest.java | 34 +- .../dht/colocated/GridDhtColocatedCache.java | 1 - .../colocated/GridDhtColocatedLockFuture.java | 4 +- .../cache/transactions/IgniteTxManager.java | 21 + .../IgniteCacheAtomicOpWithinTxSelfTest.java | 454 +++++++++++++++++++ .../ignite/testsuites/IgniteCacheTestSuite.java | 2 + 9 files changed, 567 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/108a3e6a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index f21eabc..a991620 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -103,6 +103,9 @@ public class GridCacheProcessor extends GridProcessorAdapter { /** System cache names. */ private final Set<String> sysCaches; + /** System cache names. */ + private final Set<Integer> sysCacheIds; + /** Caches stop sequence. */ private final Deque<String> stopSeq; @@ -132,6 +135,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { preloadFuts = new TreeMap<>(); sysCaches = new HashSet<>(); + sysCacheIds = new HashSet<>(); stopSeq = new LinkedList<>(); } @@ -548,16 +552,25 @@ public class GridCacheProcessor extends GridProcessorAdapter { for (FileSystemConfiguration igfsCfg : igfsCfgs) { sysCaches.add(maskNull(igfsCfg.getMetaCacheName())); sysCaches.add(maskNull(igfsCfg.getDataCacheName())); + + sysCacheIds.add(CU.cacheId(igfsCfg.getMetaCacheName())); + sysCacheIds.add(CU.cacheId(igfsCfg.getDataCacheName())); } } - if (IgniteComponentType.HADOOP.inClassPath()) + if (IgniteComponentType.HADOOP.inClassPath()) { sysCaches.add(CU.SYS_CACHE_HADOOP_MR); + sysCacheIds.add(CU.cacheId(CU.SYS_CACHE_HADOOP_MR)); + } sysCaches.add(CU.MARSH_CACHE_NAME); sysCaches.add(CU.UTILITY_CACHE_NAME); sysCaches.add(CU.ATOMICS_CACHE_NAME); + sysCacheIds.add(CU.cacheId(CU.MARSH_CACHE_NAME)); + sysCacheIds.add(CU.cacheId(CU.UTILITY_CACHE_NAME)); + sysCacheIds.add(CU.cacheId(CU.ATOMICS_CACHE_NAME)); + CacheConfiguration[] cfgs = ctx.config().getCacheConfiguration(); sharedCtx = createSharedContext(ctx); @@ -2238,6 +2251,13 @@ public class GridCacheProcessor extends GridProcessorAdapter { } /** + * @return Collection of all system cache IDs. + */ + public Collection<Integer> systemCacheIds() { + return sysCacheIds; + } + + /** * @param name Cache name. * @param <K> type of keys. * @param <V> type of values. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/108a3e6a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index be9a963..495d56c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -1081,7 +1081,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { // Do not check topology version for CLOCK versioning since // partition exchange will wait for near update future. - if (topology().topologyVersion().equals(req.topologyVersion()) || + // Also do not check topology version if topology was locked on near node by + // external transaction or explicit lock. + if (topology().topologyVersion().equals(req.topologyVersion()) || req.topologyLocked() || ctx.config().getAtomicWriteOrderMode() == CLOCK) { ClusterNode node = ctx.discovery().node(nodeId); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/108a3e6a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java index ac4ae2c2..2131155 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java @@ -28,6 +28,7 @@ import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.distributed.dht.*; import org.apache.ignite.internal.processors.cache.distributed.near.*; import org.apache.ignite.internal.processors.cache.dr.*; +import org.apache.ignite.internal.processors.cache.transactions.*; import org.apache.ignite.internal.processors.cache.version.*; import org.apache.ignite.internal.util.future.*; import org.apache.ignite.internal.util.tostring.*; @@ -139,6 +140,9 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> /** Map time. */ private volatile long mapTime; + /** Topology locked flag. Set if atomic update is performed inside a TX or explicit lock. */ + private boolean topLocked; + /** * @param cctx Cache context. * @param cache Cache instance. @@ -295,7 +299,23 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> * @param waitTopFut Whether to wait for topology future. */ public void map(boolean waitTopFut) { - mapOnTopology(keys, false, null, waitTopFut); + AffinityTopologyVersion topVer = null; + + IgniteInternalTx tx = cctx.tm().anyActiveThreadTx(); + + if (tx != null && tx.topologyVersionSnapshot() != null) + topVer = tx.topologyVersionSnapshot(); + + if (topVer == null) + topVer = cctx.mvcc().lastExplicitLockTopologyVersion(Thread.currentThread().getId()); + + if (topVer == null) + mapOnTopology(keys, false, null, waitTopFut); + else { + topLocked = true; + + map0(topVer, keys, false, null); + } } /** {@inheritDoc} */ @@ -427,13 +447,8 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> GridDhtTopologyFuture fut = cctx.topologyVersionFuture(); - if (fut.isDone()) { + if (fut.isDone()) topVer = fut.topologyVersion(); - - if (futVer == null) - // Assign future version in topology read lock before first exception may be thrown. - futVer = cctx.versions().next(topVer); - } else { if (waitTopFut) { fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() { @@ -447,11 +462,6 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> return; } - - mapTime = U.currentTimeMillis(); - - if (!remap && (cctx.config().getAtomicWriteOrderMode() == CLOCK || syncMode != FULL_ASYNC)) - cctx.mvcc().addAtomicFuture(version(), this); } finally { cache.topology().readUnlock(); @@ -495,6 +505,16 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> return; } + if (futVer == null) { + // Assign future version in topology read lock before first exception may be thrown. + futVer = cctx.versions().next(topVer); + + mapTime = U.currentTimeMillis(); + } + + if (!remap && (cctx.config().getAtomicWriteOrderMode() == CLOCK || syncMode != FULL_ASYNC)) + cctx.mvcc().addAtomicFuture(version(), this); + CacheConfiguration ccfg = cctx.config(); // Assign version on near node in CLOCK ordering mode even if fastMap is false. @@ -578,6 +598,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> fastMap, updVer, topVer, + topLocked, syncMode, op, retval, @@ -700,6 +721,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> fastMap, updVer, topVer, + topLocked, syncMode, op, retval, http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/108a3e6a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java index 3f68a46..05bcf73 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java @@ -64,6 +64,9 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri /** Topology version. */ private AffinityTopologyVersion topVer; + /** Topology locked flag. Set if atomic update is performed inside TX or explicit lock. */ + private boolean topLocked; + /** Write synchronization mode. */ private CacheWriteSynchronizationMode syncMode; @@ -158,6 +161,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri boolean fastMap, @Nullable GridCacheVersion updateVer, @NotNull AffinityTopologyVersion topVer, + boolean topLocked, CacheWriteSynchronizationMode syncMode, GridCacheOperation op, boolean retval, @@ -174,6 +178,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri this.updateVer = updateVer; this.topVer = topVer; + this.topLocked = topLocked; this.syncMode = syncMode; this.op = op; this.retval = retval; @@ -248,6 +253,13 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri } /** + * @return Topology locked flag. + */ + public boolean topologyLocked() { + return topLocked; + } + + /** * @return Cache write synchronization mode. */ public CacheWriteSynchronizationMode writeSynchronizationMode() { @@ -645,18 +657,24 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri writer.incrementState(); case 19: - if (!writer.writeMessage("topVer", topVer)) + if (!writer.writeBoolean("topLocked", topLocked)) return false; writer.incrementState(); case 20: - if (!writer.writeMessage("updateVer", updateVer)) + if (!writer.writeMessage("topVer", topVer)) return false; writer.incrementState(); case 21: + if (!writer.writeMessage("updateVer", updateVer)) + return false; + + writer.incrementState(); + + case 22: if (!writer.writeCollection("vals", vals, MessageCollectionItemType.MSG)) return false; @@ -815,7 +833,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri reader.incrementState(); case 19: - topVer = reader.readMessage("topVer"); + topLocked = reader.readBoolean("topLocked"); if (!reader.isLastRead()) return false; @@ -823,7 +841,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri reader.incrementState(); case 20: - updateVer = reader.readMessage("updateVer"); + topVer = reader.readMessage("topVer"); if (!reader.isLastRead()) return false; @@ -831,6 +849,14 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri reader.incrementState(); case 21: + updateVer = reader.readMessage("updateVer"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 22: vals = reader.readCollection("vals", MessageCollectionItemType.MSG); if (!reader.isLastRead()) http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/108a3e6a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java index 0de4653..679f507 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java @@ -18,7 +18,6 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.colocated; import org.apache.ignite.*; -import org.apache.ignite.cache.*; import org.apache.ignite.cluster.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.processors.affinity.*; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/108a3e6a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java index 3087dff..585a64b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java @@ -284,7 +284,7 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity false, false); - cand.topologyVersion(new AffinityTopologyVersion(topVer.get().topologyVersion())); + cand.topologyVersion(topVer.get()); } } else { @@ -303,7 +303,7 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity false, false); - cand.topologyVersion(new AffinityTopologyVersion(topVer.get().topologyVersion())); + cand.topologyVersion(topVer.get()); } else cand = cand.reenter(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/108a3e6a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java index a8ff280..54c79e9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java @@ -644,6 +644,27 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { } /** + * @return Any transaction associated with the current thread. + */ + public IgniteInternalTx anyActiveThreadTx() { + long threadId = Thread.currentThread().getId(); + + IgniteInternalTx tx = threadMap.get(threadId); + + if (tx != null && tx.topologyVersionSnapshot() != null) + return tx; + + for (int cacheId : cctx.cache().systemCacheIds()) { + tx = sysThreadMap.get(new TxThreadKey(threadId, cacheId)); + + if (tx != null && tx.topologyVersionSnapshot() != null) + return tx; + } + + return null; + } + + /** * @return Local transaction. */ @Nullable public IgniteInternalTx localTxx() { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/108a3e6a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheAtomicOpWithinTxSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheAtomicOpWithinTxSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheAtomicOpWithinTxSelfTest.java new file mode 100644 index 0000000..b8682b7 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheAtomicOpWithinTxSelfTest.java @@ -0,0 +1,454 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ignite.internal.processors.cache.distributed; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.cache.*; +import org.apache.ignite.internal.util.lang.*; +import org.apache.ignite.testframework.*; +import org.apache.ignite.testframework.junits.common.*; +import org.apache.ignite.transactions.*; + +import java.util.concurrent.*; +import java.util.concurrent.locks.*; + +import static org.apache.ignite.transactions.TransactionConcurrency.*; +import static org.apache.ignite.transactions.TransactionIsolation.*; + +/** + * Test checks that atomic operations are not blocked when invoked inside transaction. + */ +@SuppressWarnings({"unchecked", "TypeMayBeWeakened"}) +public class IgniteCacheAtomicOpWithinTxSelfTest extends GridCommonAbstractTest { + /** */ + private static final String TRANSACTIONAL_CACHE = "tx"; + + /** */ + private static final String ATOMIC_CACHE = "atomic"; + + /** */ + private static final int NODE_COUNT = 3; + + /** */ + private CacheWriteSynchronizationMode syncMode; + + /** */ + private CacheAtomicWriteOrderMode orderMode; + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + super.beforeTestsStarted(); + + startGridsMultiThreaded(NODE_COUNT); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopAllGrids(); + } + + /** + * + */ + private void initCaches() { + CacheConfiguration txCfg = new CacheConfiguration(TRANSACTIONAL_CACHE); + + txCfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL); + txCfg.setBackups(1); + + grid(0).createCache(txCfg); + + CacheConfiguration atomicCfg = new CacheConfiguration(ATOMIC_CACHE); + + atomicCfg.setBackups(1); + atomicCfg.setWriteSynchronizationMode(syncMode); + atomicCfg.setAtomicWriteOrderMode(orderMode); + + grid(0).createCache(atomicCfg); + } + + /** + * + */ + private void destroyCaches() { + grid(0).destroyCache(TRANSACTIONAL_CACHE); + grid(0).destroyCache(ATOMIC_CACHE); + } + + /** + * @throws Exception If failed. + */ + public void testAtomicOpTxPrimarySyncPrimaryNodeJoined() throws Exception { + syncMode = CacheWriteSynchronizationMode.PRIMARY_SYNC; + orderMode = CacheAtomicWriteOrderMode.PRIMARY; + + checkAtomicOpTx(true); + } + + /** + * @throws Exception If failed. + */ + public void testAtomicOpTxPrimarySyncClockNodeJoined() throws Exception { + syncMode = CacheWriteSynchronizationMode.PRIMARY_SYNC; + orderMode = CacheAtomicWriteOrderMode.CLOCK; + + checkAtomicOpTx(true); + } + + /** + * @throws Exception If failed. + */ + public void testAtomicOpTxFullSyncPrimaryNodeJoined() throws Exception { + syncMode = CacheWriteSynchronizationMode.FULL_SYNC; + orderMode = CacheAtomicWriteOrderMode.PRIMARY; + + checkAtomicOpTx(true); + } + + /** + * @throws Exception If failed. + */ + public void testAtomicOpTxFullSyncClockNodeJoined() throws Exception { + syncMode = CacheWriteSynchronizationMode.FULL_SYNC; + orderMode = CacheAtomicWriteOrderMode.CLOCK; + + checkAtomicOpTx(true); + } + + /** + * @throws Exception If failed. + */ + public void testAtomicOpTxPrimarySyncPrimary() throws Exception { + syncMode = CacheWriteSynchronizationMode.PRIMARY_SYNC; + orderMode = CacheAtomicWriteOrderMode.PRIMARY; + + checkAtomicOpTx(false); + } + + /** + * @throws Exception If failed. + */ + public void testAtomicOpTxPrimarySyncClock() throws Exception { + syncMode = CacheWriteSynchronizationMode.PRIMARY_SYNC; + orderMode = CacheAtomicWriteOrderMode.CLOCK; + + checkAtomicOpTx(false); + } + + /** + * @throws Exception If failed. + */ + public void testAtomicOpTxFullSyncPrimary() throws Exception { + syncMode = CacheWriteSynchronizationMode.FULL_SYNC; + orderMode = CacheAtomicWriteOrderMode.PRIMARY; + + checkAtomicOpTx(false); + } + + /** + * @throws Exception If failed. + */ + public void testAtomicOpTxFullSyncClock() throws Exception { + syncMode = CacheWriteSynchronizationMode.FULL_SYNC; + orderMode = CacheAtomicWriteOrderMode.CLOCK; + + checkAtomicOpTx(false); + } + + + + /** + * @throws Exception If failed. + */ + public void testAtomicOpLockPrimarySyncPrimaryNodeJoined() throws Exception { + syncMode = CacheWriteSynchronizationMode.PRIMARY_SYNC; + orderMode = CacheAtomicWriteOrderMode.PRIMARY; + + checkAtomicOpLock(true); + } + + /** + * @throws Exception If failed. + */ + public void testAtomicOpLockPrimarySyncClockNodeJoined() throws Exception { + syncMode = CacheWriteSynchronizationMode.PRIMARY_SYNC; + orderMode = CacheAtomicWriteOrderMode.CLOCK; + + checkAtomicOpLock(true); + } + + /** + * @throws Exception If failed. + */ + public void testAtomicOpLockFullSyncPrimaryNodeJoined() throws Exception { + syncMode = CacheWriteSynchronizationMode.FULL_SYNC; + orderMode = CacheAtomicWriteOrderMode.PRIMARY; + + checkAtomicOpLock(true); + } + + /** + * @throws Exception If failed. + */ + public void testAtomicOpLockFullSyncClockNodeJoined() throws Exception { + syncMode = CacheWriteSynchronizationMode.FULL_SYNC; + orderMode = CacheAtomicWriteOrderMode.CLOCK; + + checkAtomicOpLock(true); + } + + /** + * @throws Exception If failed. + */ + public void testAtomicOpLockPrimarySyncPrimary() throws Exception { + syncMode = CacheWriteSynchronizationMode.PRIMARY_SYNC; + orderMode = CacheAtomicWriteOrderMode.PRIMARY; + + checkAtomicOpLock(false); + } + + /** + * @throws Exception If failed. + */ + public void testAtomicOpLockPrimarySyncClock() throws Exception { + syncMode = CacheWriteSynchronizationMode.PRIMARY_SYNC; + orderMode = CacheAtomicWriteOrderMode.CLOCK; + + checkAtomicOpLock(false); + } + + /** + * @throws Exception If failed. + */ + public void testAtomicOpLockFullSyncPrimary() throws Exception { + syncMode = CacheWriteSynchronizationMode.FULL_SYNC; + orderMode = CacheAtomicWriteOrderMode.PRIMARY; + + checkAtomicOpLock(false); + } + + /** + * @throws Exception If failed. + */ + public void testAtomicOpLockFullSyncClock() throws Exception { + syncMode = CacheWriteSynchronizationMode.FULL_SYNC; + orderMode = CacheAtomicWriteOrderMode.CLOCK; + + checkAtomicOpLock(false); + } + + /** + * @param startNode {@code True} if should start node inside tx. + * @throws Exception If failed. + */ + private void checkAtomicOpTx(boolean startNode) throws Exception { + initCaches(); + + try { + IgniteCache<Object, Object> txCache = grid(0).cache(TRANSACTIONAL_CACHE); + IgniteCache<Object, Object> atomicCache = grid(0).cache(ATOMIC_CACHE); + + for (int i = 0; i < 10; i++) + atomicCache.put("key" + i, -1); + + try { + IgniteInternalFuture<Object> fut = null; + + try (Transaction tx = grid(0).transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { + for (int i = 0; i < 10; i++) + txCache.put("key" + i, i); + + if (startNode) { + fut = GridTestUtils.runAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + return startGrid(NODE_COUNT); + } + }); + + GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + return grid(0).cluster().nodes().size() == NODE_COUNT + 1; + } + }, getTestTimeout()); + } + + info("Starting atomic puts."); + + for (int i = 0; i < 10; i++) + atomicCache.put("key" + i, i); + + info("Atomic puts done."); + + tx.commit(); + } + + if (fut != null) + fut.get(); + + for (int g = 0; g < NODE_COUNT + (startNode ? 1 : 0); g++) { + IgniteCache<Object, Object> cache = grid(g).cache(ATOMIC_CACHE); + + for (int i = 0; i < 10; i++) + assertEquals(i, cache.get("key" + i)); + } + } + finally { + if (startNode) + stopGrid(NODE_COUNT); + } + } + finally { + destroyCaches(); + } + } + + /** + * @param startNode {@code True} if should start node inside tx. + * @throws Exception If failed. + */ + @SuppressWarnings("TooBroadScope") + private void checkAtomicOpLock(boolean startNode) throws Exception { + initCaches(); + + try { + IgniteCache<Object, Object> txCache = grid(0).cache(TRANSACTIONAL_CACHE); + IgniteCache<Object, Object> atomicCache = grid(0).cache(ATOMIC_CACHE); + + for (int i = 0; i < 10; i++) + atomicCache.put("key" + i, -1); + + try { + Lock lock = txCache.lock("key0"); + + IgniteInternalFuture<Object> fut = null; + + lock.lock(); + + try { + for (int i = 0; i < 10; i++) + txCache.put("key" + i, i); + + if (startNode) { + fut = GridTestUtils.runAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + return startGrid(NODE_COUNT); + } + }); + + GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + return grid(0).cluster().nodes().size() == NODE_COUNT + 1; + } + }, getTestTimeout()); + } + + info("Starting atomic puts."); + + for (int i = 0; i < 10; i++) + atomicCache.put("key" + i, i); + + info("Atomic puts done."); + } + finally { + lock.unlock(); + } + + if (fut != null) + fut.get(); + + for (int g = 0; g < NODE_COUNT + (startNode ? 1 : 0); g++) { + IgniteCache<Object, Object> cache = grid(g).cache(ATOMIC_CACHE); + + for (int i = 0; i < 10; i++) + assertEquals(i, cache.get("key" + i)); + } + } + finally { + if (startNode) + stopGrid(NODE_COUNT); + } + } + finally { + destroyCaches(); + } + } + + /** + * @throws Exception If failed. + */ + public void testAtomicOpSystemCacheTxNodeJoined() throws Exception { + initCaches(); + + try { + GridCacheAdapter txCache = (GridCacheAdapter)grid(0).utilityCache(); + IgniteCache<Object, Object> atomicCache = grid(0).cache(ATOMIC_CACHE); + + for (int i = 0; i < 10; i++) + atomicCache.put("key" + i, -1); + + try { + IgniteInternalFuture<Object> fut; + + // Intentionally start system tx. + try (Transaction tx = txCache.txStart(PESSIMISTIC, REPEATABLE_READ)) { + for (int i = 0; i < 10; i++) + txCache.put("key" + i, i); + + fut = GridTestUtils.runAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + return startGrid(NODE_COUNT); + } + }); + + GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + return grid(0).cluster().nodes().size() == NODE_COUNT + 1; + } + }, getTestTimeout()); + + info("Starting atomic puts."); + + for (int i = 0; i < 10; i++) + atomicCache.put("key" + i, i); + + info("Atomic puts done."); + + tx.commit(); + } + + if (fut != null) + fut.get(); + + for (int g = 0; g < NODE_COUNT + 1; g++) { + IgniteCache<Object, Object> cache = grid(g).cache(ATOMIC_CACHE); + + for (int i = 0; i < 10; i++) + assertEquals(i, cache.get("key" + i)); + } + } + finally { + stopGrid(NODE_COUNT); + } + } + finally { + destroyCaches(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/108a3e6a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java index d5ae2e6..b5b3c29 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java @@ -427,6 +427,8 @@ public class IgniteCacheTestSuite extends TestSuite { suite.addTestSuite(IgniteCacheTxStoreValueTest.class); suite.addTestSuite(IgniteCacheTxNearEnabledStoreValueTest.class); + suite.addTestSuite(IgniteCacheAtomicOpWithinTxSelfTest.class); + return suite; } }
