Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-80 580f05117 -> 108a3e6a0


IGNITE-80 - Fixed atomic cache op inside active transaction.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/108a3e6a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/108a3e6a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/108a3e6a

Branch: refs/heads/ignite-80
Commit: 108a3e6a02da114d23918a5d19e4978910eb5a9b
Parents: 580f051
Author: Alexey Goncharuk <[email protected]>
Authored: Fri Apr 3 19:32:09 2015 -0700
Committer: Alexey Goncharuk <[email protected]>
Committed: Fri Apr 3 19:32:09 2015 -0700

----------------------------------------------------------------------
 .../processors/cache/GridCacheProcessor.java    |  22 +-
 .../dht/atomic/GridDhtAtomicCache.java          |   4 +-
 .../dht/atomic/GridNearAtomicUpdateFuture.java  |  46 +-
 .../dht/atomic/GridNearAtomicUpdateRequest.java |  34 +-
 .../dht/colocated/GridDhtColocatedCache.java    |   1 -
 .../colocated/GridDhtColocatedLockFuture.java   |   4 +-
 .../cache/transactions/IgniteTxManager.java     |  21 +
 .../IgniteCacheAtomicOpWithinTxSelfTest.java    | 454 +++++++++++++++++++
 .../ignite/testsuites/IgniteCacheTestSuite.java |   2 +
 9 files changed, 567 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/108a3e6a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index f21eabc..a991620 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -103,6 +103,9 @@ public class GridCacheProcessor extends 
