ignite-1607 WIP
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f880227a Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f880227a Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f880227a Branch: refs/heads/ignite-1607 Commit: f880227a97beccb7e6d7e456ce61f189bf800299 Parents: 177ae71 Author: sboikov <[email protected]> Authored: Fri Oct 16 10:39:08 2015 +0300 Committer: sboikov <[email protected]> Committed: Fri Oct 16 12:44:06 2015 +0300 ---------------------------------------------------------------------- .../distributed/GridDistributedCacheEntry.java | 6 +- .../dht/CacheDistributedGetFutureAdapter.java | 70 ++------- .../distributed/dht/GridDhtCacheEntry.java | 7 - .../distributed/dht/GridDhtTxPrepareFuture.java | 46 ++++-- .../dht/GridPartitionedGetFuture.java | 54 +++---- .../dht/atomic/GridDhtAtomicCache.java | 2 +- .../dht/colocated/GridDhtColocatedCache.java | 39 ++--- .../distributed/near/GridNearCacheAdapter.java | 2 +- .../distributed/near/GridNearGetFuture.java | 32 ++-- ...arOptimisticSerializableTxPrepareFuture.java | 10 +- .../near/GridNearOptimisticTxPrepareFuture.java | 33 +--- .../near/GridNearTransactionalCache.java | 7 +- .../cache/distributed/near/GridNearTxLocal.java | 68 ++++++--- .../transactions/IgniteTxLocalAdapter.java | 35 ++--- .../cache/transactions/IgniteTxLocalEx.java | 2 - .../CacheSerializableTransactionsTest.java | 149 +++++++++++++++++++ 16 files changed, 342 insertions(+), 220 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/f880227a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheEntry.java index d564768..89045ab 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheEntry.java @@ -754,8 +754,8 @@ public class GridDistributedCacheEntry extends GridCacheMapEntry { tx.xidVersion(), tx.topologyVersion(), timeout, - false, - true, + /*reenter*/false, + /*tx*/true, tx.implicitSingle()) != null; try { @@ -765,7 +765,7 @@ public class GridDistributedCacheEntry extends GridCacheMapEntry { tx.threadId(), tx.xidVersion(), tx.timeout(), - true, + /*tx*/true, tx.implicitSingle(), tx.ownedVersion(txKey()) ); http://git-wip-us.apache.org/repos/asf/ignite/blob/f880227a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java index 4989a50..fc04126 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java @@ -27,13 +27,11 @@ import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy; import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.future.GridCompoundIdentityFuture; -import org.apache.ignite.internal.util.lang.GridInClosure3; import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.internal.CU; -import org.apache.ignite.lang.IgniteReducer; import org.apache.ignite.lang.IgniteUuid; import org.jetbrains.annotations.Nullable; -import org.jsr166.ConcurrentHashMap8; import static org.apache.ignite.IgniteSystemProperties.IGNITE_NEAR_GET_MAX_REMAPS; import static org.apache.ignite.IgniteSystemProperties.getInteger; @@ -95,7 +93,7 @@ public abstract class CacheDistributedGetFutureAdapter<K, V> extends GridCompoun protected final boolean needVer; /** */ - protected final GridInClosure3<KeyCacheObject, Object, GridCacheVersion> resC; + protected final boolean keepCacheObjects; /** * @param cctx Context. @@ -110,8 +108,8 @@ public abstract class CacheDistributedGetFutureAdapter<K, V> extends GridCompoun * @param expiryPlc Expiry policy. * @param skipVals Skip values flag. * @param canRemap Flag indicating whether future can be remapped on a newer topology version. - * @param resC Closure applied on 'get' result. - * @param needVer If {@code true} need provide entry version to result closure. + * @param needVer If {@code true} returns values as tuples containing value and version. + * @param keepCacheObjects Keep cache objects flag. */ protected CacheDistributedGetFutureAdapter( GridCacheContext<K, V> cctx, @@ -126,13 +124,11 @@ public abstract class CacheDistributedGetFutureAdapter<K, V> extends GridCompoun boolean skipVals, boolean canRemap, boolean needVer, - @Nullable GridInClosure3<KeyCacheObject, Object, GridCacheVersion> resC + boolean keepCacheObjects ) { - super(cctx.kernalContext(), - resC != null ? new ResultClosureReducer<K, V>(keys.size()) : CU.<K, V>mapsReducer(keys.size())); + super(cctx.kernalContext(), CU.<K, V>mapsReducer(keys.size())); assert !F.isEmpty(keys); - assert !needVer || resC != null; this.cctx = cctx; this.keys = keys; @@ -146,63 +142,23 @@ public abstract class CacheDistributedGetFutureAdapter<K, V> extends GridCompoun this.skipVals = skipVals; this.canRemap = canRemap; this.needVer = needVer; - this.resC = resC; + this.keepCacheObjects = keepCacheObjects; futId = IgniteUuid.randomUuid(); } /** + * @param map Result map. * @param key Key. * @param val Value. * @param ver Version. */ @SuppressWarnings("unchecked") - protected final void resultClosureValue(KeyCacheObject key, Object val, GridCacheVersion ver) { - assert resC != null; - assert val != null; - assert !needVer || ver != null; + protected final void versionedResult(Map map, KeyCacheObject key, Object val, GridCacheVersion ver) { + assert needVer; + assert skipVals || val != null; + assert ver != null; - ResultClosureReducer<K, V> rdc = (ResultClosureReducer)reducer(); - - assert rdc != null; - - rdc.collect(key); - - resC.apply(key, skipVals ? true : val, ver); - } - - /** - * - */ - private static class ResultClosureReducer<K, V> implements IgniteReducer<Map<K, V>, Map<K, V>> { - /** */ - private static final long serialVersionUID = 0L; - - /** */ - private final ConcurrentHashMap8<KeyCacheObject, Boolean> map; - - /** - * @param keys Number of keys. - */ - public ResultClosureReducer(int keys) { - this.map = new ConcurrentHashMap8<>(keys); - } - - /** - * @param key Key. - */ - void collect(KeyCacheObject key) { - map.put(key, Boolean.TRUE); - } - - /** {@inheritDoc} */ - @Override public boolean collect(@Nullable Map<K, V> map) { - return true; - } - - /** {@inheritDoc} */ - @Override public Map<K, V> reduce() { - return (Map)map; - } + map.put(key, new T2<>(skipVals ? true : val, ver)); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/f880227a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java index c483e01..5d125ee 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java @@ -204,13 +204,6 @@ public class GridDhtCacheEntry extends GridDistributedCacheEntry { checkObsolete(); - if (serReadVer != null) { - unswap(false); - - if (!checkSerializableReadVersion(serReadVer)) - return null; - } - GridCacheMvcc mvcc = mvccExtras(); if (mvcc == null) { http://git-wip-us.apache.org/repos/asf/ignite/blob/f880227a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java index f25ee33..1e1a6b3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java @@ -818,12 +818,19 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter this.writes = writes; this.txNodes = txNodes; - if (!F.isEmpty(writes)) { + boolean ser = tx.optimistic() && tx.serializable(); + + if (!F.isEmpty(writes) || (ser && !F.isEmpty(reads))) { Map<Integer, Collection<KeyCacheObject>> forceKeys = null; for (IgniteTxEntry entry : writes) forceKeys = checkNeedRebalanceKeys(entry, forceKeys); + if (ser) { + for (IgniteTxEntry entry : reads) + forceKeys = checkNeedRebalanceKeys(entry, forceKeys); + } + forceKeysFut = forceRebalanceKeys(forceKeys); } @@ -852,7 +859,10 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter IgniteTxEntry e, Map<Integer, Collection<KeyCacheObject>> map ) { - if (retVal || !F.isEmpty(e.entryProcessors()) || !F.isEmpty(e.filters())) { + if (retVal || + !F.isEmpty(e.entryProcessors()) || + !F.isEmpty(e.filters()) || + e.serializableReadVersion() != null) { if (map == null) map = new HashMap<>(); @@ -914,18 +924,32 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter * @param entries Entries. * @return Not null exception if version check failed. */ - @Nullable private IgniteTxOptimisticCheckedException checkReadConflict(Collection<IgniteTxEntry> entries) { - for (IgniteTxEntry entry : entries) { - GridCacheVersion serReadVer = entry.serializableReadVersion(); + @Nullable private IgniteCheckedException checkReadConflict(Collection<IgniteTxEntry> entries) { + try { + for (IgniteTxEntry entry : entries) { + GridCacheVersion serReadVer = entry.serializableReadVersion(); - if (serReadVer != null && !entry.cached().checkSerializableReadVersion(serReadVer)) { - GridCacheContext cctx = entry.context(); + if (serReadVer != null) { + entry.cached().unswap(); - return new IgniteTxOptimisticCheckedException("Failed to prepare transaction, " + - "read conflict [key=" + entry.key().value(cctx.cacheObjectContext(), false) + - ", cache=" + cctx.name() + ']'); + if (!entry.cached().checkSerializableReadVersion(serReadVer)) { + GridCacheContext cctx = entry.context(); + + return new IgniteTxOptimisticCheckedException("Failed to prepare transaction, " + + "read/write conflict [key=" + entry.key().value(cctx.cacheObjectContext(), false) + + ", cache=" + cctx.name() + ']'); + } + } } } + catch (IgniteCheckedException e) { + U.error(log, "Failed to unswap entry: " + e, e); + + return e; + } + catch (GridCacheEntryRemovedException e) { + assert false : "Got removed exception on entry with dht local candidate: " + entries; + } return null; } @@ -936,7 +960,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter private void prepare0() { try { if (tx.optimistic() && tx.serializable()) { - IgniteTxOptimisticCheckedException err0 = checkReadConflict(writes); + IgniteCheckedException err0 = checkReadConflict(writes); if (err0 == null) err0 = checkReadConflict(reads); http://git-wip-us.apache.org/repos/asf/ignite/blob/f880227a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java index 18c6d69..efa2b1a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java @@ -37,7 +37,6 @@ import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo; import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException; -import org.apache.ignite.internal.processors.cache.GridCacheFilterFailedException; import org.apache.ignite.internal.processors.cache.GridCacheMessage; import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy; import org.apache.ignite.internal.processors.cache.KeyCacheObject; @@ -48,7 +47,6 @@ import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.GridLeanMap; import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; -import org.apache.ignite.internal.util.lang.GridInClosure3; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.C1; import org.apache.ignite.internal.util.typedef.CI1; @@ -96,8 +94,8 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda * @param expiryPlc Expiry policy. * @param skipVals Skip values flag. * @param canRemap Flag indicating whether future can be remapped on a newer topology version. - * @param needVer If {@code true} need provide entry version to result closure. - * @param resC Closure applied on 'get' result. + * @param needVer If {@code true} returns values as tuples containing value and version. + * @param keepCacheObjects Keep cache objects flag. */ public GridPartitionedGetFuture( GridCacheContext<K, V> cctx, @@ -113,7 +111,7 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda boolean skipVals, boolean canRemap, boolean needVer, - @Nullable GridInClosure3<KeyCacheObject, Object, GridCacheVersion> resC + boolean keepCacheObjects ) { super(cctx, keys, @@ -127,7 +125,7 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda skipVals, canRemap, needVer, - resC); + keepCacheObjects); this.topVer = topVer; @@ -264,7 +262,7 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda final int keysSize = keys.size(); - Map<K, V> locVals = resC == null ? U.<K, V>newHashMap(keysSize) : null; + Map<K, V> locVals = U.newHashMap(keysSize); boolean hasRmtNodes = false; @@ -454,10 +452,16 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda colocated.removeIfObsolete(key); } else { - if (resC != null) - resultClosureValue(key, skipVals ? true : v, ver); + if (needVer) + versionedResult(locVals, key, v, ver); else - cctx.addResult(locVals, key, v, skipVals, false, deserializePortable, true); + cctx.addResult(locVals, + key, + v, + skipVals, + keepCacheObjects, + deserializePortable, + true); return false; } @@ -550,24 +554,24 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda int keysSize = infos.size(); if (keysSize != 0) { - if (resC != null) { - for (GridCacheEntryInfo info : infos) { - assert skipVals == (info.value() == null); + Map<K, V> map = new GridLeanMap<>(keysSize); - resultClosureValue(info.key(), skipVals ? true : info.value(), info.version()); - } - } - else { - Map<K, V> map = new GridLeanMap<>(keysSize); - - for (GridCacheEntryInfo info : infos) { - assert skipVals == (info.value() == null); + for (GridCacheEntryInfo info : infos) { + assert skipVals == (info.value() == null); - cctx.addResult(map, info.key(), info.value(), skipVals, false, deserializePortable, false); - } - - return map; + if (needVer) + versionedResult(map, info.key(), info.value(), info.version()); + else + cctx.addResult(map, + info.key(), + info.value(), + skipVals, + keepCacheObjects, + deserializePortable, + false); } + + return map; } return Collections.emptyMap(); http://git-wip-us.apache.org/repos/asf/ignite/blob/f880227a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index d9840ec..8494410 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -1040,7 +1040,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { skipVals, canRemap, false, - null); + false); fut.init(); http://git-wip-us.apache.org/repos/asf/ignite/blob/f880227a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java index 241cc07..e8aca71 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java @@ -290,7 +290,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte skipVals, canRemap, false, - null); + false); } /** @@ -319,7 +319,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte boolean skipVals, boolean canRemap, boolean needVer, - @Nullable GridInClosure3<KeyCacheObject, Object, GridCacheVersion> c + boolean keepCacheObject ) { if (keys == null || keys.isEmpty()) return new GridFinishedFuture<>(Collections.<K, V>emptyMap()); @@ -330,7 +330,6 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte // Optimisation: try to resolve value locally and escape 'get future' creation. if (!reload && !forcePrimary) { Map<K, V> locVals = null; - Map<KeyCacheObject, T2<Object, GridCacheVersion>> locVals0 = null; boolean success = true; @@ -391,18 +390,19 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte success = false; } else { - if (c != null) { - if (locVals0 == null) - locVals0 = U.newHashMap(keys.size()); - - locVals0.put(key, new T2<>((Object)(skipVals ? true : v), ver)); - } - else { - if (locVals == null) - locVals = U.newHashMap(keys.size()); - - ctx.addResult(locVals, key, v, skipVals, false, deserializePortable, true); - } + if (locVals == null) + locVals = U.newHashMap(keys.size()); + + if (needVer) + locVals.put((K)key, (V)new T2<>((Object)(skipVals ? true : v), ver)); + else + ctx.addResult(locVals, + key, + v, + skipVals, + keepCacheObject, + deserializePortable, + true); } } else @@ -436,13 +436,6 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte if (success) { sendTtlUpdateRequest(expiryPlc); - if (c != null) { - if (locVals0 != null) { - for (Map.Entry<KeyCacheObject, T2<Object, GridCacheVersion>> e : locVals0.entrySet()) - c.apply(e.getKey(), e.getValue().get1(), e.getValue().get2()); - } - } - return new GridFinishedFuture<>(locVals); } } @@ -465,7 +458,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte skipVals, canRemap, needVer, - c); + keepCacheObject); fut.init(); http://git-wip-us.apache.org/repos/asf/ignite/blob/f880227a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java index d1adf1d..aec751e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java @@ -290,7 +290,7 @@ public abstract class GridNearCacheAdapter<K, V> extends GridDistributedCacheAda skipVal, canRemap, false, - null); + false); // init() will register future for responses if future has remote mappings. fut.init(); http://git-wip-us.apache.org/repos/asf/ignite/blob/f880227a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java index 43cc92a..61e09ad 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java @@ -101,8 +101,8 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap * @param expiryPlc Expiry policy. * @param skipVals Skip values flag. * @param canRemap Flag indicating whether future can be remapped on a newer topology version. - * @param needVer If {@code true} need provide entry version to result closure. - * @param resC Closure applied on 'get' result. + * @param needVer If {@code true} returns values as tuples containing value and version. + * @param keepCacheObjects Keep cache objects flag. */ public GridNearGetFuture( GridCacheContext<K, V> cctx, @@ -118,7 +118,7 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap boolean skipVals, boolean canRemap, boolean needVer, - @Nullable GridInClosure3<KeyCacheObject, Object, GridCacheVersion> resC + boolean keepCacheObjects ) { super(cctx, keys, @@ -132,10 +132,9 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap skipVals, canRemap, needVer, - resC); + keepCacheObjects); assert !F.isEmpty(keys); - assert !needVer || resC != null; this.tx = tx; @@ -530,7 +529,12 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap } if (v != null && !reload) { - if (resC == null) { + if (needVer) { + V val0 = (V)new T2<>(skipVals ? true : v, ver); + + add(new GridFinishedFuture<>(Collections.singletonMap((K)key, val0))); + } + else { K key0 = key.value(cctx.cacheObjectContext(), true); V val0 = v.value(cctx.cacheObjectContext(), true); @@ -539,8 +543,6 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap add(new GridFinishedFuture<>(Collections.singletonMap(key0, val0))); } - else - resultClosureValue(key, v, ver); } else { if (affNode == null) { @@ -661,7 +663,7 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap ) { boolean empty = F.isEmpty(keys); - Map<K, V> map = (resC != null || empty) ? Collections.<K, V>emptyMap() : new GridLeanMap<K, V>(keys.size()); + Map<K, V> map = empty ? Collections.<K, V>emptyMap() : new GridLeanMap<K, V>(keys.size()); if (!empty) { boolean atomic = cctx.atomic(); @@ -699,10 +701,16 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap assert skipVals == (info.value() == null); - if (resC != null) - resultClosureValue(key, skipVals ? true : val, info.version()); + if (needVer) + versionedResult(map, key, val, info.version()); else - cctx.addResult(map, key, val, skipVals, false, deserializePortable, false); + cctx.addResult(map, + key, + val, + skipVals, + keepCacheObjects, + deserializePortable, + false); } catch (GridCacheEntryRemovedException ignore) { if (log.isDebugEnabled()) http://git-wip-us.apache.org/repos/asf/ignite/blob/f880227a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java index 36eef52..6836a81 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java @@ -64,6 +64,7 @@ import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiTuple; import org.apache.ignite.lang.IgniteClosure; +import org.apache.ignite.lang.IgniteProductVersion; import org.apache.ignite.lang.IgniteReducer; import org.apache.ignite.lang.IgniteUuid; import org.jetbrains.annotations.Nullable; @@ -78,6 +79,9 @@ import static org.apache.ignite.transactions.TransactionState.PREPARING; public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearTxPrepareFutureAdapter implements GridCacheMvccFuture<IgniteInternalTx> { /** */ + public static final IgniteProductVersion SER_TX_SINCE = IgniteProductVersion.fromString("1.5.0"); + + /** */ @GridToStringExclude private KeyLockFuture keyLockFut = new KeyLockFuture(); @@ -104,7 +108,9 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearTxPre if (log.isDebugEnabled()) log.debug("Transaction future received owner changed callback: " + entry); - if ((entry.context().isNear() || entry.context().isLocal()) && owner != null && tx.hasWriteKey(entry.txKey())) { + if ((entry.context().isNear() || entry.context().isLocal()) + && owner != null && + tx.entry(entry.txKey()) != null) { keyLockFut.onKeyLocked(entry.txKey()); return true; @@ -477,7 +483,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearTxPre map(write, topVer, mappings, true, remap); for (IgniteTxEntry read : reads) - map(read, topVer, mappings, false, remap); + map(read, topVer, mappings, true, remap); keyLockFut.onAllKeysAdded(); http://git-wip-us.apache.org/repos/asf/ignite/blob/f880227a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java index 4dc8a84..9cd2478 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java @@ -18,7 +18,6 @@ package org.apache.ignite.internal.processors.cache.distributed.near; import java.util.Collection; -import java.util.Collections; import java.util.List; import java.util.UUID; import java.util.concurrent.atomic.AtomicBoolean; @@ -450,10 +449,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd return; } - prepare( - Collections.<IgniteTxEntry>emptyList(), - tx.writeEntries(), - topLocked); + prepare(tx.writeEntries(), topLocked); markInitialized(); } @@ -466,13 +462,11 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd } /** - * @param reads Read entries. * @param writes Write entries. * @param topLocked {@code True} if thread already acquired lock preventing topology change. * @throws IgniteCheckedException If failed. */ private void prepare( - Iterable<IgniteTxEntry> reads, Iterable<IgniteTxEntry> writes, boolean topLocked ) throws IgniteCheckedException { @@ -484,7 +478,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd ConcurrentLinkedDeque8<GridDistributedTxMapping> mappings = new ConcurrentLinkedDeque8<>(); - if (!F.isEmpty(reads) || !F.isEmpty(writes)) { + if (!F.isEmpty(writes)) { for (int cacheId : tx.activeCacheIds()) { GridCacheContext<?, ?> cacheCtx = cctx.cacheContext(cacheId); @@ -500,25 +494,8 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd // Assign keys to primary nodes. GridDistributedTxMapping cur = null; - for (IgniteTxEntry read : reads) { - GridDistributedTxMapping updated = map(read, topVer, cur, false, topLocked); - - if (cur != updated) { - mappings.offer(updated); - - if (updated.node().isLocal()) { - if (read.context().isNear()) - tx.nearLocallyMapped(true); - else if (read.context().isColocated()) - tx.colocatedLocallyMapped(true); - } - - cur = updated; - } - } - for (IgniteTxEntry write : writes) { - GridDistributedTxMapping updated = map(write, topVer, cur, true, topLocked); + GridDistributedTxMapping updated = map(write, topVer, cur, topLocked); if (cur != updated) { mappings.offer(updated); @@ -650,7 +627,6 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd * @param entry Transaction entry. * @param topVer Topology version. * @param cur Current mapping. - * @param waitLock Wait lock flag. * @param topLocked {@code True} if thread already acquired lock preventing topology change. * @return Mapping. */ @@ -658,7 +634,6 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd IgniteTxEntry entry, AffinityTopologyVersion topVer, @Nullable GridDistributedTxMapping cur, - boolean waitLock, boolean topLocked ) { GridCacheContext cacheCtx = entry.context(); @@ -686,7 +661,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd entry.cached(cacheCtx.local().entryEx(entry.key(), topVer)); if (cacheCtx.isNear() || cacheCtx.isLocal()) { - if (waitLock && entry.explicitVersion() == null) + if (entry.explicitVersion() == null) lockKeys.add(entry.txKey()); } http://git-wip-us.apache.org/repos/asf/ignite/blob/f880227a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java index 7700f05..909b547 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java @@ -181,8 +181,7 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V> boolean deserializePortable, @Nullable IgniteCacheExpiryPolicy expiryPlc, boolean skipVals, - boolean needVer, - final GridInClosure3<KeyCacheObject, Object, GridCacheVersion> c) { + boolean needVer) { assert tx != null; GridNearGetFuture<K, V> fut = new GridNearGetFuture<>(ctx, @@ -196,9 +195,9 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V> deserializePortable, expiryPlc, skipVals, - needVer, /*can remap*/true, - c); + needVer, + true); // init() will register future for responses if it has remote mappings. fut.init(); http://git-wip-us.apache.org/repos/asf/ignite/blob/f880227a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java index 5b2d50c..ad5c9d9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java @@ -60,10 +60,10 @@ import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.typedef.C1; import org.apache.ignite.internal.util.typedef.CI1; import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.lang.IgniteBiInClosure; import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.transactions.TransactionConcurrency; import org.apache.ignite.transactions.TransactionIsolation; @@ -348,30 +348,23 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { boolean readThrough, boolean async, final Collection<KeyCacheObject> keys, - boolean deserializePortable, boolean skipVals, - boolean needVer, + final boolean needVer, final GridInClosure3<KeyCacheObject, Object, GridCacheVersion> c ) { if (cacheCtx.isNear()) { return cacheCtx.nearTx().txLoadAsync(this, keys, readThrough, - deserializePortable, + /*deserializePortable*/false, accessPolicy(cacheCtx, keys), skipVals, - needVer, - c).chain(new C1<IgniteInternalFuture<Map<Object, Object>>, Void>() { + needVer).chain(new C1<IgniteInternalFuture<Map<Object, Object>>, Void>() { @Override public Void apply(IgniteInternalFuture<Map<Object, Object>> f) { try { Map<Object, Object> map = f.get(); - if (map != null && map.size() != keys.size()) { - for (KeyCacheObject key : keys) { - if (!map.containsKey(key)) - c.apply(key, null, IgniteTxEntry.READ_NEW_ENTRY_VER); - } - } + processLoaded(map, keys, needVer, c); return null; } @@ -392,23 +385,18 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { topologyVersion(), CU.subjectId(this, cctx), resolveTaskName(), - deserializePortable, + /*deserializePortable*/false, accessPolicy(cacheCtx, keys), skipVals, /*can remap*/true, needVer, - c + /*keepCacheObject*/true ).chain(new C1<IgniteInternalFuture<Map<Object, Object>>, Void>() { @Override public Void apply(IgniteInternalFuture<Map<Object, Object>> f) { try { Map<Object, Object> map = f.get(); - if (map != null && map.size() != keys.size()) { - for (KeyCacheObject key : keys) { - if (!map.containsKey(key)) - c.apply(key, null, IgniteTxEntry.READ_NEW_ENTRY_VER); - } - } + processLoaded(map, keys, needVer, c); return null; } @@ -422,7 +410,43 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { } else { assert cacheCtx.isLocal(); - return super.loadMissing(cacheCtx, readThrough, async, keys, deserializePortable, skipVals, needVer, c); + return super.loadMissing(cacheCtx, readThrough, async, keys, skipVals, needVer, c); + } + } + + /** + * @param map Loaded values. + * @param keys Keys. + * @param needVer If {@code true} version is required for loaded values. + * @param c Closure. + */ + private void processLoaded( + Map<Object, Object> map, + final Collection<KeyCacheObject> keys, + boolean needVer, + GridInClosure3<KeyCacheObject, Object, GridCacheVersion> c) { + for (KeyCacheObject key : keys) { + Object val = map.get(key); + + if (val != null) { + Object v; + GridCacheVersion ver; + + if (needVer) { + T2<Object, GridCacheVersion> t = (T2)val; + + v = t.get1(); + ver = t.get2(); + } + else { + v = val; + ver = null; + } + + c.apply(key, v, ver); + } + else + c.apply(key, null, IgniteTxEntry.READ_NEW_ENTRY_VER); } } @@ -632,7 +656,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { Collection<GridCacheVersion> committedVers, Collection<GridCacheVersion> rolledbackVers) { - Collection<IgniteTxEntry> entries = F.concat(false, mapping.reads(), mapping.writes()); + Collection<IgniteTxEntry> entries = F.concat(false, mapping.writes(), mapping.reads()); for (IgniteTxEntry txEntry : entries) { while (true) { http://git-wip-us.apache.org/repos/asf/ignite/blob/f880227a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java index 51a3316..ccf7394 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java @@ -421,7 +421,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter final boolean readThrough, boolean async, final Collection<KeyCacheObject> keys, - boolean deserializePortable, boolean skipVals, boolean needVer, final GridInClosure3<KeyCacheObject, Object, GridCacheVersion> c @@ -1621,7 +1620,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter !skipStore, false, missedMap.keySet(), - deserializePortable, skipVals, needReadVer, new GridInClosure3<KeyCacheObject, Object, GridCacheVersion>() { @@ -1658,15 +1656,13 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter cacheCtx.evicts().touch(e, topologyVersion()); if (visibleVal != null) { - synchronized (map) { - cacheCtx.addResult(map, - key, - visibleVal, - skipVals, - keepCacheObjects, - deserializePortable, - false); - } + cacheCtx.addResult(map, + key, + visibleVal, + skipVals, + keepCacheObjects, + deserializePortable, + false); } } else { @@ -1681,15 +1677,13 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter } if (visibleVal != null) { - synchronized (map) { - cacheCtx.addResult(map, - key, - visibleVal, - skipVals, - keepCacheObjects, - deserializePortable, - false); - } + cacheCtx.addResult(map, + key, + visibleVal, + skipVals, + keepCacheObjects, + deserializePortable, + false); } } } @@ -2367,7 +2361,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter /*read through*/cacheCtx.config().isLoadPreviousValue() && !skipStore, /*async*/true, missedForLoad, - deserializePortables(cacheCtx), skipVals, needReadVer, new GridInClosure3<KeyCacheObject, Object, GridCacheVersion>() { http://git-wip-us.apache.org/repos/asf/ignite/blob/f880227a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java index 1530aeb..bdea971 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java @@ -156,7 +156,6 @@ public interface IgniteTxLocalEx extends IgniteInternalTx { * @param readThrough Read through flag. * @param async if {@code True}, then loading will happen in a separate thread. * @param keys Keys. - * @param deserializePortable Deserialize portable flag. * @param skipVals Skip values flag. * @param needVer If {@code true} version is required for loaded values. * @param c Closure to be applied for loaded values. @@ -167,7 +166,6 @@ public interface IgniteTxLocalEx extends IgniteInternalTx { boolean readThrough, boolean async, Collection<KeyCacheObject> keys, - boolean deserializePortable, boolean skipVals, boolean needVer, GridInClosure3<KeyCacheObject, Object, GridCacheVersion> c); http://git-wip-us.apache.org/repos/asf/ignite/blob/f880227a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java index 0c9debf..4f6317d 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java @@ -22,6 +22,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.LinkedHashMap; +import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -70,6 +71,7 @@ import static java.util.concurrent.TimeUnit.SECONDS; import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; import static org.apache.ignite.cache.CacheMode.PARTITIONED; import static org.apache.ignite.cache.CacheMode.REPLICATED; +import static org.apache.ignite.cache.CacheRebalanceMode.SYNC; import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; import static org.apache.ignite.testframework.GridTestUtils.TestMemoryMode; import static org.apache.ignite.testframework.GridTestUtils.runMultiThreadedAsync; @@ -3037,6 +3039,153 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest { /** * @throws Exception If failed. */ + public void testNoOptimisticExceptionChangingTopology() throws Exception { + if (FAST) + return; + + final AtomicBoolean finished = new AtomicBoolean(); + + final List<String> cacheNames = new ArrayList<>(); + + Ignite srv = ignite(1); + + try { + { + CacheConfiguration<Integer, Integer> ccfg = cacheConfiguration(PARTITIONED, FULL_SYNC, 1, false, false); + ccfg.setName("cache1"); + ccfg.setRebalanceMode(SYNC); + + srv.createCache(ccfg); + + cacheNames.add(ccfg.getName()); + } + + { + // Store enabled. + CacheConfiguration<Integer, Integer> ccfg = cacheConfiguration(PARTITIONED, FULL_SYNC, 1, true, false); + ccfg.setName("cache2"); + ccfg.setRebalanceMode(SYNC); + + srv.createCache(ccfg); + + cacheNames.add(ccfg.getName()); + } + + { + // Offheap. + CacheConfiguration<Integer, Integer> ccfg = cacheConfiguration(PARTITIONED, FULL_SYNC, 1, false, false); + ccfg.setName("cache3"); + ccfg.setRebalanceMode(SYNC); + + GridTestUtils.setMemoryMode(null, ccfg, TestMemoryMode.OFFHEAP_TIERED, 1, 64); + + srv.createCache(ccfg); + + cacheNames.add(ccfg.getName()); + } + + IgniteInternalFuture<?> restartFut = GridTestUtils.runAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + while (!finished.get()) { + stopGrid(0); + + U.sleep(300); + + Ignite ignite = startGrid(0); + + assertFalse(ignite.configuration().isClientMode()); + } + + return null; + } + }); + + List<IgniteInternalFuture<?>> futs = new ArrayList<>(); + + final int KEYS_PER_THREAD = 100; + + for (int i = 1; i < SRVS + CLIENTS; i++) { + final Ignite node = ignite(i); + + final int minKey = i * KEYS_PER_THREAD; + final int maxKey = minKey + KEYS_PER_THREAD; + + // Threads update non-intersecting keys, optimistic exception should not be thrown. + + futs.add(GridTestUtils.runAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + try { + log.info("Started update thread [node=" + node.name() + + ", minKey=" + minKey + + ", maxKey=" + maxKey + ']'); + + final ThreadLocalRandom rnd = ThreadLocalRandom.current(); + + List<IgniteCache<Integer, Integer>> caches = new ArrayList<>(); + + for (String cacheName : cacheNames) + caches.add(node.<Integer, Integer>cache(cacheName)); + + assertEquals(3, caches.size()); + + int iter = 0; + + while (!finished.get()) { + int keyCnt = rnd.nextInt(1, 10); + + final Set<Integer> keys = new LinkedHashSet<>(); + + while (keys.size() < keyCnt) + keys.add(rnd.nextInt(minKey, maxKey)); + + for (final IgniteCache<Integer, Integer> cache : caches) { + doInTransaction(node, OPTIMISTIC, SERIALIZABLE, new Callable<Void>() { + @Override public Void call() throws Exception { + for (Integer key : keys) + randomOperation(rnd, cache, key); + + return null; + } + }); + } + + if (iter % 100 == 0) + log.info("Iteration: " + iter); + + iter++; + } + + return null; + } + catch (Throwable e) { + log.error("Unexpected error: " + e, e); + + throw e; + } + } + }, "update-thread-" + i)); + } + + U.sleep(60_000); + + finished.set(true); + + restartFut.get(); + + for (IgniteInternalFuture<?> fut : futs) + fut.get(); + } + finally { + for (String cacheName : cacheNames) + srv.destroyCache(cacheName); + + finished.set(true); + } + } + + /** + * @throws Exception If failed. + */ public void testConflictResolution() throws Exception { final Ignite ignite = ignite(0);
