Repository: ignite
Updated Branches:
  refs/heads/ignite-1607-read e05a7f8c0 -> fdd6f1c9e


ignite-1607 WIP


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/fdd6f1c9
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/fdd6f1c9
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/fdd6f1c9

Branch: refs/heads/ignite-1607-read
Commit: fdd6f1c9e37a0e17665dfc301e9cb0ba4a13b005
Parents: e05a7f8
Author: sboikov <[email protected]>
Authored: Thu Oct 8 12:09:17 2015 +0300
Committer: sboikov <[email protected]>
Committed: Thu Oct 8 14:41:45 2015 +0300

----------------------------------------------------------------------
 .../cache/distributed/near/GridNearTxLocal.java |  22 +-
 .../transactions/IgniteTxLocalAdapter.java      | 318 +++++++++----------
 .../cache/transactions/IgniteTxLocalEx.java     |   4 +-
 .../IgniteTxMultiThreadedAbstractTest.java      |  97 +++---
 ...CachePartitionedTxMultiThreadedSelfTest.java |   3 +
 5 files changed, 216 insertions(+), 228 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/fdd6f1c9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index c43cab5..9207bd0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -343,7 +343,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
     }
 
     /** {@inheritDoc} */
-    @Override public IgniteInternalFuture<Boolean> loadMissing(
+    @Override public IgniteInternalFuture<Void> loadMissing(
         final GridCacheContext cacheCtx,
         boolean readThrough,
         boolean async,
@@ -361,8 +361,8 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
                 accessPolicy(cacheCtx, keys),
                 skipVals,
                 needVer,
-                c).chain(new C1<IgniteInternalFuture<Map<Object, Object>>, 
Boolean>() {
-                @Override public Boolean 
apply(IgniteInternalFuture<Map<Object, Object>> f) {
+                c).chain(new C1<IgniteInternalFuture<Map<Object, Object>>, 
Void>() {
+                @Override public Void apply(IgniteInternalFuture<Map<Object, 
Object>> f) {
                     try {
                         Map<Object, Object> map = f.get();
 
@@ -373,7 +373,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
                             }
                         }
 
-                        return true;
+                        return null;
                     }
                     catch (Exception e) {
                         setRollbackOnly();
@@ -398,8 +398,8 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
                 /*can remap*/true,
                 needVer,
                 c
-            ).chain(new C1<IgniteInternalFuture<Map<Object, Object>>, 
Boolean>() {
-                @Override public Boolean 
apply(IgniteInternalFuture<Map<Object, Object>> f) {
+            ).chain(new C1<IgniteInternalFuture<Map<Object, Object>>, Void>() {
+                @Override public Void apply(IgniteInternalFuture<Map<Object, 
Object>> f) {
                     try {
                         Map<Object, Object> map = f.get();
 
@@ -410,7 +410,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
                             }
                         }
 
-                        return true;
+                        return null;
                     }
                     catch (Exception e) {
                         setRollbackOnly();
@@ -1223,12 +1223,8 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
         return plc;
     }
 
-    /**
-     * @param cacheCtx Cache context.
-     * @param keys Keys.
-     * @return Expiry policy.
-     */
-    private IgniteCacheExpiryPolicy accessPolicy(GridCacheContext cacheCtx, 
Collection<KeyCacheObject> keys) {
+    /** {@inheritDoc} */
+    @Override protected IgniteCacheExpiryPolicy accessPolicy(GridCacheContext 
cacheCtx, Collection<KeyCacheObject> keys) {
         if (accessMap != null) {
             for (Map.Entry<IgniteTxKey, IgniteCacheExpiryPolicy> e : 
accessMap.entrySet()) {
                 if (e.getKey().cacheId() == cacheCtx.cacheId() && 
keys.contains(e.getKey().key()))

http://git-wip-us.apache.org/repos/asf/ignite/blob/fdd6f1c9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index 76df164..99b4c45 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -87,10 +87,8 @@ import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.CU;
-import org.apache.ignite.internal.util.typedef.internal.GPC;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiClosure;
-import org.apache.ignite.lang.IgniteBiInClosure;
 import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.lang.IgniteClosure;
 import org.apache.ignite.plugin.security.SecurityPermission;
@@ -418,7 +416,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
     }
 
     /** {@inheritDoc} */
-    @Override public IgniteInternalFuture<Boolean> loadMissing(
+    @Override public IgniteInternalFuture<Void> loadMissing(
         final GridCacheContext cacheCtx,
         final boolean readThrough,
         boolean async,
@@ -428,41 +426,106 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
         boolean needVer,
         final GridInClosure3<KeyCacheObject, Object, GridCacheVersion> c
     ) {
-        // TODO IGNITE-1607.
-        return new GridFinishedFuture<>();
-
-//        if (!async) {
-//            try {
-//                if (!readThrough || !cacheCtx.readThrough()) {
-//                    for (KeyCacheObject key : keys)
-//                        c.apply(key, null, null);
-//
-//                    return new GridFinishedFuture<>(false);
-//                }
-//
-//                return new GridFinishedFuture<>(
-//                    cacheCtx.store().loadAll(this, keys, c));
-//            }
-//            catch (IgniteCheckedException e) {
-//                return new GridFinishedFuture<>(e);
-//            }
-//        }
-//        else {
-//            return cctx.kernalContext().closure().callLocalSafe(
-//                new GPC<Boolean>() {
-//                    @Override public Boolean call() throws Exception {
-//                        if (!readThrough || !cacheCtx.readThrough()) {
-//                            for (KeyCacheObject key : keys)
-//                                c.apply(key, null, null);
-//
-//                            return false;
-//                        }
-//
-//                        return 
cacheCtx.store().loadAll(IgniteTxLocalAdapter.this, keys, c);
-//                    }
-//                },
-//                true);
-//        }
+        assert cacheCtx.isLocal() : cacheCtx.name();
+
+        if (!readThrough || !cacheCtx.readThrough()) {
+            for (KeyCacheObject key : keys)
+                c.apply(key, null, IgniteTxEntry.READ_NEW_ENTRY_VER);
+
+            return new GridFinishedFuture<>();
+        }
+
+        try {
+            IgniteCacheExpiryPolicy expiryPlc = accessPolicy(cacheCtx, keys);
+
+            Map<KeyCacheObject, GridCacheVersion> misses = null;
+
+            for (KeyCacheObject key : keys) {
+                while (true) {
+                    GridCacheEntryEx entry = cacheCtx.cache().entryEx(key);
+
+                    try {
+                        T2<CacheObject, GridCacheVersion> res = 
entry.innerGetVersioned(this,
+                            /*readSwap*/true,
+                            /*unmarshal*/true,
+                            /*update-metrics*/!skipVals,
+                            /*event*/!skipVals,
+                            CU.subjectId(this, cctx),
+                            null,
+                            resolveTaskName(),
+                            expiryPlc);
+
+                        if (res == null) {
+                            if (misses == null)
+                                misses = new LinkedHashMap<>();
+
+                            misses.put(key, entry.version());
+                        }
+                        else
+                            c.apply(key, res.get1(), res.get2());
+
+                        break;
+                    }
+                    catch (GridCacheEntryRemovedException ignore) {
+                        if (log.isDebugEnabled())
+                            log.debug("Got removed entry, will retry: " + key);
+                    }
+                }
+            }
+
+            if (misses != null) {
+                final Map<KeyCacheObject, GridCacheVersion> misses0 = misses;
+
+                cacheCtx.store().loadAll(this, misses.keySet(), new 
CI2<KeyCacheObject, Object>() {
+                    private GridCacheVersion nextVer;
+
+                    @Override public void apply(KeyCacheObject key, Object 
val) {
+                        GridCacheVersion ver = misses0.remove(key);
+
+                        assert ver != null : key;
+
+                        c.apply(key, val, ver);
+
+                        if (nextVer == null)
+                            nextVer = cacheCtx.versions().next();
+
+                        CacheObject cacheVal = cacheCtx.toCacheObject(val);
+
+                        while (true) {
+                            GridCacheEntryEx entry = 
cacheCtx.cache().entryEx(key);
+
+                            try {
+                                boolean set = entry.versionedValue(cacheVal, 
ver, nextVer);
+
+                                if (log.isDebugEnabled())
+                                    log.debug("Set value loaded from store 
into entry [set=" + set +
+                                        ", curVer=" + ver + ", newVer=" + 
nextVer + ", " +
+                                        "entry=" + entry + ']');
+
+                                break;
+                            }
+                            catch (GridCacheEntryRemovedException ignore) {
+                                if (log.isDebugEnabled())
+                                    log.debug("Got removed entry, (will 
retry): " + entry);
+                            }
+                            catch (IgniteCheckedException e) {
+                                // Wrap errors (will be unwrapped).
+                                throw new GridClosureException(e);
+                            }
+                        }
+
+                    }
+                });
+
+                for (KeyCacheObject key : misses0.keySet())
+                    c.apply(key, null, IgniteTxEntry.READ_NEW_ENTRY_VER);
+            }
+
+            return new GridFinishedFuture<>();
+        }
+        catch (IgniteCheckedException e) {
+            return new GridFinishedFuture<>(e);
+        }
     }
 
     /**
@@ -1517,6 +1580,15 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
     }
 
     /**
+     * @param cacheCtx Cache context.
+     * @param keys Keys.
+     * @return Expiry policy.
+     */
+    protected IgniteCacheExpiryPolicy accessPolicy(GridCacheContext cacheCtx, 
Collection<KeyCacheObject> keys) {
+        return null;
+    }
+
+    /**
      * Adds skipped key.
      *
      * @param skipped Skipped set (possibly {@code null}).
@@ -1566,46 +1638,14 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
         final boolean needReadVer = optimistic() && serializable();
 
         return new GridEmbeddedFuture<>(
-            new C2<Boolean, Exception, Map<K, V>>() {
-                @Override public Map<K, V> apply(Boolean b, Exception e) {
+            new C2<Void, Exception, Map<K, V>>() {
+                @Override public Map<K, V> apply(Void v, Exception e) {
                     if (e != null) {
                         setRollbackOnly();
 
                         throw new GridClosureException(e);
                     }
 
-                    if (!b && !readCommitted()) {
-                        // There is no store - we must mark the entries.
-                        for (KeyCacheObject key : missedMap.keySet()) {
-                            IgniteTxEntry txEntry = entry(cacheCtx.txKey(key));
-
-                            if (txEntry != null)
-                                txEntry.markValid();
-                        }
-                    }
-
-                    if (readCommitted()) {
-                        assert loaded != null;
-
-                        Collection<KeyCacheObject> notFound = new 
HashSet<>(missedMap.keySet());
-
-                        notFound.removeAll(loaded);
-
-                        // In read-committed mode touch entries that have just 
been read.
-                        for (KeyCacheObject key : notFound) {
-                            if (loaded.contains(key))
-                                continue;
-
-                            IgniteTxEntry txEntry = entry(cacheCtx.txKey(key));
-
-                            GridCacheEntryEx entry = txEntry == null ? 
cacheCtx.cache().peekEx(key) :
-                                txEntry.cached();
-
-                            if (entry != null)
-                                cacheCtx.evicts().touch(entry, 
topologyVersion());
-                        }
-                    }
-
                     return map;
                 }
             },
@@ -1618,9 +1658,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
                 skipVals,
                 needReadVer,
                 new GridInClosure3<KeyCacheObject, Object, GridCacheVersion>() 
{
-                    /** */
-                    private GridCacheVersion nextVer;
-
                     @Override public void apply(KeyCacheObject key, Object 
val, GridCacheVersion loadVer) {
                         if (isRollbackOnly()) {
                             if (log.isDebugEnabled())
@@ -1630,15 +1667,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
                             return;
                         }
 
-                        GridCacheVersion ver = missedMap.get(key);
-
-                        if (ver == null) {
-                            if (log.isDebugEnabled())
-                                log.debug("Value from storage was never asked 
for [key=" + key + ", val=" + val + ']');
-
-                            return;
-                        }
-
                         CacheObject cacheVal = cacheCtx.toCacheObject(val);
 
                         CacheObject visibleVal = cacheVal;
@@ -1655,99 +1683,47 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
                                 visibleVal = 
txEntry.applyEntryProcessors(visibleVal);
                         }
 
-                        // In pessimistic mode we hold the lock, so filter 
validation
-                        // should always be valid.
-                        if (pessimistic())
-                            ver = null;
-
-                        // Initialize next version.
-                        if (nextVer == null)
-                            nextVer = cctx.versions().next(topologyVersion());
-
-                        while (true) {
-                            assert txEntry != null || readCommitted() || 
skipVals;
-
-                            GridCacheEntryEx e = txEntry == null ? 
entryEx(cacheCtx, txKey) : txEntry.cached();
-
-                            try {
-                                // Must initialize to true since even if 
filter didn't pass,
-                                // we still record the transaction value.
-                                boolean set;
-
-                                try {
-                                    set = e.versionedValue(cacheVal, ver, 
nextVer);
-                                }
-                                catch (GridCacheEntryRemovedException ignore) {
-                                    if (log.isDebugEnabled())
-                                        log.debug("Got removed entry in 
transaction getAll method " +
-                                            "(will try again): " + e);
-
-                                    if (pessimistic() && !readCommitted() && 
!isRollbackOnly()) {
-                                        U.error(log, "Inconsistent transaction 
state (entry got removed while " +
-                                            "holding lock) [entry=" + e + ", 
tx=" + IgniteTxLocalAdapter.this + "]");
-
-                                        setRollbackOnly();
-
-                                        return;
-                                    }
-
-                                    if (txEntry != null)
-                                        txEntry.cached(entryEx(cacheCtx, 
txKey));
-
-                                    continue; // While loop.
-                                }
-
-                                // In pessimistic mode, we should always be 
able to set.
-                                assert set || !pessimistic();
-
-                                if (readCommitted() || skipVals) {
-                                    cacheCtx.evicts().touch(e, 
topologyVersion());
-
-                                    if (visibleVal != null) {
-                                        cacheCtx.addResult(map,
-                                            key,
-                                            visibleVal,
-                                            skipVals,
-                                            keepCacheObjects,
-                                            deserializePortable,
-                                            false);
-                                    }
-                                }
-                                else {
-                                    assert txEntry != null;
-
-                                    txEntry.setAndMarkValid(cacheVal);
+                        assert txEntry != null || readCommitted() || skipVals;
 
-                                    if (needReadVer) {
-                                        assert loadVer != null;
+                        GridCacheEntryEx e = txEntry == null ? 
entryEx(cacheCtx, txKey) : txEntry.cached();
 
-                                        
txEntry.serializableReadVersion(loadVer);
-                                    }
+                        if (readCommitted() || skipVals) {
+                            cacheCtx.evicts().touch(e, topologyVersion());
 
-                                    if (visibleVal != null) {
-                                        cacheCtx.addResult(map,
-                                            key,
-                                            visibleVal,
-                                            skipVals,
-                                            keepCacheObjects,
-                                            deserializePortable,
-                                            false);
-                                    }
-                                }
+                            if (visibleVal != null) {
+                                cacheCtx.addResult(map,
+                                    key,
+                                    visibleVal,
+                                    skipVals,
+                                    keepCacheObjects,
+                                    deserializePortable,
+                                    false);
+                            }
+                        }
+                        else {
+                            assert txEntry != null;
 
-                                if (readCommitted())
-                                    loaded.add(key);
+                            txEntry.setAndMarkValid(cacheVal);
 
-                                if (log.isDebugEnabled())
-                                    log.debug("Set value loaded from store 
into entry from transaction [set=" + set +
-                                        ", matchVer=" + ver + ", newVer=" + 
nextVer + ", entry=" + e + ']');
+                            if (needReadVer) {
+                                assert loadVer != null;
 
-                                break; // While loop.
+                                txEntry.serializableReadVersion(loadVer);
                             }
-                            catch (IgniteCheckedException ex) {
-                                throw new IgniteException("Failed to put value 
for cache entry: " + e, ex);
+
+                            if (visibleVal != null) {
+                                cacheCtx.addResult(map,
+                                    key,
+                                    visibleVal,
+                                    skipVals,
+                                    keepCacheObjects,
+                                    deserializePortable,
+                                    false);
                             }
                         }
+
+                        if (readCommitted())
+                            loaded.add(key);
                     }
                 })
         );
@@ -2455,7 +2431,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
         }
 
         if (missedForLoad != null) {
-            IgniteInternalFuture<Boolean> fut = loadMissing(
+            IgniteInternalFuture<Void> fut = loadMissing(
                 cacheCtx,
                 /*read through*/cacheCtx.config().isLoadPreviousValue() && 
!skipStore,
                 /*async*/true,
@@ -2508,8 +2484,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
                 });
 
             return new GridEmbeddedFuture<>(
-                new C2<Boolean, Exception, Set<KeyCacheObject>>() {
-                    @Override public Set<KeyCacheObject> apply(Boolean b, 
Exception e) {
+                new C2<Void, Exception, Set<KeyCacheObject>>() {
+                    @Override public Set<KeyCacheObject> apply(Void b, 
Exception e) {
                         if (e != null)
                             throw new GridClosureException(e);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/fdd6f1c9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java
index 8f5f37b..55d0dbb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java
@@ -165,9 +165,11 @@ public interface IgniteTxLocalEx extends IgniteInternalTx {
      * @param c Closure.
      * @param deserializePortable Deserialize portable flag.
      * @param skipVals Skip values flag.
+     * @param needVer If {@code true} version is required for loaded values.
+     * @param c Closure to be applied for loaded values.
      * @return Future with {@code True} value if loading took place.
      */
-    public IgniteInternalFuture<Boolean> loadMissing(
+    public IgniteInternalFuture<Void> loadMissing(
         GridCacheContext cacheCtx,
         boolean readThrough,
         boolean async,

http://git-wip-us.apache.org/repos/asf/ignite/blob/fdd6f1c9/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxMultiThreadedAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxMultiThreadedAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxMultiThreadedAbstractTest.java
index 9e14d30..7a1a0b9 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxMultiThreadedAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxMultiThreadedAbstractTest.java
@@ -219,78 +219,89 @@ public abstract class IgniteTxMultiThreadedAbstractTest extends IgniteTxAbstract
      * @throws Exception If failed.
      */
     public void testOptimisticSerializableConsistency() throws Exception {
-        fail("https://issues.apache.org/jira/browse/IGNITE-582");
-
         final IgniteCache<Integer, Long> cache = grid(0).cache(null);
 
-        final int THREADS = 2;
+        final int THREADS = 3;
 
         final int ITERATIONS = 100;
 
-        final int key = 0;
+        for (int key0 = 0; key0 < 20; key0++) {
+            final int key = key0;
 
-        cache.put(key, 0L);
+            cache.put(key, 0L);
 
-        List<IgniteInternalFuture<Collection<Long>>> futs = new 
ArrayList<>(THREADS);
+            List<IgniteInternalFuture<Collection<Long>>> futs = new 
ArrayList<>(THREADS);
 
-        for (int i = 0; i < THREADS; i++) {
-            futs.add(GridTestUtils.runAsync(new Callable<Collection<Long>>() {
-                @Override public Collection<Long> call() throws Exception {
-                    Collection<Long> res = new ArrayList<>();
+            for (int i = 0; i < THREADS; i++) {
+                futs.add(GridTestUtils.runAsync(new 
Callable<Collection<Long>>() {
+                    @Override public Collection<Long> call() throws Exception {
+                        Collection<Long> res = new ArrayList<>();
 
-                    for (int i = 0; i < ITERATIONS; i++) {
-                        while (true) {
-                            try (Transaction tx = 
grid(0).transactions().txStart(OPTIMISTIC, SERIALIZABLE)) {
-                                long val = cache.get(key);
+                        for (int i = 0; i < ITERATIONS; i++) {
+                            while (true) {
+                                try (Transaction tx = 
grid(0).transactions().txStart(OPTIMISTIC, SERIALIZABLE)) {
+                                    long val = cache.get(key);
 
-                                cache.put(key, val + 1);
+                                    cache.put(key, val + 1);
 
-                                tx.commit();
+                                    tx.commit();
 
-                                assertTrue(res.add(val + 1));
+                                    assertTrue(res.add(val + 1));
 
-                                break;
-                            }
-                            catch(TransactionOptimisticException e) {
-                                log.info("Got error, will retry: " + e);
+                                    break;
+                                }
+                                catch(TransactionOptimisticException e) {
+                                    log.info("Got error, will retry: " + e);
+                                }
                             }
                         }
+
+                        return res;
                     }
+                }));
+            }
 
-                    return res;
-                }
-            }));
-        }
+            long total = 0;
 
-        List<Collection<Long>> cols = new ArrayList<>(THREADS);
+            List<Collection<Long>> cols = new ArrayList<>(THREADS);
 
-        for (IgniteInternalFuture<Collection<Long>> fut : futs) {
-            Collection<Long> col = fut.get();
+            for (IgniteInternalFuture<Collection<Long>> fut : futs) {
+                Collection<Long> col = fut.get();
 
-            assertEquals(ITERATIONS, col.size());
+                assertEquals(ITERATIONS, col.size());
 
-            cols.add(col);
-        }
+                total += col.size();
 
-        Set<Long> duplicates = new HashSet<>();
+                cols.add(col);
+            }
 
-        for (Collection<Long> col1 : cols) {
-            for (Long val1 : col1) {
-                for (Collection<Long> col2 : cols) {
-                    if (col1 == col2)
-                        continue;
+            log.info("Cache value: " + cache.get(key));
 
-                    for (Long val2 : col2) {
-                        if (val1.equals(val2)) {
-                            duplicates.add(val2);
+            Set<Long> duplicates = new HashSet<>();
 
-                            break;
+            for (Collection<Long> col1 : cols) {
+                for (Long val1 : col1) {
+                    for (Collection<Long> col2 : cols) {
+                        if (col1 == col2)
+                            continue;
+
+                        for (Long val2 : col2) {
+                            if (val1.equals(val2)) {
+                                duplicates.add(val2);
+
+                                break;
+                            }
                         }
                     }
                 }
             }
-        }
 
-        assertTrue("Found duplicated values: " + duplicates, 
duplicates.isEmpty());
+            assertTrue("Found duplicated values: " + duplicates, 
duplicates.isEmpty());
+
+            assertEquals((long)THREADS * ITERATIONS, total);
+
+            for (int i = 0; i < gridCount(); i++)
+                assertEquals(total, (Object)cache.get(key));
+        }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/fdd6f1c9/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedTxMultiThreadedSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedTxMultiThreadedSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedTxMultiThreadedSelfTest.java
index 6ed25eb..346bd34 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedTxMultiThreadedSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedTxMultiThreadedSelfTest.java
@@ -53,6 +53,9 @@ public class GridCachePartitionedTxMultiThreadedSelfTest extends IgniteTxMultiTh
 
         CacheConfiguration cc = defaultCacheConfiguration();
 
+        // TODO IGNITE-1607 add test with near cache.
+        cc.setNearConfiguration(null);
+
         cc.setCacheMode(PARTITIONED);
         cc.setBackups(1);
 

Reply via email to