GridProcessorAdapter {
     /** System cache names. */
     private final Set<String> sysCaches;
 
+    /** System cache names. */
+    private final Set<Integer> sysCacheIds;
+
     /** Caches stop sequence. */
     private final Deque<String> stopSeq;
 
@@ -132,6 +135,7 @@ public class GridCacheProcessor extends 
GridProcessorAdapter {
         preloadFuts = new TreeMap<>();
 
         sysCaches = new HashSet<>();
+        sysCacheIds = new HashSet<>();
         stopSeq = new LinkedList<>();
     }
 
@@ -548,16 +552,25 @@ public class GridCacheProcessor extends 
GridProcessorAdapter {
             for (FileSystemConfiguration igfsCfg : igfsCfgs) {
                 sysCaches.add(maskNull(igfsCfg.getMetaCacheName()));
                 sysCaches.add(maskNull(igfsCfg.getDataCacheName()));
+
+                sysCacheIds.add(CU.cacheId(igfsCfg.getMetaCacheName()));
+                sysCacheIds.add(CU.cacheId(igfsCfg.getDataCacheName()));
             }
         }
 
-        if (IgniteComponentType.HADOOP.inClassPath())
+        if (IgniteComponentType.HADOOP.inClassPath()) {
             sysCaches.add(CU.SYS_CACHE_HADOOP_MR);
+            sysCacheIds.add(CU.cacheId(CU.SYS_CACHE_HADOOP_MR));
+        }
 
         sysCaches.add(CU.MARSH_CACHE_NAME);
         sysCaches.add(CU.UTILITY_CACHE_NAME);
         sysCaches.add(CU.ATOMICS_CACHE_NAME);
 
+        sysCacheIds.add(CU.cacheId(CU.MARSH_CACHE_NAME));
+        sysCacheIds.add(CU.cacheId(CU.UTILITY_CACHE_NAME));
+        sysCacheIds.add(CU.cacheId(CU.ATOMICS_CACHE_NAME));
+
         CacheConfiguration[] cfgs = ctx.config().getCacheConfiguration();
 
         sharedCtx = createSharedContext(ctx);
@@ -2238,6 +2251,13 @@ public class GridCacheProcessor extends 
GridProcessorAdapter {
     }
 
     /**
+     * @return Collection of all system cache IDs.
+     */
+    public Collection<Integer> systemCacheIds() {
+        return sysCacheIds;
+    }
+
+    /**
      * @param name Cache name.
      * @param <K> type of keys.
      * @param <V> type of values.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/108a3e6a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index be9a963..495d56c 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -1081,7 +1081,9 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
 
                     // Do not check topology version for CLOCK versioning since
                     // partition exchange will wait for near update future.
-                    if 
(topology().topologyVersion().equals(req.topologyVersion()) ||
+                    // Also do not check topology version if topology was 
locked on near node by
+                    // external transaction or explicit lock.
+                    if 
(topology().topologyVersion().equals(req.topologyVersion()) || 
req.topologyLocked() ||
                         ctx.config().getAtomicWriteOrderMode() == CLOCK) {
                         ClusterNode node = ctx.discovery().node(nodeId);
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/108a3e6a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index ac4ae2c2..2131155 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -28,6 +28,7 @@ import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.processors.cache.distributed.dht.*;
 import org.apache.ignite.internal.processors.cache.distributed.near.*;
 import org.apache.ignite.internal.processors.cache.dr.*;
+import org.apache.ignite.internal.processors.cache.transactions.*;
 import org.apache.ignite.internal.processors.cache.version.*;
 import org.apache.ignite.internal.util.future.*;
 import org.apache.ignite.internal.util.tostring.*;
@@ -139,6 +140,9 @@ public class GridNearAtomicUpdateFuture extends 
GridFutureAdapter<Object>
     /** Map time. */
     private volatile long mapTime;
 
+    /** Topology locked flag. Set if atomic update is performed inside a TX or 
explicit lock. */
+    private boolean topLocked;
+
     /**
      * @param cctx Cache context.
      * @param cache Cache instance.
@@ -295,7 +299,23 @@ public class GridNearAtomicUpdateFuture extends 
GridFutureAdapter<Object>
      * @param waitTopFut Whether to wait for topology future.
      */
     public void map(boolean waitTopFut) {
-        mapOnTopology(keys, false, null, waitTopFut);
+        AffinityTopologyVersion topVer = null;
+
+        IgniteInternalTx tx = cctx.tm().anyActiveThreadTx();
+
+        if (tx != null && tx.topologyVersionSnapshot() != null)
+            topVer = tx.topologyVersionSnapshot();
+
+        if (topVer == null)
+            topVer = 
cctx.mvcc().lastExplicitLockTopologyVersion(Thread.currentThread().getId());
+
+        if (topVer == null)
+            mapOnTopology(keys, false, null, waitTopFut);
+        else {
+            topLocked = true;
+
+            map0(topVer, keys, false, null);
+        }
     }
 
     /** {@inheritDoc} */
@@ -427,13 +447,8 @@ public class GridNearAtomicUpdateFuture extends 
GridFutureAdapter<Object>
 
             GridDhtTopologyFuture fut = cctx.topologyVersionFuture();
 
-            if (fut.isDone()) {
+            if (fut.isDone())
                 topVer = fut.topologyVersion();
-
-                if (futVer == null)
-                    // Assign future version in topology read lock before 
first exception may be thrown.
-                    futVer = cctx.versions().next(topVer);
-            }
             else {
                 if (waitTopFut) {
                     fut.listen(new 
CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
@@ -447,11 +462,6 @@ public class GridNearAtomicUpdateFuture extends 
GridFutureAdapter<Object>
 
                 return;
             }
-
-            mapTime = U.currentTimeMillis();
-
-            if (!remap && (cctx.config().getAtomicWriteOrderMode() == CLOCK || 
syncMode != FULL_ASYNC))
-                cctx.mvcc().addAtomicFuture(version(), this);
         }
         finally {
             cache.topology().readUnlock();
@@ -495,6 +505,16 @@ public class GridNearAtomicUpdateFuture extends 
GridFutureAdapter<Object>
             return;
         }
 
+        if (futVer == null) {
+            // Assign future version in topology read lock before first 
exception may be thrown.
+            futVer = cctx.versions().next(topVer);
+
+            mapTime = U.currentTimeMillis();
+        }
+
+        if (!remap && (cctx.config().getAtomicWriteOrderMode() == CLOCK || 
syncMode != FULL_ASYNC))
+            cctx.mvcc().addAtomicFuture(version(), this);
+
         CacheConfiguration ccfg = cctx.config();
 
         // Assign version on near node in CLOCK ordering mode even if fastMap 
is false.
@@ -578,6 +598,7 @@ public class GridNearAtomicUpdateFuture extends 
GridFutureAdapter<Object>
                 fastMap,
                 updVer,
                 topVer,
+                topLocked,
                 syncMode,
                 op,
                 retval,
@@ -700,6 +721,7 @@ public class GridNearAtomicUpdateFuture extends 
GridFutureAdapter<Object>
                             fastMap,
                             updVer,
                             topVer,
+                            topLocked,
                             syncMode,
                             op,
                             retval,

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/108a3e6a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
index 3f68a46..05bcf73 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
@@ -64,6 +64,9 @@ public class GridNearAtomicUpdateRequest extends 
GridCacheMessage implements Gri
     /** Topology version. */
     private AffinityTopologyVersion topVer;
 
+    /** Topology locked flag. Set if atomic update is performed inside TX or 
explicit lock. */
+    private boolean topLocked;
+
     /** Write synchronization mode. */
     private CacheWriteSynchronizationMode syncMode;
 
@@ -158,6 +161,7 @@ public class GridNearAtomicUpdateRequest extends 
GridCacheMessage implements Gri
         boolean fastMap,
         @Nullable GridCacheVersion updateVer,
         @NotNull AffinityTopologyVersion topVer,
+        boolean topLocked,
         CacheWriteSynchronizationMode syncMode,
         GridCacheOperation op,
         boolean retval,
@@ -174,6 +178,7 @@ public class GridNearAtomicUpdateRequest extends 
GridCacheMessage implements Gri
         this.updateVer = updateVer;
 
         this.topVer = topVer;
+        this.topLocked = topLocked;
         this.syncMode = syncMode;
         this.op = op;
         this.retval = retval;
@@ -248,6 +253,13 @@ public class GridNearAtomicUpdateRequest extends 
GridCacheMessage implements Gri
     }
 
     /**
+     * @return Topology locked flag.
+     */
+    public boolean topologyLocked() {
+        return topLocked;
+    }
+
+    /**
      * @return Cache write synchronization mode.
      */
     public CacheWriteSynchronizationMode writeSynchronizationMode() {
@@ -645,18 +657,24 @@ public class GridNearAtomicUpdateRequest extends 
GridCacheMessage implements Gri
                 writer.incrementState();
 
             case 19:
-                if (!writer.writeMessage("topVer", topVer))
+                if (!writer.writeBoolean("topLocked", topLocked))
                     return false;
 
                 writer.incrementState();
 
             case 20:
-                if (!writer.writeMessage("updateVer", updateVer))
+                if (!writer.writeMessage("topVer", topVer))
                     return false;
 
                 writer.incrementState();
 
             case 21:
+                if (!writer.writeMessage("updateVer", updateVer))
+                    return false;
+
+                writer.incrementState();
+
+            case 22:
                 if (!writer.writeCollection("vals", vals, 
MessageCollectionItemType.MSG))
                     return false;
 
@@ -815,7 +833,7 @@ public class GridNearAtomicUpdateRequest extends 
GridCacheMessage implements Gri
                 reader.incrementState();
 
             case 19:
-                topVer = reader.readMessage("topVer");
+                topLocked = reader.readBoolean("topLocked");
 
                 if (!reader.isLastRead())
                     return false;
@@ -823,7 +841,7 @@ public class GridNearAtomicUpdateRequest extends 
GridCacheMessage implements Gri
                 reader.incrementState();
 
             case 20:
-                updateVer = reader.readMessage("updateVer");
+                topVer = reader.readMessage("topVer");
 
                 if (!reader.isLastRead())
                     return false;
@@ -831,6 +849,14 @@ public class GridNearAtomicUpdateRequest extends 
GridCacheMessage implements Gri
                 reader.incrementState();
 
             case 21:
+                updateVer = reader.readMessage("updateVer");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 22:
                 vals = reader.readCollection("vals", 
MessageCollectionItemType.MSG);
 
                 if (!reader.isLastRead())

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/108a3e6a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
index 0de4653..679f507 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
@@ -18,7 +18,6 @@
 package org.apache.ignite.internal.processors.cache.distributed.dht.colocated;
 
 import org.apache.ignite.*;
-import org.apache.ignite.cache.*;
 import org.apache.ignite.cluster.*;
 import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.processors.affinity.*;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/108a3e6a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
index 3087dff..585a64b 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
@@ -284,7 +284,7 @@ public final class GridDhtColocatedLockFuture<K, V> extends 
GridCompoundIdentity
                     false,
                     false);
 
-                cand.topologyVersion(new 
AffinityTopologyVersion(topVer.get().topologyVersion()));
+                cand.topologyVersion(topVer.get());
             }
         }
         else {
@@ -303,7 +303,7 @@ public final class GridDhtColocatedLockFuture<K, V> extends 
GridCompoundIdentity
                     false,
                     false);
 
