http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java index 221b230..eb7c78f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java @@ -155,7 +155,8 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte @Nullable UUID subjId, String taskName, final boolean deserializePortable, - final boolean skipVals + final boolean skipVals, + boolean canRemap ) { ctx.checkSecurity(SecurityPermission.CACHE_READ); @@ -183,7 +184,9 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte }); } - AffinityTopologyVersion topVer = tx == null ? ctx.affinity().affinityTopologyVersion() : tx.topologyVersion(); + AffinityTopologyVersion topVer = tx == null ? + (canRemap ? ctx.affinity().affinityTopologyVersion() : ctx.shared().exchange().readyAffinityVersion()) : + tx.topologyVersion(); subjId = ctx.subjectIdPerCall(subjId, opCtx); @@ -197,7 +200,8 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte taskName, deserializePortable, skipVals ? null : expiryPolicy(opCtx != null ? opCtx.expiry() : null), - skipVals); + skipVals, + canRemap); } /** {@inheritDoc} */ @@ -226,7 +230,8 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte * @param skipVals Skip values flag. * @return Loaded values. 
*/ - public IgniteInternalFuture<Map<K, V>> loadAsync(@Nullable Collection<KeyCacheObject> keys, + public IgniteInternalFuture<Map<K, V>> loadAsync( + @Nullable Collection<KeyCacheObject> keys, boolean readThrough, boolean reload, boolean forcePrimary, @@ -235,7 +240,8 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte String taskName, boolean deserializePortable, @Nullable IgniteCacheExpiryPolicy expiryPlc, - boolean skipVals + boolean skipVals, + boolean canRemap ) { if (keys == null || keys.isEmpty()) return new GridFinishedFuture<>(Collections.<K, V>emptyMap()); @@ -340,7 +346,8 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte taskName, deserializePortable, expiryPlc, - skipVals); + skipVals, + canRemap); fut.init();
http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java index c784948..90ca8df 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java @@ -1125,8 +1125,12 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture * @return Topology exception with user-friendly message. */ private ClusterTopologyCheckedException newTopologyException(@Nullable Throwable nested, UUID nodeId) { - return new ClusterTopologyCheckedException("Failed to acquire lock for keys (primary node left grid, " + - "retry transaction if possible) [keys=" + keys + ", node=" + nodeId + ']', nested); + ClusterTopologyCheckedException topEx = new ClusterTopologyCheckedException("Failed to acquire lock for keys " + + "(primary node left grid, retry transaction if possible) [keys=" + keys + ", node=" + nodeId + ']', nested); + + topEx.retryReadyFuture(cctx.shared().nextAffinityReadyFuture(topVer.get())); + + return topEx; } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index cbf6b40..1a90de9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -54,6 +54,10 @@ import static org.apache.ignite.internal.managers.communication.GridIoPolicy.*; public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityTopologyVersion> implements Comparable<GridDhtPartitionsExchangeFuture>, GridDhtTopologyFuture { /** */ + private static final int DUMP_PENDING_OBJECTS_THRESHOLD = + IgniteSystemProperties.getInteger(IgniteSystemProperties.IGNITE_DUMP_PENDING_OBJECTS_THRESHOLD, 10); + + /** */ private static final long serialVersionUID = 0L; /** Dummy flag. */ @@ -711,6 +715,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT if (log.isDebugEnabled()) log.debug("Before waiting for partition release future: " + this); + int dumpedObjects = 0; + while (true) { try { partReleaseFut.get(2 * cctx.gridConfig().getNetworkTimeout(), TimeUnit.MILLISECONDS); @@ -719,7 +725,11 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT } catch (IgniteFutureTimeoutCheckedException ignored) { // Print pending transactions and locks that might have led to hang. 
- dumpPendingObjects(); + if (dumpedObjects < DUMP_PENDING_OBJECTS_THRESHOLD) { + dumpPendingObjects(); + + dumpedObjects++; + } } } @@ -731,6 +741,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT IgniteInternalFuture<?> locksFut = cctx.mvcc().finishLocks(exchId.topologyVersion()); + dumpedObjects = 0; + while (true) { try { locksFut.get(2 * cctx.gridConfig().getNetworkTimeout(), TimeUnit.MILLISECONDS); @@ -738,22 +750,26 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT break; } catch (IgniteFutureTimeoutCheckedException ignored) { - U.warn(log, "Failed to wait for locks release future. " + - "Dumping pending objects that might be the cause: " + cctx.localNodeId()); + if (dumpedObjects < DUMP_PENDING_OBJECTS_THRESHOLD) { + U.warn(log, "Failed to wait for locks release future. " + + "Dumping pending objects that might be the cause: " + cctx.localNodeId()); - U.warn(log, "Locked keys:"); + U.warn(log, "Locked keys:"); - for (IgniteTxKey key : cctx.mvcc().lockedKeys()) - U.warn(log, "Locked key: " + key); + for (IgniteTxKey key : cctx.mvcc().lockedKeys()) + U.warn(log, "Locked key: " + key); - for (IgniteTxKey key : cctx.mvcc().nearLockedKeys()) - U.warn(log, "Locked near key: " + key); + for (IgniteTxKey key : cctx.mvcc().nearLockedKeys()) + U.warn(log, "Locked near key: " + key); - Map<IgniteTxKey, Collection<GridCacheMvccCandidate>> locks = - cctx.mvcc().unfinishedLocks(exchId.topologyVersion()); + Map<IgniteTxKey, Collection<GridCacheMvccCandidate>> locks = + cctx.mvcc().unfinishedLocks(exchId.topologyVersion()); - for (Map.Entry<IgniteTxKey, Collection<GridCacheMvccCandidate>> e : locks.entrySet()) - U.warn(log, "Awaited locked entry [key=" + e.getKey() + ", mvcc=" + e.getValue() + ']'); + for (Map.Entry<IgniteTxKey, Collection<GridCacheMvccCandidate>> e : locks.entrySet()) + U.warn(log, "Awaited locked entry [key=" + e.getKey() + ", mvcc=" + e.getValue() + ']'); + + dumpedObjects++; + } } } 
http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java index 041f83a..2bf5365 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java @@ -364,7 +364,8 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> { @Nullable UUID subjId, String taskName, boolean deserializePortable, - boolean skipVals + boolean skipVals, + boolean canRemap ) { ctx.checkSecurity(SecurityPermission.CACHE_READ); @@ -387,7 +388,8 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> { deserializePortable, skipVals ? null : opCtx != null ? 
opCtx.expiry() : null, skipVals, - opCtx != null && opCtx.skipStore()); + opCtx != null && opCtx.skipStore(), + canRemap); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java index 79b7c1a..845be38 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java @@ -203,13 +203,14 @@ public abstract class GridNearCacheAdapter<K, V> extends GridDistributedCacheAda return (IgniteInternalFuture)loadAsync(tx, keys, reload, - false, + /*force primary*/false, subjId, taskName, - true, - null, + /*deserialize portable*/true, + /*expiry policy*/null, skipVals, - /*skip store*/false); + /*skip store*/false, + /*can remap*/true); } /** @@ -234,7 +235,8 @@ public abstract class GridNearCacheAdapter<K, V> extends GridDistributedCacheAda boolean deserializePortable, @Nullable ExpiryPolicy expiryPlc, boolean skipVal, - boolean skipStore + boolean skipStore, + boolean canRemap ) { if (F.isEmpty(keys)) return new GridFinishedFuture<>(Collections.<K, V>emptyMap()); @@ -253,7 +255,8 @@ public abstract class GridNearCacheAdapter<K, V> extends GridDistributedCacheAda taskName, deserializePortable, expiry, - skipVal); + skipVal, + canRemap); // init() will register future for responses if future has remote mappings. 
fut.init(); http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java index 9e8d76b..6f4f15e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java @@ -333,7 +333,9 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry { true, null, false, - /*skip store*/false).get().get(keyValue(false)); + /*skip store*/false, + /*can remap*/true + ).get().get(keyValue(false)); } /** @@ -433,6 +435,7 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry { @Override public GridCacheMvccCandidate addLocal( long threadId, GridCacheVersion ver, + AffinityTopologyVersion topVer, long timeout, boolean reenter, boolean tx, @@ -441,6 +444,7 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry { null, threadId, ver, + topVer, timeout, reenter, tx, @@ -454,6 +458,7 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry { * @param dhtNodeId DHT node ID. * @param threadId Owning thread ID. * @param ver Lock version. + * @param topVer Topology version. * @param timeout Timeout to acquire lock. * @param reenter Reentry flag. * @param tx Transaction flag. 
@@ -465,6 +470,7 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry { @Nullable UUID dhtNodeId, long threadId, GridCacheVersion ver, + AffinityTopologyVersion topVer, long timeout, boolean reenter, boolean tx, @@ -513,6 +519,8 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry { tx, implicitSingle); + cand.topologyVersion(topVer); + owner = mvcc.anyOwner(); boolean emptyAfter = mvcc.isEmpty(); http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java index 2017654..951fddf 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java @@ -62,7 +62,7 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma private static final int MAX_REMAP_CNT = getInteger(IGNITE_NEAR_GET_MAX_REMAPS, DFLT_MAX_REMAP_CNT); /** Context. */ - private GridCacheContext<K, V> cctx; + private final GridCacheContext<K, V> cctx; /** Keys. */ private Collection<KeyCacheObject> keys; @@ -106,6 +106,9 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma /** Expiry policy. */ private IgniteCacheExpiryPolicy expiryPlc; + /** Flag indicating that get should be done on a locked topology version. */ + private final boolean canRemap; + /** * @param cctx Context. * @param keys Keys. 
@@ -131,7 +134,8 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma String taskName, boolean deserializePortable, @Nullable IgniteCacheExpiryPolicy expiryPlc, - boolean skipVals + boolean skipVals, + boolean canRemap ) { super(cctx.kernalContext(), CU.<K, V>mapsReducer(keys.size())); @@ -148,6 +152,7 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma this.deserializePortable = deserializePortable; this.expiryPlc = expiryPlc; this.skipVals = skipVals; + this.canRemap = canRemap; futId = IgniteUuid.randomUuid(); @@ -161,7 +166,9 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma * Initializes future. */ public void init() { - AffinityTopologyVersion topVer = tx == null ? cctx.affinity().affinityTopologyVersion() : tx.topologyVersion(); + AffinityTopologyVersion topVer = tx == null ? + (canRemap ? cctx.affinity().affinityTopologyVersion() : cctx.shared().exchange().readyAffinityVersion()) : + tx.topologyVersion(); map(keys, Collections.<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>>emptyMap(), topVer); @@ -327,7 +334,7 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma remapKeys.add(key); } - AffinityTopologyVersion updTopVer = new AffinityTopologyVersion(cctx.discovery().topologyVersion()); + AffinityTopologyVersion updTopVer = cctx.discovery().topologyVersionEx(); assert updTopVer.compareTo(topVer) > 0 : "Got invalid partitions for local node but topology version did " + "not change [topVer=" + topVer + ", updTopVer=" + updTopVer + @@ -435,7 +442,7 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma taskName, expiryPlc); - ClusterNode primary = null; + ClusterNode affNode = null; if (v == null && allowLocRead && cctx.affinityNode()) { GridDhtCacheAdapter<K, V> dht = cache().dht(); @@ -472,16 +479,16 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma 
near.metrics0().onRead(true); } else { - primary = cctx.affinity().primary(key, topVer); + affNode = affinityNode(key, topVer); - if (primary == null) { + if (affNode == null) { onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache " + "(all partition nodes left the grid).")); return savedVers; } - if (!primary.isLocal() && cctx.cache().configuration().isStatisticsEnabled() && !skipVals) + if (!affNode.isLocal() && cctx.cache().configuration().isStatisticsEnabled() && !skipVals) near.metrics0().onRead(false); } } @@ -507,10 +514,10 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma add(new GridFinishedFuture<>(Collections.singletonMap(key0, val0))); } else { - if (primary == null) { - primary = cctx.affinity().primary(key, topVer); + if (affNode == null) { + affNode = affinityNode(key, topVer); - if (primary == null) { + if (affNode == null) { onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache " + "(all partition nodes left the grid).")); @@ -527,13 +534,13 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma savedVers.put(key, nearEntry == null ? 
null : nearEntry.dhtVersion()); - LinkedHashMap<KeyCacheObject, Boolean> keys = mapped.get(primary); + LinkedHashMap<KeyCacheObject, Boolean> keys = mapped.get(affNode); if (keys != null && keys.containsKey(key)) { if (remapCnt.incrementAndGet() > MAX_REMAP_CNT) { onDone(new ClusterTopologyCheckedException("Failed to remap key to a new node after " + MAX_REMAP_CNT + " attempts (key got remapped to the same node) " + - "[key=" + key + ", node=" + U.toShortString(primary) + ", mappings=" + mapped + ']')); + "[key=" + key + ", node=" + U.toShortString(affNode) + ", mappings=" + mapped + ']')); return savedVers; } @@ -545,10 +552,10 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma if (!addRdr && tx.readCommitted() && !tx.writeSet().contains(cctx.txKey(key))) addRdr = true; - LinkedHashMap<KeyCacheObject, Boolean> old = mappings.get(primary); + LinkedHashMap<KeyCacheObject, Boolean> old = mappings.get(affNode); if (old == null) - mappings.put(primary, old = new LinkedHashMap<>(3, 1f)); + mappings.put(affNode, old = new LinkedHashMap<>(3, 1f)); old.put(key, addRdr); } @@ -579,6 +586,28 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma } /** + * Affinity node to send get request to. + * + * @param key Key to get. + * @param topVer Topology version. + * @return Affinity node to get key from. + */ + private ClusterNode affinityNode(KeyCacheObject key, AffinityTopologyVersion topVer) { + if (!canRemap) { + List<ClusterNode> affNodes = cctx.affinity().nodes(key, topVer); + + for (ClusterNode node : affNodes) { + if (cctx.discovery().alive(node)) + return node; + } + + return null; + } + else + return cctx.affinity().primary(key, topVer); + } + + /** * @return Near cache. 
*/ private GridNearCacheAdapter<K, V> cache() { @@ -752,37 +781,46 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma if (log.isDebugEnabled()) log.debug("Remote node left grid while sending or waiting for reply (will retry): " + this); - final AffinityTopologyVersion updTopVer = - new AffinityTopologyVersion(Math.max(topVer.topologyVersion() + 1, cctx.discovery().topologyVersion())); + // Try getting value from alive nodes. + if (!canRemap) { + // Remap + map(keys.keySet(), F.t(node, keys), topVer); - final GridFutureRemapTimeoutObject timeout = new GridFutureRemapTimeoutObject(this, - cctx.kernalContext().config().getNetworkTimeout(), - updTopVer, - e); - - cctx.affinity().affinityReadyFuture(updTopVer).listen( - new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() { - @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> fut) { - if (timeout.finish()) { - cctx.kernalContext().timeout().removeTimeoutObject(timeout); - - try { - fut.get(); - - // Remap. - map(keys.keySet(), F.t(node, keys), updTopVer); - - onDone(Collections.<K, V>emptyMap()); - } - catch (IgniteCheckedException e) { - GridNearGetFuture.this.onDone(e); + onDone(Collections.<K, V>emptyMap()); + } + else { + final AffinityTopologyVersion updTopVer = + new AffinityTopologyVersion(Math.max(topVer.topologyVersion() + 1, cctx.discovery().topologyVersion())); + + final GridFutureRemapTimeoutObject timeout = new GridFutureRemapTimeoutObject(this, + cctx.kernalContext().config().getNetworkTimeout(), + updTopVer, + e); + + cctx.affinity().affinityReadyFuture(updTopVer).listen( + new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() { + @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> fut) { + if (timeout.finish()) { + cctx.kernalContext().timeout().removeTimeoutObject(timeout); + + try { + fut.get(); + + // Remap. 
+ map(keys.keySet(), F.t(node, keys), updTopVer); + + onDone(Collections.<K, V>emptyMap()); + } + catch (IgniteCheckedException e) { + GridNearGetFuture.this.onDone(e); + } } } } - } - ); + ); - cctx.kernalContext().timeout().addTimeoutObject(timeout); + cctx.kernalContext().timeout().addTimeoutObject(timeout); + } } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java index 3d28018..2815194 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java @@ -307,6 +307,7 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean dhtNodeId, threadId, lockVer, + topVer, timeout, !inTx(), inTx(), @@ -319,9 +320,6 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean txEntry.cached(entry); } - if (c != null) - c.topologyVersion(topVer); - synchronized (mux) { entries.add(entry); } @@ -1234,8 +1232,12 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean * @return Topology exception with user-friendly message. 
*/ private ClusterTopologyCheckedException newTopologyException(@Nullable Throwable nested, UUID nodeId) { - return new ClusterTopologyCheckedException("Failed to acquire lock for keys (primary node left grid, " + - "retry transaction if possible) [keys=" + keys + ", node=" + nodeId + ']', nested); + ClusterTopologyCheckedException topEx = new ClusterTopologyCheckedException("Failed to acquire lock for keys " + + "(primary node left grid, retry transaction if possible) [keys=" + keys + ", node=" + nodeId + ']', nested); + + topEx.retryReadyFuture(cctx.shared().nextAffinityReadyFuture(topVer.get())); + + return topEx; } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java index e71dd62..8e66cb6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java @@ -101,7 +101,12 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd MiniFuture f = (MiniFuture) fut; if (f.node().id().equals(nodeId)) { - f.onResult(new ClusterTopologyCheckedException("Remote node left grid: " + nodeId)); + ClusterTopologyCheckedException e = new ClusterTopologyCheckedException("Remote node left grid: " + + nodeId); + + e.retryReadyFuture(cctx.nextAffinityReadyFuture(tx.topologyVersion())); + + f.onResult(e); found = true; } @@ -563,8 +568,12 @@ public class 
GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd try { cctx.io().send(n, req, tx.ioPolicy()); } + catch (ClusterTopologyCheckedException e) { + e.retryReadyFuture(cctx.nextAffinityReadyFuture(tx.topologyVersion())); + + fut.onResult(e); + } catch (IgniteCheckedException e) { - // Fail the whole thing. fut.onResult(e); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java index f51b4b8..9ce5bd5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java @@ -72,7 +72,12 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA MiniFuture f = (MiniFuture)fut; if (f.node().id().equals(nodeId)) { - f.onNodeLeft(new ClusterTopologyCheckedException("Remote node left grid: " + nodeId)); + ClusterTopologyCheckedException e = new ClusterTopologyCheckedException("Remote node left grid: " + + nodeId); + + e.retryReadyFuture(cctx.nextAffinityReadyFuture(tx.topologyVersion())); + + f.onNodeLeft(e); found = true; } @@ -224,6 +229,8 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA cctx.io().send(node, req, tx.ioPolicy()); } catch (ClusterTopologyCheckedException e) { + e.retryReadyFuture(cctx.nextAffinityReadyFuture(topVer)); + fut.onNodeLeft(e); } catch (IgniteCheckedException e) { 
http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java index 696acfb..a1f1383 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java @@ -101,7 +101,8 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V> @Nullable UUID subjId, String taskName, final boolean deserializePortable, - final boolean skipVals + final boolean skipVals, + boolean canRemap ) { ctx.checkSecurity(SecurityPermission.CACHE_READ); @@ -142,7 +143,8 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V> deserializePortable, skipVals ? null : opCtx != null ? opCtx.expiry() : null, skipVals, - skipStore); + skipStore, + canRemap); } /** @@ -172,7 +174,8 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V> tx.resolveTaskName(), deserializePortable, expiryPlc, - skipVals); + skipVals, + /*can remap*/true); // init() will register future for responses if it has remote mappings. 
fut.init(); http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java index cb391e4..5ff7345 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java @@ -313,7 +313,8 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { }); } else if (cacheCtx.isColocated()) { - return cacheCtx.colocated().loadAsync(keys, + return cacheCtx.colocated().loadAsync( + keys, readThrough, /*reload*/false, /*force primary*/false, @@ -322,7 +323,9 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { resolveTaskName(), deserializePortable, accessPolicy(cacheCtx, keys), - skipVals).chain(new C1<IgniteInternalFuture<Map<Object, Object>>, Boolean>() { + skipVals, + /*can remap*/true + ).chain(new C1<IgniteInternalFuture<Map<Object, Object>>, Boolean>() { @Override public Boolean apply(IgniteInternalFuture<Map<Object, Object>> f) { try { Map<Object, Object> map = f.get(); http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java index b418500..b24c62d 
100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java @@ -101,7 +101,6 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse * @param miniId Mini future ID. * @param dhtVer DHT version. * @param writeVer Write version. - * @param invalidParts Invalid partitions. * @param retVal Return value. * @param err Error. * @param clientRemapVer Not {@code null} if client node should remap transaction. @@ -112,7 +111,6 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse IgniteUuid miniId, GridCacheVersion dhtVer, GridCacheVersion writeVer, - Collection<Integer> invalidParts, GridCacheReturn retVal, Throwable err, AffinityTopologyVersion clientRemapVer @@ -127,7 +125,6 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse this.miniId = miniId; this.dhtVer = dhtVer; this.writeVer = writeVer; - this.invalidParts = invalidParts; this.retVal = retVal; this.clientRemapVer = clientRemapVer; } http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCacheEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCacheEntry.java index ea59f1f..6c04761 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCacheEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCacheEntry.java @@ -261,7 +261,9 @@ public class GridLocalCacheEntry extends GridCacheMapEntry { // Allow next lock in the thread to 
proceed. if (!cand.used()) { - GridLocalCacheEntry e = (GridLocalCacheEntry)cctx.cache().peekEx(cand.key()); + GridCacheContext cctx0 = cand.parent().context(); + + GridLocalCacheEntry e = (GridLocalCacheEntry)cctx0.cache().peekEx(cand.key()); // At this point candidate may have been removed and entry destroyed, // so we check for null. http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java index 8dd3276..3dc5946 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java @@ -458,7 +458,8 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { @Nullable UUID subjId, final String taskName, final boolean deserializePortable, - final boolean skipVals + final boolean skipVals, + boolean canRemap ) { A.notNull(keys, "keys"); @@ -570,8 +571,18 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { if (success || !storeEnabled) return vals; - return getAllAsync(keys, opCtx == null || !opCtx.skipStore(), null, false, subjId, taskName, deserializePortable, - false, expiry, skipVals).get(); + return getAllAsync( + keys, + opCtx == null || !opCtx.skipStore(), + null, + false, + subjId, + taskName, + deserializePortable, + /*force primary*/false, + expiry, + skipVals, + /*can remap*/true).get(); } /** {@inheritDoc} */ 
http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java index 4f21ad7..4e43d97 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java @@ -302,7 +302,7 @@ public interface IgniteInternalTx extends AutoCloseable, GridTimeoutObject { /** * @return Invalid partitions. */ - public Set<Integer> invalidPartitions(); + public Map<Integer, Set<Integer>> invalidPartitions(); /** * Gets owned version for near remote transaction. 
http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java index e9fdd22..b8beb15 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java @@ -162,7 +162,7 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter private AtomicBoolean preparing = new AtomicBoolean(); /** */ - private Set<Integer> invalidParts = new GridLeanSet<>(); + private Map<Integer, Set<Integer>> invalidParts = new HashMap<>(3); /** * Transaction state. 
Note that state is not protected, as we want to @@ -671,16 +671,25 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter } /** {@inheritDoc} */ - @Override public Set<Integer> invalidPartitions() { + @Override public Map<Integer, Set<Integer>> invalidPartitions() { return invalidParts; } /** {@inheritDoc} */ @Override public void addInvalidPartition(GridCacheContext<?, ?> cacheCtx, int part) { - invalidParts.add(part); + Set<Integer> parts = invalidParts.get(cacheCtx.cacheId()); + + if (parts == null) { + parts = new GridLeanSet<>(); + + invalidParts.put(cacheCtx.cacheId(), parts); + } + + parts.add(part); if (log.isDebugEnabled()) - log.debug("Added invalid partition for transaction [part=" + part + ", tx=" + this + ']'); + log.debug("Added invalid partition for transaction [cache=" + cacheCtx.name() + ", part=" + part + + ", tx=" + this + ']'); } /** {@inheritDoc} */ @@ -1779,7 +1788,7 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter } /** {@inheritDoc} */ - @Override public Set<Integer> invalidPartitions() { + @Override public Map<Integer, Set<Integer>> invalidPartitions() { throw new IllegalStateException("Deserialized transaction can only be used as read-only."); } http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java index 3c792f6..e9070a5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java @@ -79,6 +79,10 @@ public class IgniteTxEntry implements 
GridPeerDeployAware, Message { @GridDirectTransient private Collection<T2<EntryProcessor<Object, Object, Object>, Object[]>> entryProcessorsCol; + /** Transient field for calculated entry processor value. */ + @GridDirectTransient + private CacheObject entryProcessorCalcVal; + /** Transform closure bytes. */ @GridToStringExclude private byte[] transformClosBytes; @@ -787,6 +791,20 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message { return expiryPlc; } + /** + * @return Entry processor calculated value. + */ + public CacheObject entryProcessorCalculatedValue() { + return entryProcessorCalcVal; + } + + /** + * @param entryProcessorCalcVal Entry processor calculated value. + */ + public void entryProcessorCalculatedValue(CacheObject entryProcessorCalcVal) { + this.entryProcessorCalcVal = entryProcessorCalcVal; + } + /** {@inheritDoc} */ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { writer.setBuffer(buf); http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java index e481e25..227cb34 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java @@ -290,7 +290,6 @@ public class IgniteTxHandler { req.version(), null, null, - null, top.topologyVersion()); try { @@ -803,7 +802,7 @@ public class IgniteTxHandler { res.nearEvicted(nearTx.evicted()); if (dhtTx != null && !F.isEmpty(dhtTx.invalidPartitions())) - res.invalidPartitions(dhtTx.invalidPartitions()); + 
res.invalidPartitionsByCacheId(dhtTx.invalidPartitions()); if (req.onePhaseCommit()) { assert req.last(); @@ -1154,7 +1153,7 @@ public class IgniteTxHandler { if (req.last()) tx.state(PREPARED); - res.invalidPartitions(tx.invalidPartitions()); + res.invalidPartitionsByCacheId(tx.invalidPartitions()); if (tx.empty() && req.last()) { tx.rollback(); http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java index b354fed..a32e7b1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java @@ -2216,8 +2216,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter missedForLoad.add(cacheKey); } else { - assert !transform; - assert txEntry.op() != TRANSFORM; + assert !implicit() || !transform : this; + assert txEntry.op() != TRANSFORM : txEntry; if (retval) ret.set(cacheCtx, null, true); http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java index 78b09e6..0f43b8a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java +++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java @@ -651,6 +651,11 @@ public class GridServiceProcessor extends GridProcessorAdapter { private void reassign(GridServiceDeployment dep, long topVer) throws IgniteCheckedException { ServiceConfiguration cfg = dep.configuration(); + Object nodeFilter = cfg.getNodeFilter(); + + if (nodeFilter != null) + ctx.resource().injectGeneric(nodeFilter); + int totalCnt = cfg.getTotalCount(); int maxPerNodeCnt = cfg.getMaxPerNodeCount(); String cacheName = cfg.getCacheName(); http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java index f8c4c7e..c9be652 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java @@ -31,6 +31,7 @@ import org.apache.ignite.internal.mxbean.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.version.*; import org.apache.ignite.internal.transactions.*; +import org.apache.ignite.internal.util.future.*; import org.apache.ignite.internal.util.io.*; import org.apache.ignite.internal.util.ipc.shmem.*; import org.apache.ignite.internal.util.lang.*; @@ -580,7 +581,14 @@ public abstract class IgniteUtils { m.put(ClusterTopologyCheckedException.class, new C1<IgniteCheckedException, IgniteException>() { @Override public IgniteException apply(IgniteCheckedException e) { - return new ClusterTopologyException(e.getMessage(), e); + ClusterTopologyException topEx = new ClusterTopologyException(e.getMessage(), e); + + ClusterTopologyCheckedException checked = 
(ClusterTopologyCheckedException)e; + + if (checked.retryReadyFuture() != null) + topEx.retryReadyFuture(new IgniteFutureImpl<>(checked.retryReadyFuture())); + + return topEx; } }); http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java index f3bcab0..5c43ad9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java @@ -4085,6 +4085,20 @@ public class GridFunc { * @param val Value to find. * @return {@code True} if array contains given value. */ + public static boolean contains(Integer[] arr, int val) { + for (Integer el : arr) { + if (el == val) + return true; + } + + return false; + } + + /** + * @param arr Array. + * @param val Value to find. + * @return {@code True} if array contains given value. 
+ */ @SuppressWarnings("ForLoopReplaceableByForEach") public static boolean contains(long[] arr, long val) { for (int i = 0; i < arr.length; i++) { http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinder.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinder.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinder.java index b93c547..2a07879 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinder.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinder.java @@ -99,6 +99,9 @@ public class TcpDiscoveryMulticastIpFinder extends TcpDiscoveryVmIpFinder { /** Local address */ private String locAddr; + /** Time to live. */ + private Integer ttl; + /** */ @GridToStringExclude private Collection<AddressSender> addrSnds; @@ -223,6 +226,32 @@ public class TcpDiscoveryMulticastIpFinder extends TcpDiscoveryVmIpFinder { return locAddr; } + /** + * Set the default time-to-live for multicast packets sent out on this + * IP finder in order to control the scope of the multicast. + * <p> + * The TTL has to be in the range {@code 0 <= TTL <= 255}. + * <p> + * If TTL is {@code 0}, packets are not transmitted on the network, + * but may be delivered locally. + * + * @param ttl Time to live. + */ + @IgniteSpiConfiguration(optional = true) + public void setTimeToLive(int ttl) { + this.ttl = ttl; + } + + /** + * Get the default time-to-live for multicast packets sent out on this + * IP finder. + * + * @return Time to live. 
+ */ + public int getTimeToLive() { + return ttl; + } + /** {@inheritDoc} */ @Override public void initializeLocalAddresses(Collection<InetSocketAddress> addrs) throws IgniteSpiException { // If IGNITE_OVERRIDE_MCAST_GRP system property is set, use its value to override multicast group from @@ -245,6 +274,9 @@ public class TcpDiscoveryMulticastIpFinder extends TcpDiscoveryVmIpFinder { throw new IgniteSpiException("Invalid number of address request attempts, " + "value greater than zero is expected: " + addrReqAttempts); + if (ttl != null && (ttl < 0 || ttl > 255)) + throw new IgniteSpiException("Time-to-live value is out of 0 <= TTL <= 255 range: " + ttl); + if (F.isEmpty(getRegisteredAddresses())) U.warn(log, "TcpDiscoveryMulticastIpFinder has no pre-configured addresses " + "(it is recommended in production to specify at least one address in " + @@ -453,6 +485,9 @@ public class TcpDiscoveryMulticastIpFinder extends TcpDiscoveryVmIpFinder { sock.setSoTimeout(resWaitTime); + if (ttl != null) + sock.setTimeToLive(ttl); + reqPckt.setData(MSG_ADDR_REQ_DATA); try { @@ -722,6 +757,9 @@ public class TcpDiscoveryMulticastIpFinder extends TcpDiscoveryVmIpFinder { sock.joinGroup(mcastGrp); + if (ttl != null) + sock.setTimeToLive(ttl); + return sock; } http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/test/config/store/jdbc/ignite-type-metadata.xml ---------------------------------------------------------------------- diff --git a/modules/core/src/test/config/store/jdbc/ignite-type-metadata.xml b/modules/core/src/test/config/store/jdbc/ignite-type-metadata.xml index 91d77cd..3e3d6e0 100644 --- a/modules/core/src/test/config/store/jdbc/ignite-type-metadata.xml +++ b/modules/core/src/test/config/store/jdbc/ignite-type-metadata.xml @@ -180,6 +180,14 @@ <property name="javaName" value="name"/> <property name="javaType" value="java.lang.String"/> </bean> + <bean class="org.apache.ignite.cache.CacheTypeFieldMetadata"> + <property name="databaseName" 
value="salary"/> + <property name="databaseType"> + <util:constant static-field="java.sql.Types.INTEGER"/> + </property> + <property name="javaName" value="salary"/> + <property name="javaType" value="java.lang.Integer"/> + </bean> </list> </property> </bean> http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreTest.java b/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreTest.java index 182d3bc..68a77dc 100644 --- a/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreTest.java +++ b/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreTest.java @@ -183,12 +183,24 @@ public class CacheJdbcPojoStoreTest extends GridAbstractCacheStoreSelfTest<Cache // No-op. 
} - stmt.executeUpdate("CREATE TABLE IF NOT EXISTS String_Entries (key varchar(100) not null, val varchar(100), PRIMARY KEY(key))"); - stmt.executeUpdate("CREATE TABLE IF NOT EXISTS UUID_Entries (key binary(16) not null, val binary(16), PRIMARY KEY(key))"); - stmt.executeUpdate("CREATE TABLE IF NOT EXISTS Timestamp_Entries (key timestamp not null, val integer, PRIMARY KEY(key))"); - stmt.executeUpdate("CREATE TABLE IF NOT EXISTS Organization (id integer not null, name varchar(50), city varchar(50), PRIMARY KEY(id))"); - stmt.executeUpdate("CREATE TABLE IF NOT EXISTS Person (id integer not null, org_id integer, name varchar(50), PRIMARY KEY(id))"); - stmt.executeUpdate("CREATE TABLE IF NOT EXISTS Person_Complex (id integer not null, org_id integer not null, city_id integer not null, name varchar(50), PRIMARY KEY(id))"); + stmt.executeUpdate("CREATE TABLE IF NOT EXISTS " + + "String_Entries (key varchar(100) not null, val varchar(100), PRIMARY KEY(key))"); + + stmt.executeUpdate("CREATE TABLE IF NOT EXISTS " + + "UUID_Entries (key binary(16) not null, val binary(16), PRIMARY KEY(key))"); + + stmt.executeUpdate("CREATE TABLE IF NOT EXISTS " + + "Timestamp_Entries (key timestamp not null, val integer, PRIMARY KEY(key))"); + + stmt.executeUpdate("CREATE TABLE IF NOT EXISTS " + + "Organization (id integer not null, name varchar(50), city varchar(50), PRIMARY KEY(id))"); + + stmt.executeUpdate("CREATE TABLE IF NOT EXISTS " + + "Person (id integer not null, org_id integer, name varchar(50), PRIMARY KEY(id))"); + + stmt.executeUpdate("CREATE TABLE IF NOT EXISTS " + + "Person_Complex (id integer not null, org_id integer not null, city_id integer not null, " + + "name varchar(50), salary integer, PRIMARY KEY(id))"); conn.commit(); @@ -238,7 +250,7 @@ public class CacheJdbcPojoStoreTest extends GridAbstractCacheStoreSelfTest<Cache U.closeQuiet(prnStmt); - PreparedStatement prnComplexStmt = conn.prepareStatement("INSERT INTO Person_Complex(id, org_id, city_id, name) VALUES (?, 
?, ?, ?)"); + PreparedStatement prnComplexStmt = conn.prepareStatement("INSERT INTO Person_Complex(id, org_id, city_id, name, salary) VALUES (?, ?, ?, ?, ?)"); for (int i = 0; i < PERSON_CNT; i++) { prnComplexStmt.setInt(1, i); @@ -246,6 +258,11 @@ public class CacheJdbcPojoStoreTest extends GridAbstractCacheStoreSelfTest<Cache prnComplexStmt.setInt(3, i % 100); prnComplexStmt.setString(4, "name" + i); + if (i > 0) + prnComplexStmt.setInt(5, 1000 + i * 500); + else // Add person with null salary + prnComplexStmt.setNull(5, java.sql.Types.INTEGER); + prnComplexStmt.addBatch(); } @@ -274,7 +291,7 @@ public class CacheJdbcPojoStoreTest extends GridAbstractCacheStoreSelfTest<Cache assert key.getId() == val.getId(); assert key.getOrgId() == val.getOrgId(); - assert ("name" + key.getId()).equals(val.getName()); + assertEquals("name" + key.getId(), val.getName()); prnComplexKeys.add((PersonComplexKey)k); } http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcStoreAbstractMultithreadedSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcStoreAbstractMultithreadedSelfTest.java b/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcStoreAbstractMultithreadedSelfTest.java index eac7669..9483545 100644 --- a/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcStoreAbstractMultithreadedSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcStoreAbstractMultithreadedSelfTest.java @@ -190,7 +190,7 @@ public abstract class CacheJdbcStoreAbstractMultithreadedSelfTest<T extends Cach if (rnd.nextBoolean()) cache.put(new OrganizationKey(id), new Organization(id, "Name" + id, "City" + id)); else - cache.put(new PersonKey(id), new Person(id, rnd.nextInt(), "Name" + id)); + cache.put(new PersonKey(id), new Person(id, 
rnd.nextInt(), "Name" + id, 1)); } return null; @@ -209,7 +209,7 @@ public abstract class CacheJdbcStoreAbstractMultithreadedSelfTest<T extends Cach if (rnd.nextBoolean()) cache.putIfAbsent(new OrganizationKey(id), new Organization(id, "Name" + id, "City" + id)); else - cache.putIfAbsent(new PersonKey(id), new Person(id, rnd.nextInt(), "Name" + id)); + cache.putIfAbsent(new PersonKey(id), new Person(id, rnd.nextInt(), "Name" + id, i)); } return null; @@ -248,7 +248,7 @@ public abstract class CacheJdbcStoreAbstractMultithreadedSelfTest<T extends Cach if (rnd.nextBoolean()) map.put(new OrganizationKey(id), new Organization(id, "Name" + id, "City" + id)); else - map.put(new PersonKey(id), new Person(id, rnd.nextInt(), "Name" + id)); + map.put(new PersonKey(id), new Person(id, rnd.nextInt(), "Name" + id, 1)); } IgniteCache<Object, Object> cache = jcache(); @@ -273,17 +273,17 @@ public abstract class CacheJdbcStoreAbstractMultithreadedSelfTest<T extends Cach IgniteCache<PersonKey, Person> cache = jcache(); try (Transaction tx = grid().transactions().txStart()) { - cache.put(new PersonKey(1), new Person(1, rnd.nextInt(), "Name" + 1)); - cache.put(new PersonKey(2), new Person(2, rnd.nextInt(), "Name" + 2)); - cache.put(new PersonKey(3), new Person(3, rnd.nextInt(), "Name" + 3)); + cache.put(new PersonKey(1), new Person(1, rnd.nextInt(), "Name" + 1, 1)); + cache.put(new PersonKey(2), new Person(2, rnd.nextInt(), "Name" + 2, 2)); + cache.put(new PersonKey(3), new Person(3, rnd.nextInt(), "Name" + 3, 3)); cache.get(new PersonKey(1)); cache.get(new PersonKey(4)); Map<PersonKey, Person> map = U.newHashMap(2); - map.put(new PersonKey(5), new Person(5, rnd.nextInt(), "Name" + 5)); - map.put(new PersonKey(6), new Person(6, rnd.nextInt(), "Name" + 6)); + map.put(new PersonKey(5), new Person(5, rnd.nextInt(), "Name" + 5, 5)); + map.put(new PersonKey(6), new Person(6, rnd.nextInt(), "Name" + 6, 6)); cache.putAll(map); 
http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/model/Person.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/model/Person.java b/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/model/Person.java index 1c4b9a7..95c83b9 100644 --- a/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/model/Person.java +++ b/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/model/Person.java @@ -37,6 +37,9 @@ public class Person implements Serializable { /** Value for name. */ private String name; + /** Value for salary. */ + private Integer salary; + /** * Empty constructor. */ @@ -50,11 +53,13 @@ public class Person implements Serializable { public Person( Integer id, Integer orgId, - String name + String name, + Integer salary ) { this.id = id; this.orgId = orgId; this.name = name; + this.salary = salary; } /** @@ -111,6 +116,25 @@ public class Person implements Serializable { this.name = name; } + + /** + * Gets salary. + * + * @return Value for salary. + */ + public Integer getSalary() { + return salary; + } + + /** + * Sets salary. + * + * @param salary New value for salary. 
+ */ + public void setSalary(Integer salary) { + this.salary = salary; + } + /** {@inheritDoc} */ @Override public boolean equals(Object o) { if (this == o) http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CrossCacheTxRandomOperationsTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CrossCacheTxRandomOperationsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CrossCacheTxRandomOperationsTest.java new file mode 100644 index 0000000..e5e6d72 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CrossCacheTxRandomOperationsTest.java @@ -0,0 +1,534 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.cache.affinity.fair.*; +import org.apache.ignite.cache.affinity.rendezvous.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.testframework.junits.common.*; +import org.apache.ignite.transactions.*; +import org.jetbrains.annotations.*; + +import javax.cache.processor.*; +import java.io.*; +import java.util.*; + +import static org.apache.ignite.cache.CacheAtomicityMode.*; +import static org.apache.ignite.cache.CacheMode.*; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*; +import static org.apache.ignite.transactions.TransactionConcurrency.*; +import static org.apache.ignite.transactions.TransactionIsolation.*; + +/** + * + */ +public class CrossCacheTxRandomOperationsTest extends GridCommonAbstractTest { + /** */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** */ + private static final String CACHE1 = "cache1"; + + /** */ + private static final String CACHE2 = "cache2"; + + /** */ + private static final int GRID_CNT = 5; + + /** */ + private static final int KEY_RANGE = 1000; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER); + + if (gridName.equals(getTestGridName(GRID_CNT - 1))) + cfg.setClientMode(true); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + super.beforeTestsStarted(); + + startGridsMultiThreaded(GRID_CNT - 1); + + 
startGrid(GRID_CNT - 1); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + super.afterTestsStopped(); + + stopAllGrids(); + } + + /** + * @throws Exception If failed. + */ + public void testTxOperations() throws Exception { + txOperations(PARTITIONED, FULL_SYNC, false, false); + } + + /** + * @throws Exception If failed. + */ + public void testCrossCacheTxOperations() throws Exception { + txOperations(PARTITIONED, FULL_SYNC, true, false); + } + + /** + * @throws Exception If failed. + */ + public void testCrossCacheTxOperationsPrimarySync() throws Exception { + txOperations(PARTITIONED, PRIMARY_SYNC, true, false); + } + + /** + * @throws Exception If failed. + */ + public void _testCrossCacheTxOperationsFairAffinity() throws Exception { + txOperations(PARTITIONED, FULL_SYNC, true, true); + } + + /** + * @throws Exception If failed. + */ + public void testCrossCacheTxOperationsReplicated() throws Exception { + txOperations(REPLICATED, FULL_SYNC, true, false); + } + + /** + * @throws Exception If failed. + */ + public void testCrossCacheTxOperationsReplicatedPrimarySync() throws Exception { + txOperations(REPLICATED, PRIMARY_SYNC, true, false); + } + + /** + * @param name Cache name. + * @param cacheMode Cache mode. + * @param writeSync Write synchronization mode. + * @param fairAff If {@code true} uses {@link FairAffinityFunction}, otherwise {@link RendezvousAffinityFunction}. + * @return Cache configuration. + */ + private CacheConfiguration cacheConfiguration(String name, + CacheMode cacheMode, + CacheWriteSynchronizationMode writeSync, + boolean fairAff) { + CacheConfiguration ccfg = new CacheConfiguration(); + + ccfg.setName(name); + ccfg.setCacheMode(cacheMode); + ccfg.setAtomicityMode(TRANSACTIONAL); + ccfg.setWriteSynchronizationMode(writeSync); + + if (cacheMode == PARTITIONED) + ccfg.setBackups(1); + + ccfg.setAffinity(fairAff ? 
new FairAffinityFunction() : new RendezvousAffinityFunction()); + + return ccfg; + } + + /** + * @param cacheMode Cache mode. + * @param writeSync Write synchronization mode. + * @param crossCacheTx If {@code true} uses cross cache transaction. + * @param fairAff If {@code true} uses {@link FairAffinityFunction}, otherwise {@link RendezvousAffinityFunction}. + * @throws Exception If failed. + */ + private void txOperations(CacheMode cacheMode, + CacheWriteSynchronizationMode writeSync, + boolean crossCacheTx, + boolean fairAff) throws Exception { + Ignite ignite = ignite(0); + + try { + ignite.createCache(cacheConfiguration(CACHE1, cacheMode, writeSync, fairAff)); + ignite.createCache(cacheConfiguration(CACHE2, cacheMode, writeSync, fairAff)); + + txOperations(PESSIMISTIC, REPEATABLE_READ, crossCacheTx, false); + txOperations(PESSIMISTIC, REPEATABLE_READ, crossCacheTx, true); + + txOperations(OPTIMISTIC, REPEATABLE_READ, crossCacheTx, false); + txOperations(OPTIMISTIC, REPEATABLE_READ, crossCacheTx, true); + } + finally { + ignite.destroyCache(CACHE1); + ignite.destroyCache(CACHE2); + } + } + + /** + * @param concurrency Transaction concurrency. + * @param isolation Transaction isolation. + * @param crossCacheTx If {@code true} uses cross cache transaction. + * @param client If {@code true} uses client node. + * @throws Exception If failed. + */ + private void txOperations(TransactionConcurrency concurrency, + TransactionIsolation isolation, + boolean crossCacheTx, + boolean client) throws Exception { + final Map<TestKey, TestValue> expData1 = new HashMap<>(); + final Map<TestKey, TestValue> expData2 = new HashMap<>(); + + Ignite ignite = client ? 
ignite(GRID_CNT - 1) : ignite(0); + + assertEquals(client, (boolean)ignite.configuration().isClientMode()); + + final List<IgniteCache<TestKey, TestValue>> caches1 = new ArrayList<>(); + final List<IgniteCache<TestKey, TestValue>> caches2 = new ArrayList<>(); + + for (int i = 0; i < GRID_CNT; i++) { + caches1.add(ignite(i).<TestKey, TestValue>cache(CACHE1)); + caches2.add(ignite(i).<TestKey, TestValue>cache(CACHE2)); + } + + IgniteCache<TestKey, TestValue> cache1 = ignite.cache(CACHE1); + IgniteCache<TestKey, TestValue> cache2 = ignite.cache(CACHE2); + + assertNotNull(cache1); + assertNotNull(cache2); + assertNotSame(cache1, cache2); + + try { + Random rnd = new Random(); + + long seed = System.currentTimeMillis(); + + rnd.setSeed(seed); + + log.info("Test tx operations [concurrency=" + concurrency + + ", isolation=" + isolation + + ", client=" + client + + ", seed=" + seed + ']'); + + IgniteTransactions txs = ignite.transactions(); + + final List<TestKey> keys = new ArrayList<>(); + + for (int i = 0; i < KEY_RANGE; i++) + keys.add(new TestKey(i)); + + CacheConfiguration ccfg = cache1.getConfiguration(CacheConfiguration.class); + + boolean fullSync = ccfg.getWriteSynchronizationMode() == FULL_SYNC; + boolean optimistic = concurrency == OPTIMISTIC; + + boolean checkData = fullSync && !optimistic; + + for (int i = 0; i < 10_000; i++) { + if (i % 100 == 0) + log.info("Iteration: " + i); + + boolean rollback = i % 10 == 0; + + try (Transaction tx = txs.txStart(concurrency, isolation)) { + cacheOperation(expData1, rnd, cache1, checkData, rollback); + + if (crossCacheTx) + cacheOperation(expData2, rnd, cache2, checkData, rollback); + + if (rollback) + tx.rollback(); + else + tx.commit(); + } + } + + if (fullSync) { + checkData(caches1, keys, expData1); + checkData(caches2, keys, expData2); + + cache1.removeAll(); + cache2.removeAll(); + + checkData(caches1, keys, new HashMap<TestKey, TestValue>()); + checkData(caches2, keys, new HashMap<TestKey, TestValue>()); + } + } + 
finally { + cache1.removeAll(); + cache2.removeAll(); + } + } + + /** + * @param caches Caches. + * @param keys Keys. + * @param expData Expected data. + */ + private void checkData(List<IgniteCache<TestKey, TestValue>> caches, + List<TestKey> keys, Map<TestKey, TestValue> expData) { + for (IgniteCache<TestKey, TestValue> cache : caches) { + for (TestKey key : keys) { + TestValue val = cache.get(key); + TestValue expVal = expData.get(key); + + assertEquals(expVal, val); + } + } + } + + /** + * @param expData Expected cache data. + * @param rnd Random. + * @param cache Cache. + * @param checkData If {@code true} checks data. + * @param willRollback {@code True} if will rollback transaction. + */ + private void cacheOperation( + Map<TestKey, TestValue> expData, + Random rnd, + IgniteCache<TestKey, TestValue> cache, + boolean checkData, + boolean willRollback) { + TestKey key = key(rnd); + TestValue val = new TestValue(rnd.nextLong()); + + switch (rnd.nextInt(8)) { + case 0: { + cache.put(key, val); + + if (!willRollback) + expData.put(key, val); + + break; + } + + case 1: { + TestValue oldVal = cache.getAndPut(key, val); + + TestValue expOld = expData.get(key); + + if (checkData) + assertEquals(expOld, oldVal); + + if (!willRollback) + expData.put(key, val); + + break; + } + + case 2: { + boolean rmv = cache.remove(key); + + if (checkData) + assertEquals(expData.containsKey(key), rmv); + + if (!willRollback) + expData.remove(key); + + break; + } + + case 3: { + TestValue oldVal = cache.getAndRemove(key); + + TestValue expOld = expData.get(key); + + if (checkData) + assertEquals(expOld, oldVal); + + if (!willRollback) + expData.remove(key); + + break; + } + + case 4: { + boolean put = cache.putIfAbsent(key, val); + + boolean expPut = !expData.containsKey(key); + + if (checkData) + assertEquals(expPut, put); + + if (expPut && !willRollback) + expData.put(key, val); + + break; + } + + case 5: { + TestValue oldVal = cache.invoke(key, new TestEntryProcessor(val.value())); 
+ TestValue expOld = expData.get(key); + + if (checkData) + assertEquals(expOld, oldVal); + + if (!willRollback) + expData.put(key, val); + + break; + } + + case 6: { + TestValue oldVal = cache.invoke(key, new TestEntryProcessor(null)); + TestValue expOld = expData.get(key); + + if (checkData) + assertEquals(expOld, oldVal); + + break; + } + + case 7: { + TestValue oldVal = cache.get(key); + TestValue expOld = expData.get(key); + + if (checkData) + assertEquals(expOld, oldVal); + + break; + } + + default: + assert false; + } + } + + /** + * @param rnd Random. + * @return Key. + */ + private TestKey key(Random rnd) { + return new TestKey(rnd.nextInt(KEY_RANGE)); + } + + /** + * + */ + private static class TestKey implements Serializable { + /** */ + private long key; + + /** + * @param key Key. + */ + public TestKey(long key) { + this.key = key; + } + + /** + * @return Key. + */ + public long key() { + return key; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + + if (o == null || getClass() != o.getClass()) + return false; + + TestKey other = (TestKey)o; + + return key == other.key; + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return (int)(key ^ (key >>> 32)); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(TestKey.class, this); + } + } + + /** + * + */ + private static class TestValue implements Serializable { + /** */ + private long val; + + /** + * @param val Value. + */ + public TestValue(long val) { + this.val = val; + } + + /** + * @return Value. 
+ */ + public long value() { + return val; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + + if (o == null || getClass() != o.getClass()) + return false; + + TestValue other = (TestValue)o; + + return val == other.val; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(TestValue.class, this); + } + } + + /** + * + */ + private static class TestEntryProcessor implements CacheEntryProcessor<TestKey, TestValue, TestValue> { + /** */ + private Long val; + + /** + * @param val Value. + */ + public TestEntryProcessor(@Nullable Long val) { + this.val = val; + } + + /** {@inheritDoc} */ + @Override public TestValue process(MutableEntry<TestKey, TestValue> e, Object... args) { + TestValue old = e.getValue(); + + if (val != null) + e.setValue(new TestValue(val)); + + return old; + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteAtomicCacheEntryProcessorNodeJoinTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteAtomicCacheEntryProcessorNodeJoinTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteAtomicCacheEntryProcessorNodeJoinTest.java new file mode 100644 index 0000000..af87a7d --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteAtomicCacheEntryProcessorNodeJoinTest.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.cache.*; + +import static org.apache.ignite.cache.CacheAtomicityMode.*; + +/** + * Variant of {@link IgniteCacheEntryProcessorNodeJoinTest} whose {@link #atomicityMode()} returns {@code ATOMIC}. + */ +public class IgniteAtomicCacheEntryProcessorNodeJoinTest extends IgniteCacheEntryProcessorNodeJoinTest { + /** {@inheritDoc} */ + @Override protected CacheAtomicityMode atomicityMode() { + return ATOMIC; + } +}
