Repository: ignite Updated Branches: refs/heads/ignite-1607-read caed8659d -> 206721a09
ignite-1607 WIP Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/206721a0 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/206721a0 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/206721a0 Branch: refs/heads/ignite-1607-read Commit: 206721a0954521f3c84ccb52ce32d40ba021c9ce Parents: caed865 Author: sboikov <[email protected]> Authored: Thu Oct 8 16:30:53 2015 +0300 Committer: sboikov <[email protected]> Committed: Fri Oct 9 13:10:01 2015 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheAdapter.java | 199 ++-------- .../distributed/dht/GridDhtCacheAdapter.java | 6 +- .../cache/distributed/dht/GridDhtGetFuture.java | 40 +- .../cache/distributed/dht/GridDhtTxLocal.java | 2 +- ...arOptimisticSerializableTxPrepareFuture.java | 47 ++- .../cache/distributed/near/GridNearTxLocal.java | 14 - .../CacheSerializableTransactionsTest.java | 374 ++++++++++++++++++- .../GridCacheConcurrentTxMultiNodeTest.java | 3 - .../IgniteTxMultiThreadedAbstractTest.java | 2 +- 9 files changed, 465 insertions(+), 222 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/206721a0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index ae987b7..bb15204 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -1409,144 +1409,15 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V * @param taskName Task name. * @return Future. */ - public IgniteInternalFuture<Map<KeyCacheObject, CacheObject>> reloadAllAsync0( + public final IgniteInternalFuture<Map<KeyCacheObject, T2<CacheObject, GridCacheVersion>>> reloadAllAsync0( Collection<KeyCacheObject> keys, boolean ret, boolean skipVals, @Nullable UUID subjId, - String taskName) - { - final AffinityTopologyVersion topVer = ctx.affinity().affinityTopologyVersion(); - - if (!F.isEmpty(keys)) { - final Map<KeyCacheObject, GridCacheVersion> keyVers = new HashMap(); - - for (KeyCacheObject key : keys) { - if (key == null) - continue; - - // Skip primary or backup entries for near cache. - if (ctx.isNear() && ctx.affinity().localNode(key, topVer)) - continue; - - while (true) { - try { - GridCacheEntryEx entry = entryExSafe(key, topVer); - - if (entry == null) - break; - - GridCacheVersion ver = entry.version(); - - keyVers.put(key, ver); - - break; - } - catch (GridCacheEntryRemovedException ignore) { - if (log.isDebugEnabled()) - log.debug("Got removed entry for reload (will retry): " + key); - } - catch (GridDhtInvalidPartitionException ignore) { - if (log.isDebugEnabled()) - log.debug("Got invalid partition for key (will skip): " + key); + String taskName) { + assert false; - break; - } - } - } - - final Map<KeyCacheObject, CacheObject> map = - ret ? U.<KeyCacheObject, CacheObject>newHashMap(keys.size()) : null; - - final Collection<KeyCacheObject> absentKeys = F.view(keyVers.keySet()); - - final Collection<KeyCacheObject> loadedKeys = new GridConcurrentHashSet<>(); - - IgniteInternalFuture<Object> readFut = readThroughAllAsync(absentKeys, true, skipVals, null, - subjId, taskName, new CI2<KeyCacheObject, Object>() { - /** Version for all loaded entries. */ - private GridCacheVersion nextVer = ctx.versions().next(); - - /** {@inheritDoc} */ - @Override public void apply(KeyCacheObject key, Object val) { - loadedKeys.add(key); - - GridCacheEntryEx entry = peekEx(key); - - if (entry != null) { - try { - GridCacheVersion curVer = keyVers.get(key); - - if (curVer != null) { - boolean wasNew = entry.isNewLocked(); - - entry.unswap(); - - CacheObject cacheVal = ctx.toCacheObject(val); - - boolean set = entry.versionedValue(cacheVal, curVer, nextVer); - - ctx.evicts().touch(entry, topVer); - - if (map != null) { - if (set || wasNew) - map.put(key, cacheVal); - else { - CacheObject v = entry.peek(true, false, false, null); - - if (v != null) - map.put(key, v); - } - } - - if (log.isDebugEnabled()) { - log.debug("Set value loaded from store into entry [set=" + set + ", " + - "curVer=" + - curVer + ", newVer=" + nextVer + ", entry=" + entry + ']'); - } - } - else { - if (log.isDebugEnabled()) { - log.debug("Current version was not found (either entry was removed or " + - "validation was not passed: " + entry); - } - } - } - catch (GridCacheEntryRemovedException ignore) { - if (log.isDebugEnabled()) { - log.debug("Got removed entry for reload (will not store reloaded entry) " + - "[entry=" + entry + ']'); - } - } - catch (IgniteCheckedException e) { - throw new IgniteException(e); - } - } - } - }); - - return readFut.chain(new CX1<IgniteInternalFuture<Object>, Map<KeyCacheObject, CacheObject>>() { - @Override public Map<KeyCacheObject, CacheObject> applyx(IgniteInternalFuture<Object> e) - throws IgniteCheckedException { - // Touch all not loaded keys. - for (KeyCacheObject key : absentKeys) { - if (!loadedKeys.contains(key)) { - GridCacheEntryEx entry = peekEx(key); - - if (entry != null) - ctx.evicts().touch(entry, topVer); - } - } - - // Make sure there were no exceptions. - e.get(); - - return map; - } - }); - } - - return new GridFinishedFuture<>(Collections.<KeyCacheObject, CacheObject>emptyMap()); + return new GridFinishedFuture<>(); } /** @@ -1763,7 +1634,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V expiry, skipVals, false, - canRemap); + canRemap, + false); } /** @@ -1778,7 +1650,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V * @param keepCacheObjects Keep cache objects * @return Future. */ - public <K1, V1> IgniteInternalFuture<Map<K1, V1>> getAllAsync0(@Nullable final Collection<KeyCacheObject> keys, + public final <K1, V1> IgniteInternalFuture<Map<K1, V1>> getAllAsync0(@Nullable final Collection<KeyCacheObject> keys, final boolean readThrough, boolean checkTx, @Nullable final UUID subjId, @@ -1787,7 +1659,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V @Nullable IgniteCacheExpiryPolicy expiry, final boolean skipVals, final boolean keepCacheObjects, - boolean canRemap + boolean canRemap, + final boolean needVer ) { if (F.isEmpty(keys)) return new GridFinishedFuture<>(Collections.<K1, V1>emptyMap()); @@ -1822,20 +1695,17 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V GridCacheEntryEx entry = entryEx(key); try { - CacheObject val = entry.innerGet(null, + T2<CacheObject, GridCacheVersion> res = entry.innerGetVersioned(null, ctx.isSwapOrOffheapEnabled(), - /*don't read-through*/false, - /*fail-fast*/true, /*unmarshal*/true, /*update-metrics*/!skipVals, /*event*/!skipVals, - /*temporary*/false, subjId, null, taskName, expiry); - if (val == null) { + if (res == null) { GridCacheVersion ver = entry.version(); if (misses == null) @@ -1844,7 +1714,20 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V misses.put(key, ver); } else { - ctx.addResult(map, key, val, skipVals, keepCacheObjects, deserializePortable, true); + if (needVer) { + assert keepCacheObjects; + + map.put((K1)key, (V1)new T2<>(res.get1(), res.get2())); + } + else { + ctx.addResult(map, + key, + res.get1(), + skipVals, + keepCacheObjects, + deserializePortable, + true); + } if (tx == null || (!tx.implicit() && tx.isolation() == READ_COMMITTED)) ctx.evicts().touch(entry, topVer); @@ -1860,15 +1743,6 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V if (log.isDebugEnabled()) log.debug("Got removed entry in getAllAsync(..) method (will retry): " + key); } - catch (GridCacheFilterFailedException ignore) { - if (log.isDebugEnabled()) - log.debug("Filter validation failed for entry: " + entry); - - if (tx == null || (!tx.implicit() && tx.isolation() == READ_COMMITTED)) - ctx.evicts().touch(entry, topVer); - - break; // While loop. - } } } @@ -1918,13 +1792,20 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V // Don't put key-value pair into result map if value is null. if (val != null) { - ctx.addResult(map, - key, - cacheVal, - skipVals, - keepCacheObjects, - deserializePortable, - false); + if (needVer) { + assert keepCacheObjects; + + map.put((K1)key, (V1)new T2<>(cacheVal, set ? nextVer : ver)); + } + else { + ctx.addResult(map, + key, + cacheVal, + skipVals, + keepCacheObjects, + deserializePortable, + false); + } } if (tx0 == null || (!tx0.implicit() && @@ -2017,6 +1898,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V } } else { + assert !needVer; + return asyncOp(tx, new AsyncOp<Map<K1, V1>>(keys) { @Override public IgniteInternalFuture<Map<K1, V1>> op(IgniteTxLocalAdapter tx) { return tx.getAllAsync(ctx, keys, null, deserializePortable, skipVals, false, !readThrough); http://git-wip-us.apache.org/repos/asf/ignite/blob/206721a0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java index 3ce9ee9..25e480c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java @@ -69,6 +69,7 @@ import org.apache.ignite.internal.util.typedef.CI1; import org.apache.ignite.internal.util.typedef.CI2; import org.apache.ignite.internal.util.typedef.CI3; import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; @@ -605,7 +606,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap * @param skipVals Skip values flag. * @return Get future. */ - IgniteInternalFuture<Map<KeyCacheObject, CacheObject>> getDhtAllAsync( + IgniteInternalFuture<Map<KeyCacheObject, T2<CacheObject, GridCacheVersion>>> getDhtAllAsync( Collection<KeyCacheObject> keys, boolean readThrough, @Nullable UUID subjId, @@ -623,7 +624,8 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap expiry, skipVals, /*keep cache objects*/true, - canRemap); + canRemap, + /*need version*/true); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/206721a0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java index a67b1de..e8cafb0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java @@ -17,11 +17,10 @@ package org.apache.ignite.internal.processors.cache.distributed.dht; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; -import java.util.Iterator; import java.util.LinkedHashMap; -import java.util.LinkedList; import java.util.Map; import java.util.UUID; import java.util.concurrent.atomic.AtomicReference; @@ -45,6 +44,7 @@ import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.lang.GridClosureException; import org.apache.ignite.internal.util.typedef.C2; import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiClosure; @@ -147,6 +147,8 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col assert reader != null; assert !F.isEmpty(keys); + assert !reload; + this.reader = reader; this.cctx = cctx; this.msgId = msgId; @@ -291,8 +293,6 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col return new GridFinishedFuture<Collection<GridCacheEntryInfo>>( Collections.<GridCacheEntryInfo>emptyList()); - final Collection<GridCacheEntryInfo> infos = new LinkedList<>(); - String taskName0 = cctx.kernalContext().job().currentTaskName(); if (taskName0 == null) @@ -335,8 +335,6 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col txFut.add(f); } - infos.add(info); - break; } catch (IgniteCheckedException err) { @@ -355,7 +353,7 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col if (txFut != null) txFut.markInitialized(); - IgniteInternalFuture<Map<KeyCacheObject, CacheObject>> fut; + IgniteInternalFuture<Map<KeyCacheObject, T2<CacheObject, GridCacheVersion>>> fut; if (txFut == null || txFut.isDone()) { if (reload && cctx.readThrough() && cctx.store().configured()) { @@ -393,8 +391,8 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col // transactions to complete. fut = new GridEmbeddedFuture<>( txFut, - new C2<Boolean, Exception, IgniteInternalFuture<Map<KeyCacheObject, CacheObject>>>() { - @Override public IgniteInternalFuture<Map<KeyCacheObject, CacheObject>> apply(Boolean b, Exception e) { + new C2<Boolean, Exception, IgniteInternalFuture<Map<KeyCacheObject, T2<CacheObject, GridCacheVersion>>>>() { + @Override public IgniteInternalFuture<Map<KeyCacheObject, T2<CacheObject, GridCacheVersion>>> apply(Boolean b, Exception e) { if (e != null) throw new GridClosureException(e); @@ -432,23 +430,29 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col } return new GridEmbeddedFuture<>( - new C2<Map<KeyCacheObject, CacheObject>, Exception, Collection<GridCacheEntryInfo>>() { - @Override public Collection<GridCacheEntryInfo> apply(Map<KeyCacheObject, CacheObject> map, Exception e) { + new C2<Map<KeyCacheObject, T2<CacheObject, GridCacheVersion>>, Exception, Collection<GridCacheEntryInfo>>() { + @Override public Collection<GridCacheEntryInfo> apply(Map<KeyCacheObject, T2<CacheObject, GridCacheVersion>> map, Exception e) { if (e != null) { onDone(e); return Collections.emptyList(); } else { - for (Iterator<GridCacheEntryInfo> it = infos.iterator(); it.hasNext();) { - GridCacheEntryInfo info = it.next(); + Collection<GridCacheEntryInfo> infos = new ArrayList<>(map.size()); - Object v = map.get(info.key()); + for (Map.Entry<KeyCacheObject, T2<CacheObject, GridCacheVersion>> entry : map.entrySet()) { + T2<CacheObject, GridCacheVersion> val = entry.getValue(); - if (v == null) - it.remove(); - else - info.value(skipVals ? null : (CacheObject)v); + assert val != null; + + GridCacheEntryInfo info = new GridCacheEntryInfo(); + + info.cacheId(cctx.cacheId()); + info.key(entry.getKey()); + info.value(skipVals ? null : val.get1()); + info.version(val.get2()); + + infos.add(info); } return infos; http://git-wip-us.apache.org/repos/asf/ignite/blob/206721a0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java index 2071275..44f34aa 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java @@ -627,7 +627,7 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa if (finish(false) || state() == UNKNOWN) fut.finish(); else - fut.onError(new IgniteCheckedException("Failed to commit transaction: " + + fut.onError(new IgniteCheckedException("Failed to rollback transaction: " + CU.txString(GridDhtTxLocal.this))); } http://git-wip-us.apache.org/repos/asf/ignite/blob/206721a0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java index e48601d..04c4851 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.distributed.near; import java.util.Collection; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.UUID; @@ -139,7 +140,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearTxPre e.retryReadyFuture(cctx.nextAffinityReadyFuture(tx.topologyVersion())); - f.onResult(e); + f.onNodeLeft(e); found = true; } @@ -165,7 +166,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearTxPre } if (e instanceof IgniteTxOptimisticCheckedException && nodeId != null) - tx.onOptimisticException(nodeId); + tx.removeMapping(nodeId); err.compareAndSet(null, e); } @@ -519,14 +520,27 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearTxPre Collection<MiniFuture> futs = (Collection)futures(); - for (MiniFuture fut : futs) { - if (remap && fut.rcvRes.get()) + Iterator<MiniFuture> it = futs.iterator(); + + while (it.hasNext()) { + MiniFuture fut = it.next(); + + if (skipFuture(remap, fut)) continue; IgniteCheckedException err = prepare(fut); if (err != null) { - onDone(err); + while (it.hasNext()) { + fut = it.next(); + + if (skipFuture(remap, fut)) + continue; + + tx.removeMapping(fut.mapping().node().id()); + + fut.onResult(err); + } break; } @@ -536,10 +550,19 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearTxPre } /** + * @param remap Remap flag. + * @param fut Future. + * @return {@code True} if skip future during remap. + */ + private boolean skipFuture(boolean remap, MiniFuture fut) { + return remap && fut.rcvRes.get(); + } + + /** * @param fut Mini future. - * @return {@code False} if should stop mapping. + * @return Prepare error if any. */ - private IgniteCheckedException prepare(final MiniFuture fut) { + @Nullable private IgniteCheckedException prepare(final MiniFuture fut) { GridDistributedTxMapping m = fut.mapping(); final ClusterNode n = m.node(); @@ -575,7 +598,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearTxPre tx.userPrepare(); } catch (IgniteCheckedException e) { - onError(m.node().id(), e); + fut.onResult(e); return e; } @@ -605,7 +628,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearTxPre catch (ClusterTopologyCheckedException e) { e.retryReadyFuture(cctx.nextAffinityReadyFuture(tx.topologyVersion())); - fut.onResult(e); + fut.onNodeLeft(e); return e; } @@ -806,7 +829,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearTxPre */ void onResult(Throwable e) { if (rcvRes.compareAndSet(false, true)) { - err.compareAndSet(null, e); + onError(m.node().id(), e); if (log.isDebugEnabled()) log.debug("Failed to get future result [fut=" + this + ", err=" + e + ']'); @@ -822,7 +845,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearTxPre /** * @param e Node failure. */ - void onResult(ClusterTopologyCheckedException e) { + void onNodeLeft(ClusterTopologyCheckedException e) { if (isDone()) return; @@ -856,7 +879,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearTxPre assert cctx.kernalContext().clientNode(); assert m.clientFirst(); - tx.onClientRemap(m.node().id()); + tx.removeMapping(m.node().id()); ClientRemapFuture remapFut = new ClientRemapFuture(); http://git-wip-us.apache.org/repos/asf/ignite/blob/206721a0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java index 9207bd0..5b2d50c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java @@ -597,20 +597,6 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { } /** - * @param nodeId Primary node id. - */ - void onOptimisticException(UUID nodeId) { - mappings.remove(nodeId); - } - - /** - * @param nodeId Primary node id. - */ - void onClientRemap(UUID nodeId) { - mappings.remove(nodeId); - } - - /** * @param nodeId Node ID to mark with explicit lock. * @return {@code True} if mapping was found. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/206721a0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java index 9f74b9c..85c4f80 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java @@ -46,6 +46,7 @@ import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.configuration.NearCacheConfiguration; import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteClosure; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; @@ -77,7 +78,7 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest { private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); /** */ - private static final boolean FAST = true; + private static final boolean FAST = false; /** */ private static Map<Integer, Integer> storeMap = new ConcurrentHashMap8<>(); @@ -95,6 +96,8 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest { @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { IgniteConfiguration cfg = super.getConfiguration(gridName); + cfg.setPeerClassLoadingEnabled(false); + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER); cfg.setClientMode(client); @@ -266,8 +269,7 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest { txAsync(cache, OPTIMISTIC, SERIALIZABLE, new IgniteClosure<IgniteCache<Integer, Integer>, Void>() { - @Override - public Void apply(IgniteCache<Integer, Integer> cache) { + @Override public Void apply(IgniteCache<Integer, Integer> cache) { cache.get(key); return null; @@ -1305,6 +1307,329 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest { /** * @throws Exception If failed. */ + public void testRollbackNearCache1() throws Exception { + rollbackNearCacheWrite(true); + } + + /** + * @throws Exception If failed. + */ + public void testRollbackNearCache2() throws Exception { + rollbackNearCacheWrite(false); + } + + /** + * @param near If {@code true} locks entry using the same near cache. + * @throws Exception If failed. + */ + private void rollbackNearCacheWrite(boolean near) throws Exception { + Ignite ignite0 = ignite(0); + + IgniteCache<Integer, Integer> cache0 = + ignite0.createCache(cacheConfiguration(PARTITIONED, FULL_SYNC, 1, false, false)); + + final String cacheName = cache0.getName(); + + try { + Ignite ignite = ignite(SRVS); + + IgniteCache<Integer, Integer> cache = ignite.createNearCache(cacheName, + new NearCacheConfiguration<Integer, Integer>()); + + IgniteTransactions txs = ignite.transactions(); + + Integer key1 = primaryKey(ignite(0).cache(cacheName)); + Integer key2 = primaryKey(ignite(1).cache(cacheName)); + Integer key3 = primaryKey(ignite(2).cache(cacheName)); + + CountDownLatch latch = new CountDownLatch(1); + + IgniteInternalFuture<?> fut = null; + + try { + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + cache.put(key1, key1); + cache.put(key2, key2); + cache.put(key3, key3); + + fut = lockKey(latch, near ? cache : cache0, key2); + + tx.commit(); + } + + fail(); + } + catch (TransactionOptimisticException e) { + log.info("Expected exception: " + e); + } + + latch.countDown(); + + assert fut != null; + + fut.get(); + + checkValue(key1, null, cacheName); + checkValue(key2, 1, cacheName); + checkValue(key3, null, cacheName); + + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + cache.put(key1, key1); + cache.put(key2, key2); + cache.put(key3, key3); + + tx.commit(); + } + + checkValue(key1, key1, cacheName); + checkValue(key2, key2, cacheName); + checkValue(key3, key3, cacheName); + } + finally { + ignite0.destroyCache(cacheName); + } + } + + /** + * @throws Exception If failed. + */ + public void testRollbackNearCache3() throws Exception { + rollbackNearCacheRead(true); + } + + /** + * @throws Exception If failed. + */ + public void testRollbackNearCache4() throws Exception { + rollbackNearCacheRead(false); + } + + /** + * @param near If {@code true} updates entry using the same near cache. + * @throws Exception If failed. + */ + private void rollbackNearCacheRead(boolean near) throws Exception { + Ignite ignite0 = ignite(0); + + IgniteCache<Integer, Integer> cache0 = + ignite0.createCache(cacheConfiguration(PARTITIONED, FULL_SYNC, 1, false, false)); + + final String cacheName = cache0.getName(); + + try { + Ignite ignite = ignite(SRVS); + + IgniteCache<Integer, Integer> cache = ignite.createNearCache(cacheName, + new NearCacheConfiguration<Integer, Integer>()); + + IgniteTransactions txs = ignite.transactions(); + + Integer key1 = primaryKey(ignite(0).cache(cacheName)); + Integer key2 = primaryKey(ignite(1).cache(cacheName)); + Integer key3 = primaryKey(ignite(2).cache(cacheName)); + + cache0.put(key1, -1); + cache0.put(key2, -1); + cache0.put(key3, -1); + + try { + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + cache.get(key1); + cache.get(key2); + cache.get(key3); + + updateKey(near ? cache : cache0, key2, -2); + + tx.commit(); + } + + fail(); + } + catch (TransactionOptimisticException e) { + log.info("Expected exception: " + e); + } + + checkValue(key1, -1, cacheName); + checkValue(key2, -2, cacheName); + checkValue(key3, -1, cacheName); + + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + cache.put(key1, key1); + cache.put(key2, key2); + cache.put(key3, key3); + + tx.commit(); + } + + checkValue(key1, key1, cacheName); + checkValue(key2, key2, cacheName); + checkValue(key3, key3, cacheName); + } + finally { + ignite0.destroyCache(cacheName); + } + } + + /** + * @throws Exception If failed. + */ + public void testAccountTx1() throws Exception { + accountTx(false, false); + } + + /** + * @throws Exception If failed. + */ + public void _testAccountTxNearCache() throws Exception { + accountTx(false, true); + } + + /** + * @throws Exception If failed. + */ + public void testAccountTx2() throws Exception { + accountTx(true, false); + } + + /** + * @param getAll If {@code true} uses getAll/putAll in transaction. + * @param nearCache If {@code true} near cache is enabled. + * @throws Exception If failed. + */ + private void accountTx(final boolean getAll, final boolean nearCache) throws Exception { + final Ignite ignite0 = ignite(0); + + final String cacheName = + ignite0.createCache(cacheConfiguration(PARTITIONED, FULL_SYNC, 1, false, false)).getName(); + + try { + final List<Ignite> clients = clients(); + + final int ACCOUNTS = 100; + final int VAL_PER_ACCOUNT = 10_000; + + IgniteCache<Integer, Account> cache = ignite0.cache(cacheName); + + for (int i = 0; i < ACCOUNTS; i++) + cache.put(i, new Account(VAL_PER_ACCOUNT)); + + final AtomicInteger idx = new AtomicInteger(); + + final int THREADS = 20; + + final long stopTime = System.currentTimeMillis() + 10_000; + + IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + int nodeIdx = idx.getAndIncrement() % clients.size(); + + Ignite node = clients.get(nodeIdx); + + log.info("Tx thread: " + node.name()); + + final IgniteTransactions txs = node.transactions(); + + final IgniteCache<Integer, Account> cache = + nearCache ? node.createNearCache(cacheName, new NearCacheConfiguration<Integer, Account>()) : + node.<Integer, Account>cache(cacheName); + + assertNotNull(cache); + + ThreadLocalRandom rnd = ThreadLocalRandom.current(); + + while (U.currentTimeMillis() < stopTime) { + int id1 = rnd.nextInt(ACCOUNTS); + + int id2 = rnd.nextInt(ACCOUNTS); + + while (id2 == id1) + id2 = rnd.nextInt(ACCOUNTS); + + try { + while (true) { + try { + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + if (getAll) { + Map<Integer, Account> map = cache.getAll(F.asSet(id1, id2)); + + Account a1 = cache.get(id1); + Account a2 = cache.get(id2); + + assertNotNull(a1); + assertNotNull(a2); + + if (a1.value() > 0) { + a1 = new Account(a1.value() - 1); + a2 = new Account(a2.value() + 1); + } + + map.put(id1, a1); + map.put(id2, a2); + + cache.putAll(map); + } + else { + Account a1 = cache.get(id1); + Account a2 = cache.get(id2); + + assertNotNull(a1); + assertNotNull(a2); + + if (a1.value() > 0) { + a1 = new Account(a1.value() - 1); + a2 = new Account(a2.value() + 1); + } + + cache.put(id1, a1); + cache.put(id2, a2); + } + + tx.commit(); + } + + break; + } + catch (TransactionOptimisticException ignore) { + // Retry. + } + } + } + catch (Throwable e) { + log.error("Unexpected error: " + e, e); + + throw e; + } + } + + return null; + } + }, THREADS, "tx-thread"); + + fut.get(30_000); + + int sum = 0; + + for (int i = 0; i < ACCOUNTS; i++) { + Account a = cache.get(i); + + assertNotNull(a); + assertTrue(a.value() >= 0); + + log.info("Account: " + a.value()); + + sum += a.value(); + } + + assertEquals(ACCOUNTS * VAL_PER_ACCOUNT, sum); + } + finally { + ignite0.destroyCache(cacheName); + } + } + + /** + * @throws Exception If failed. + */ public void testConcurrentUpdateNoDeadlock() throws Exception { concurrentUpdateNoDeadlock(Collections.singletonList(ignite(0)), 10, false); } @@ -1319,14 +1644,14 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest { /** * @throws Exception If failed. */ - public void testConcurrentUpdateNoDeadlockClients() throws Exception { + public void testConcurrentUpdateNoDeadlockFromClients() throws Exception { concurrentUpdateNoDeadlock(clients(), 20, false); } /** * @throws Exception If failed. */ - public void testConcurrentUpdateNoDeadlockClientsNodeRestart() throws Exception { + public void testConcurrentUpdateNoDeadlockFromClientsNodeRestart() throws Exception { concurrentUpdateNoDeadlock(clients(), 20, true); } @@ -1356,12 +1681,15 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest { private void concurrentUpdateNoDeadlock(final List<Ignite> updateNodes, int threads, final boolean restart) throws Exception { + if (FAST) + return; + assert updateNodes.size() > 0; - final Ignite ignite0 = ignite(0); + final Ignite srv = ignite(1); final String cacheName = - ignite0.createCache(cacheConfiguration(PARTITIONED, FULL_SYNC, 1, false, false)).getName(); + srv.createCache(cacheConfiguration(PARTITIONED, FULL_SYNC, 1, false, false)).getName(); try { final int KEYS = 100; @@ -1389,12 +1717,10 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest { }); } - int ITERS = FAST ? 1 : 10; - - for (int i = 0; i < ITERS; i++) { + for (int i = 0; i < 10; i++) { log.info("Iteration: " + i); - final long stopTime = U.currentTimeMillis() + (FAST ? 1000 : 10_000); + final long stopTime = U.currentTimeMillis() + 10_000; final AtomicInteger idx = new AtomicInteger(); @@ -1454,7 +1780,7 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest { updateFut.get(60, SECONDS); - IgniteCache<Integer, Integer> cache = ignite(1).cache(cacheName); + IgniteCache<Integer, Integer> cache = srv.cache(cacheName); for (int key = 0; key < KEYS; key++) { Integer val = cache.get(key); @@ -1474,7 +1800,7 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest { } } finally { - destroyCache(ignite(1), cacheName); + destroyCache(srv, cacheName); } } @@ -1735,4 +2061,26 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest { return val; } } + + /** + * + */ + static class Account { + /** */ + private final int val; + + /** + * @param val Value. + */ + public Account(int val) { + this.val = val; + } + + /** + * @return Value. + */ + public int value() { + return val; + } + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/206721a0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentTxMultiNodeTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentTxMultiNodeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentTxMultiNodeTest.java index 67bc08c..1ef77f2 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentTxMultiNodeTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentTxMultiNodeTest.java @@ -162,9 +162,6 @@ public class GridCacheConcurrentTxMultiNodeTest extends GridCommonAbstractTest { c.setPeerClassLoadingEnabled(false); - // Enable tracing. -// Logger.getLogger("org.apache.ignite.kernal.processors.cache.GridCacheDgcManager.trace").setLevel(Level.DEBUG); - return c; } http://git-wip-us.apache.org/repos/asf/ignite/blob/206721a0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxMultiThreadedAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxMultiThreadedAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxMultiThreadedAbstractTest.java index 7a1a0b9..191feb6 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxMultiThreadedAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxMultiThreadedAbstractTest.java @@ -225,7 +225,7 @@ public abstract class IgniteTxMultiThreadedAbstractTest extends IgniteTxAbstract final int ITERATIONS = 100; - for (int key0 = 0; key0 < 20; key0++) { + for (int key0 = 100_000; key0 < 100_000 + 20; key0++) { final int key = key0; cache.put(key, 0L);