-                cand.topologyVersion(new 
AffinityTopologyVersion(topVer.get().topologyVersion()));
+                cand.topologyVersion(topVer.get());
             }
             else
                 cand = cand.reenter();

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/108a3e6a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index a8ff280..54c79e9 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -644,6 +644,27 @@ public class IgniteTxManager extends 
GridCacheSharedManagerAdapter {
     }
 
     /**
+     * @return Any transaction associated with the current thread.
+     */
+    public IgniteInternalTx anyActiveThreadTx() {
+        long threadId = Thread.currentThread().getId();
+
+        IgniteInternalTx tx = threadMap.get(threadId);
+
+        if (tx != null && tx.topologyVersionSnapshot() != null)
+            return tx;
+
+        for (int cacheId : cctx.cache().systemCacheIds()) {
+            tx = sysThreadMap.get(new TxThreadKey(threadId, cacheId));
+
+            if (tx != null && tx.topologyVersionSnapshot() != null)
+                return tx;
+        }
+
+        return null;
+    }
+
+    /**
      * @return Local transaction.
      */
     @Nullable public IgniteInternalTx localTxx() {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/108a3e6a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheAtomicOpWithinTxSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheAtomicOpWithinTxSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheAtomicOpWithinTxSelfTest.java
new file mode 100644
index 0000000..b8682b7
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheAtomicOpWithinTxSelfTest.java
@@ -0,0 +1,454 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.util.lang.*;
+import org.apache.ignite.testframework.*;
+import org.apache.ignite.testframework.junits.common.*;
+import org.apache.ignite.transactions.*;
+
+import java.util.concurrent.*;
+import java.util.concurrent.locks.*;
+
+import static org.apache.ignite.transactions.TransactionConcurrency.*;
+import static org.apache.ignite.transactions.TransactionIsolation.*;
+
+/**
+ * Test checks that atomic operations are not blocked when invoked inside 
transaction.
+ */
+@SuppressWarnings({"unchecked", "TypeMayBeWeakened"})
+public class IgniteCacheAtomicOpWithinTxSelfTest extends 
GridCommonAbstractTest {
+    /** */
+    private static final String TRANSACTIONAL_CACHE = "tx";
+
+    /** */
+    private static final String ATOMIC_CACHE = "atomic";
+
+    /** */
+    private static final int NODE_COUNT = 3;
+
+    /** */
+    private CacheWriteSynchronizationMode syncMode;
+
+    /** */
+    private CacheAtomicWriteOrderMode orderMode;
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        startGridsMultiThreaded(NODE_COUNT);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        stopAllGrids();
+    }
+
+    /**
+     *
+     */
+    private void initCaches() {
+        CacheConfiguration txCfg = new CacheConfiguration(TRANSACTIONAL_CACHE);
+
+        txCfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
+        txCfg.setBackups(1);
+
+        grid(0).createCache(txCfg);
+
+        CacheConfiguration atomicCfg = new CacheConfiguration(ATOMIC_CACHE);
+
+        atomicCfg.setBackups(1);
+        atomicCfg.setWriteSynchronizationMode(syncMode);
+        atomicCfg.setAtomicWriteOrderMode(orderMode);
+
+        grid(0).createCache(atomicCfg);
+    }
+
+    /**
+     *
+     */
+    private void destroyCaches() {
+        grid(0).destroyCache(TRANSACTIONAL_CACHE);
+        grid(0).destroyCache(ATOMIC_CACHE);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testAtomicOpTxPrimarySyncPrimaryNodeJoined() throws Exception {
+        syncMode = CacheWriteSynchronizationMode.PRIMARY_SYNC;
+        orderMode = CacheAtomicWriteOrderMode.PRIMARY;
+
+        checkAtomicOpTx(true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testAtomicOpTxPrimarySyncClockNodeJoined() throws Exception {
+        syncMode = CacheWriteSynchronizationMode.PRIMARY_SYNC;
+        orderMode = CacheAtomicWriteOrderMode.CLOCK;
+
+        checkAtomicOpTx(true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testAtomicOpTxFullSyncPrimaryNodeJoined() throws Exception {
+        syncMode = CacheWriteSynchronizationMode.FULL_SYNC;
+        orderMode = CacheAtomicWriteOrderMode.PRIMARY;
+
+        checkAtomicOpTx(true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testAtomicOpTxFullSyncClockNodeJoined() throws Exception {
+        syncMode = CacheWriteSynchronizationMode.FULL_SYNC;
+        orderMode = CacheAtomicWriteOrderMode.CLOCK;
+
+        checkAtomicOpTx(true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testAtomicOpTxPrimarySyncPrimary() throws Exception {
+        syncMode = CacheWriteSynchronizationMode.PRIMARY_SYNC;
+        orderMode = CacheAtomicWriteOrderMode.PRIMARY;
+
+        checkAtomicOpTx(false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testAtomicOpTxPrimarySyncClock() throws Exception {
+        syncMode = CacheWriteSynchronizationMode.PRIMARY_SYNC;
+        orderMode = CacheAtomicWriteOrderMode.CLOCK;
+
+        checkAtomicOpTx(false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testAtomicOpTxFullSyncPrimary() throws Exception {
+        syncMode = CacheWriteSynchronizationMode.FULL_SYNC;
+        orderMode = CacheAtomicWriteOrderMode.PRIMARY;
+
+        checkAtomicOpTx(false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testAtomicOpTxFullSyncClock() throws Exception {
+        syncMode = CacheWriteSynchronizationMode.FULL_SYNC;
+        orderMode = CacheAtomicWriteOrderMode.CLOCK;
+
+        checkAtomicOpTx(false);
+    }
+
+    
+    
+    /**
+     * @throws Exception If failed.
+     */
+    public void testAtomicOpLockPrimarySyncPrimaryNodeJoined() throws 
Exception {
+        syncMode = CacheWriteSynchronizationMode.PRIMARY_SYNC;
+        orderMode = CacheAtomicWriteOrderMode.PRIMARY;
+
+        checkAtomicOpLock(true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testAtomicOpLockPrimarySyncClockNodeJoined() throws Exception {
+        syncMode = CacheWriteSynchronizationMode.PRIMARY_SYNC;
+        orderMode = CacheAtomicWriteOrderMode.CLOCK;
+
+        checkAtomicOpLock(true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testAtomicOpLockFullSyncPrimaryNodeJoined() throws Exception {
+        syncMode = CacheWriteSynchronizationMode.FULL_SYNC;
+        orderMode = CacheAtomicWriteOrderMode.PRIMARY;
+
+        checkAtomicOpLock(true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testAtomicOpLockFullSyncClockNodeJoined() throws Exception {
+        syncMode = CacheWriteSynchronizationMode.FULL_SYNC;
+        orderMode = CacheAtomicWriteOrderMode.CLOCK;
+
+        checkAtomicOpLock(true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testAtomicOpLockPrimarySyncPrimary() throws Exception {
+        syncMode = CacheWriteSynchronizationMode.PRIMARY_SYNC;
+        orderMode = CacheAtomicWriteOrderMode.PRIMARY;
+
+        checkAtomicOpLock(false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testAtomicOpLockPrimarySyncClock() throws Exception {
+        syncMode = CacheWriteSynchronizationMode.PRIMARY_SYNC;
+        orderMode = CacheAtomicWriteOrderMode.CLOCK;
+
+        checkAtomicOpLock(false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testAtomicOpLockFullSyncPrimary() throws Exception {
+        syncMode = CacheWriteSynchronizationMode.FULL_SYNC;
+        orderMode = CacheAtomicWriteOrderMode.PRIMARY;
+
+        checkAtomicOpLock(false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testAtomicOpLockFullSyncClock() throws Exception {
+        syncMode = CacheWriteSynchronizationMode.FULL_SYNC;
+        orderMode = CacheAtomicWriteOrderMode.CLOCK;
+
+        checkAtomicOpLock(false);
+    }
+
+    /**
+     * @param startNode {@code True} if should start node inside tx.
+     * @throws Exception If failed.
+     */
+    private void checkAtomicOpTx(boolean startNode) throws Exception {
+        initCaches();
+
+        try {
+            IgniteCache<Object, Object> txCache = 
grid(0).cache(TRANSACTIONAL_CACHE);
+            IgniteCache<Object, Object> atomicCache = 
grid(0).cache(ATOMIC_CACHE);
+
+            for (int i = 0; i < 10; i++)
+                atomicCache.put("key" + i, -1);
+
+            try {
+                IgniteInternalFuture<Object> fut = null;
+
+                try (Transaction tx = 
grid(0).transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+                    for (int i = 0; i < 10; i++)
+                        txCache.put("key" + i, i);
+
+                    if (startNode) {
+                        fut = GridTestUtils.runAsync(new Callable<Object>() {
+                            @Override public Object call() throws Exception {
+                                return startGrid(NODE_COUNT);
+                            }
+                        });
+
+                        GridTestUtils.waitForCondition(new GridAbsPredicate() {
+                            @Override public boolean apply() {
+                                return grid(0).cluster().nodes().size() == 
NODE_COUNT + 1;
+                            }
+                        }, getTestTimeout());
+                    }
+
+                    info("Starting atomic puts.");
+
+                    for (int i = 0; i < 10; i++)
+                        atomicCache.put("key" + i, i);
+
+                    info("Atomic puts done.");
+
+                    tx.commit();
+                }
+
+                if (fut != null)
+                    fut.get();
+
+                for (int g = 0; g < NODE_COUNT + (startNode ? 1 : 0); g++) {
+                    IgniteCache<Object, Object> cache = 
grid(g).cache(ATOMIC_CACHE);
+
+                    for (int i = 0; i < 10; i++)
+                        assertEquals(i, cache.get("key" + i));
+                }
+            }
+            finally {
+                if (startNode)
+                    stopGrid(NODE_COUNT);
+            }
+        }
+        finally {
+            destroyCaches();
+        }
+    }
+
+    /**
+     * @param startNode {@code True} if should start node inside tx.
+     * @throws Exception If failed.
+     */
+    @SuppressWarnings("TooBroadScope")
+    private void checkAtomicOpLock(boolean startNode) throws Exception {
+        initCaches();
+
+        try {
+            IgniteCache<Object, Object> txCache = 
grid(0).cache(TRANSACTIONAL_CACHE);
+            IgniteCache<Object, Object> atomicCache = 
grid(0).cache(ATOMIC_CACHE);
+
+            for (int i = 0; i < 10; i++)
+                atomicCache.put("key" + i, -1);
+
+            try {
+                Lock lock = txCache.lock("key0");
+
+                IgniteInternalFuture<Object> fut = null;
+
+                lock.lock();
+
+                try {
+                    for (int i = 0; i < 10; i++)
+                        txCache.put("key" + i, i);
+
+                    if (startNode) {
+                        fut = GridTestUtils.runAsync(new Callable<Object>() {
+                            @Override public Object call() throws Exception {
+                                return startGrid(NODE_COUNT);
+                            }
+                        });
+
+                        GridTestUtils.waitForCondition(new GridAbsPredicate() {
+                            @Override public boolean apply() {
+                                return grid(0).cluster().nodes().size() == 
NODE_COUNT + 1;
+                            }
+                        }, getTestTimeout());
+                    }
+
+                    info("Starting atomic puts.");
+
+                    for (int i = 0; i < 10; i++)
+                        atomicCache.put("key" + i, i);
+
+                    info("Atomic puts done.");
+                }
+                finally {
+                    lock.unlock();
+                }
+
+                if (fut != null)
+                    fut.get();
+
+                for (int g = 0; g < NODE_COUNT + (startNode ? 1 : 0); g++) {
+                    IgniteCache<Object, Object> cache = 
grid(g).cache(ATOMIC_CACHE);
+
+                    for (int i = 0; i < 10; i++)
+                        assertEquals(i, cache.get("key" + i));
+                }
+            }
+            finally {
+                if (startNode)
+                    stopGrid(NODE_COUNT);
+            }
+        }
+        finally {
+            destroyCaches();
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testAtomicOpSystemCacheTxNodeJoined() throws Exception {
+        initCaches();
+
+        try {
+            GridCacheAdapter txCache = 
(GridCacheAdapter)grid(0).utilityCache();
+            IgniteCache<Object, Object> atomicCache = 
grid(0).cache(ATOMIC_CACHE);
+
+            for (int i = 0; i < 10; i++)
+                atomicCache.put("key" + i, -1);
+
+            try {
+                IgniteInternalFuture<Object> fut;
+
+                // Intentionally start system tx.
+                try (Transaction tx = txCache.txStart(PESSIMISTIC, 
REPEATABLE_READ)) {
+                    for (int i = 0; i < 10; i++)
+                        txCache.put("key" + i, i);
+
+                    fut = GridTestUtils.runAsync(new Callable<Object>() {
+                        @Override public Object call() throws Exception {
+                            return startGrid(NODE_COUNT);
+                        }
+                    });
+
+                    GridTestUtils.waitForCondition(new GridAbsPredicate() {
+                        @Override public boolean apply() {
+                            return grid(0).cluster().nodes().size() == 
NODE_COUNT + 1;
+                        }
+                    }, getTestTimeout());
+
+                    info("Starting atomic puts.");
+
+                    for (int i = 0; i < 10; i++)
+                        atomicCache.put("key" + i, i);
+
+                    info("Atomic puts done.");
+
+                    tx.commit();
+                }
+
+                if (fut != null)
+                    fut.get();
+
+                for (int g = 0; g < NODE_COUNT + 1; g++) {
+                    IgniteCache<Object, Object> cache = 
grid(g).cache(ATOMIC_CACHE);
+
+                    for (int i = 0; i < 10; i++)
+                        assertEquals(i, cache.get("key" + i));
+                }
+            }
+            finally {
+                stopGrid(NODE_COUNT);
+            }
+        }
+        finally {
+            destroyCaches();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/108a3e6a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
index d5ae2e6..b5b3c29 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
@@ -427,6 +427,8 @@ public class IgniteCacheTestSuite extends TestSuite {
         suite.addTestSuite(IgniteCacheTxStoreValueTest.class);
         suite.addTestSuite(IgniteCacheTxNearEnabledStoreValueTest.class);
 
+        suite.addTestSuite(IgniteCacheAtomicOpWithinTxSelfTest.class);
+
         return suite;
     }
 }

Reply via email to