http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java index 2221d3b..01ca9f7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java @@ -85,6 +85,9 @@ public class GridCacheSharedContext<K, V> { /** Deployment manager. */ private GridCacheDeploymentManager<K, V> depMgr; + /** Affinity manager. */ + private CacheAffinitySharedManager affMgr; + /** Cache contexts map. */ private ConcurrentMap<Integer, GridCacheContext<K, V>> ctxMap; @@ -101,6 +104,7 @@ public class GridCacheSharedContext<K, V> { * @param mvccMgr MVCC manager. * @param depMgr Deployment manager. * @param exchMgr Exchange manager. + * @param affMgr Affinity manager. * @param ioMgr IO manager. * @param jtaMgr JTA manager. * @param storeSesLsnrs Store session listeners. @@ -112,13 +116,14 @@ public class GridCacheSharedContext<K, V> { GridCacheMvccManager mvccMgr, GridCacheDeploymentManager<K, V> depMgr, GridCachePartitionExchangeManager<K, V> exchMgr, + CacheAffinitySharedManager<K, V> affMgr, GridCacheIoManager ioMgr, CacheJtaManagerAdapter jtaMgr, Collection<CacheStoreSessionListener> storeSesLsnrs ) { this.kernalCtx = kernalCtx; - setManagers(mgrs, txMgr, jtaMgr, verMgr, mvccMgr, depMgr, exchMgr, ioMgr); + setManagers(mgrs, txMgr, jtaMgr, verMgr, mvccMgr, depMgr, exchMgr, affMgr, ioMgr); this.storeSesLsnrs = storeSesLsnrs; @@ -162,6 +167,7 @@ public class GridCacheSharedContext<K, V> { mvccMgr, new GridCacheDeploymentManager<K, V>(), new GridCachePartitionExchangeManager<K, V>(), + affMgr, ioMgr); this.mgrs = mgrs; @@ -190,6 +196,7 @@ public class GridCacheSharedContext<K, V> { * @param mvccMgr MVCC manager. * @param depMgr Deployment manager. * @param exchMgr Exchange manager. + * @param affMgr Affinity manager. * @param ioMgr IO manager. * @param jtaMgr JTA manager. */ @@ -200,6 +207,7 @@ public class GridCacheSharedContext<K, V> { GridCacheMvccManager mvccMgr, GridCacheDeploymentManager<K, V> depMgr, GridCachePartitionExchangeManager<K, V> exchMgr, + CacheAffinitySharedManager affMgr, GridCacheIoManager ioMgr) { this.mvccMgr = add(mgrs, mvccMgr); this.verMgr = add(mgrs, verMgr); @@ -207,6 +215,7 @@ public class GridCacheSharedContext<K, V> { this.jtaMgr = add(mgrs, jtaMgr); this.depMgr = add(mgrs, depMgr); this.exchMgr = add(mgrs, exchMgr); + this.affMgr = add(mgrs, affMgr); this.ioMgr = add(mgrs, ioMgr); } @@ -366,6 +375,13 @@ public class GridCacheSharedContext<K, V> { } /** + * @return Affinity manager. + */ + public CacheAffinitySharedManager<K, V> affinity() { + return affMgr; + } + + /** * @return Lock order manager. */ public GridCacheVersionManager versions() {
http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java index 4744580..98b1b59 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java @@ -17,10 +17,8 @@ package org.apache.ignite.internal.processors.cache; -import java.io.Externalizable; import java.io.IOException; import java.io.ObjectInput; -import java.io.ObjectOutput; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -28,14 +26,12 @@ import java.util.Collections; import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; -import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicBoolean; import javax.cache.Cache; import javax.cache.CacheException; @@ -72,7 +68,6 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartit import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; -import org.apache.ignite.internal.processors.cache.version.GridCacheVersionEx; import org.apache.ignite.internal.processors.igfs.IgfsUtils; import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException; import org.apache.ignite.internal.util.lang.IgniteInClosureX; @@ -101,7 +96,6 @@ import org.apache.ignite.transactions.TransactionRollbackException; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import org.jsr166.ConcurrentHashMap8; -import org.jsr166.ConcurrentLinkedDeque8; import static org.apache.ignite.IgniteSystemProperties.IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK; import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; @@ -170,14 +164,6 @@ public class GridCacheUtils { } }; - /** */ - private static final IgniteClosure<Integer, GridCacheVersion[]> VER_ARR_FACTORY = - new C1<Integer, GridCacheVersion[]>() { - @Override public GridCacheVersion[] apply(Integer size) { - return new GridCacheVersion[size]; - } - }; - /** Empty predicate array. */ private static final IgnitePredicate[] EMPTY_FILTER = new IgnitePredicate[0]; @@ -302,46 +288,6 @@ public class GridCacheUtils { } /** - * Writes {@link GridCacheVersion} to output stream. This method is meant to be used by - * implementations of {@link Externalizable} interface. - * - * @param out Output stream. - * @param ver Version to write. - * @throws IOException If write failed. - */ - public static void writeVersion(ObjectOutput out, GridCacheVersion ver) throws IOException { - // Write null flag. - out.writeBoolean(ver == null); - - if (ver != null) { - out.writeBoolean(ver instanceof GridCacheVersionEx); - - ver.writeExternal(out); - } - } - - /** - * Reads {@link GridCacheVersion} from input stream. This method is meant to be used by - * implementations of {@link Externalizable} interface. - * - * @param in Input stream. - * @return Read version. - * @throws IOException If read failed. - */ - @Nullable public static GridCacheVersion readVersion(ObjectInput in) throws IOException { - // If UUID is not null. - if (!in.readBoolean()) { - GridCacheVersion ver = in.readBoolean() ? new GridCacheVersionEx() : new GridCacheVersion(); - - ver.readExternal(in); - - return ver; - } - - return null; - } - - /** * @param ctx Cache context. * @param meta Meta name. * @return Filter for entries with meta. @@ -837,21 +783,6 @@ public class GridCacheUtils { /** * @param nodes Nodes. - * @param locId Local node ID. - * @return Local node if it is in the list of nodes, or primary node. - */ - public static ClusterNode localOrPrimary(Iterable<ClusterNode> nodes, UUID locId) { - assert !F.isEmpty(nodes); - - for (ClusterNode n : nodes) - if (n.id().equals(locId)) - return n; - - return F.first(nodes); - } - - /** - * @param nodes Nodes. * @return Backup nodes. */ public static Collection<ClusterNode> backups(Collection<ClusterNode> nodes) { @@ -862,38 +793,6 @@ public class GridCacheUtils { } /** - * @param mappings Mappings. - * @param k map key. - * @return Either current list value or newly created one. - */ - public static <K, V> Collection<V> getOrSet(Map<K, List<V>> mappings, K k) { - List<V> vals = mappings.get(k); - - if (vals == null) - mappings.put(k, vals = new LinkedList<>()); - - return vals; - } - - /** - * @param mappings Mappings. - * @param k map key. - * @return Either current list value or newly created one. - */ - public static <K, V> Collection<V> getOrSet(ConcurrentMap<K, Collection<V>> mappings, K k) { - Collection<V> vals = mappings.get(k); - - if (vals == null) { - Collection<V> old = mappings.putIfAbsent(k, vals = new ConcurrentLinkedDeque8<>()); - - if (old != null) - vals = old; - } - - return vals; - } - - /** * @param log Logger. * @param excl Excludes. * @return Future listener that logs errors. @@ -923,20 +822,6 @@ public class GridCacheUtils { /** * @param t Exception to check. - * @return {@code true} if caused by lock timeout. - */ - public static boolean isLockTimeout(Throwable t) { - if (t == null) - return false; - - while (t instanceof IgniteCheckedException || t instanceof IgniteException) - t = t.getCause(); - - return t instanceof GridCacheLockTimeoutException; - } - - /** - * @param t Exception to check. * @return {@code true} if caused by lock timeout or cancellation. */ public static boolean isLockTimeoutOrCancelled(Throwable t) { @@ -1129,13 +1014,6 @@ public class GridCacheUtils { } /** - * @return Version array factory. - */ - public static IgniteClosure<Integer, GridCacheVersion[]> versionArrayFactory() { - return VER_ARR_FACTORY; - } - - /** * Converts cache version to byte array. * * @param ver Version. http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java index a9f4538..240fe7a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java @@ -1773,6 +1773,11 @@ public interface IgniteInternalCache<K, V> extends Iterable<Cache.Entry<K, V>> { public IgniteInternalCache<K, V> withExpiryPolicy(ExpiryPolicy plc); /** + * @return Cache with no-retries behavior enabled. + */ + public IgniteInternalCache<K, V> withNoRetries(); + + /** * @param key Key. * @param entryProcessor Entry processor. * @param args Arguments. http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/affinity/GridCacheAffinityImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/affinity/GridCacheAffinityImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/affinity/GridCacheAffinityImpl.java index 6567141..9e85bad 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/affinity/GridCacheAffinityImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/affinity/GridCacheAffinityImpl.java @@ -207,7 +207,7 @@ public class GridCacheAffinityImpl<K, V> implements Affinity<K> { ClusterNode primary = cctx.affinity().primary(key, topVer); if (primary == null) - throw new IgniteException("Failed to get primare node [topVer=" + topVer + ", key=" + key + ']'); + throw new IgniteException("Failed to get primary node [topVer=" + topVer + ", key=" + key + ']'); Collection<K> mapped = res.get(primary); http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java index b42e5e7..c018f71 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java @@ -230,7 +230,7 @@ public class CacheDataStructuresManager extends GridCacheManagerAdapter { if (create) { hdr = new GridCacheQueueHeader(IgniteUuid.randomUuid(), cap, colloc, 0, 0, null); - GridCacheQueueHeader old = queueHdrView.getAndPutIfAbsent(key, hdr); + GridCacheQueueHeader old = queueHdrView.withNoRetries().getAndPutIfAbsent(key, hdr); if (old != null) { if (old.capacity() != cap || old.collocated() != colloc) @@ -385,7 +385,7 @@ public class CacheDataStructuresManager extends GridCacheManagerAdapter { GridCacheSetHeader hdr; - GridCacheAdapter cache = cctx.cache(); + IgniteInternalCache cache = cctx.cache().withNoRetries(); if (create) { hdr = new GridCacheSetHeader(IgniteUuid.randomUuid(), collocated); http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheCommittedTxInfo.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheCommittedTxInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheCommittedTxInfo.java index 46d113b..875ada0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheCommittedTxInfo.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheCommittedTxInfo.java @@ -35,6 +35,7 @@ import org.apache.ignite.internal.util.typedef.internal.U; * Committed transaction information. Contains recovery writes that will be used to set commit values * in case if originating node crashes. */ +@Deprecated public class GridCacheCommittedTxInfo implements Externalizable { /** */ private static final long serialVersionUID = 0L; http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java index 1709b0f..6e97ec5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java @@ -299,7 +299,7 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter /** {@inheritDoc} */ @Nullable @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, @Nullable Object arg) throws IgniteException { - Map<ComputeJob, ClusterNode> jobs = new HashMap(); + Map<ComputeJob, ClusterNode> jobs = new HashMap<>(); for (ClusterNode node : subgrid) jobs.put(new GlobalRemoveAllJob(cacheName, topVer, skipStore, keepBinary), node); http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockResponse.java index a3eaba4..84a4094 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockResponse.java @@ -156,14 +156,6 @@ public class GridDistributedLockResponse extends GridDistributedBaseMessage { } /** - * @param committedVers Committed versions relative to lock version. - * @param rolledbackVers Rolled back versions relative to lock version. - */ - public void setCandidates(Collection<GridCacheVersion> committedVers, Collection<GridCacheVersion> rolledbackVers) { - completedVersions(committedVers, rolledbackVers); - } - - /** * @param val Value. */ public void addValue(CacheObject val) { http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java index 262d959..2cf7276 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java @@ -26,7 +26,6 @@ import java.util.Map; import java.util.Set; import java.util.UUID; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; - import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; @@ -315,7 +314,7 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter log.debug("Replacing obsolete entry in remote transaction [entry=" + entry + ", tx=" + this + ']'); // Replace the entry. - txEntry.cached(txEntry.context().cache().entryEx(txEntry.key())); + txEntry.cached(txEntry.context().cache().entryEx(txEntry.key(), topologyVersion())); } } } @@ -439,7 +438,7 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter if (log.isDebugEnabled()) log.debug("Got removed entry while committing (will retry): " + txEntry); - txEntry.cached(txEntry.context().cache().entryEx(txEntry.key())); + txEntry.cached(txEntry.context().cache().entryEx(txEntry.key(), topologyVersion())); } } } @@ -469,7 +468,7 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter GridCacheEntryEx cached = txEntry.cached(); if (cached == null) - txEntry.cached(cached = cacheCtx.cache().entryEx(txEntry.key())); + txEntry.cached(cached = cacheCtx.cache().entryEx(txEntry.key(), topologyVersion())); if (near() && cacheCtx.dr().receiveEnabled()) { cached.markObsolete(xidVer); @@ -662,7 +661,7 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter log.debug("Attempting to commit a removed entry (will retry): " + txEntry); // Renew cached entry. - txEntry.cached(cacheCtx.cache().entryEx(txEntry.key())); + txEntry.cached(cacheCtx.cache().entryEx(txEntry.key(), topologyVersion())); } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java index 28c94dd..4381dfd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java @@ -22,7 +22,6 @@ import java.util.List; import java.util.Map; import java.util.UUID; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; - import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; @@ -30,10 +29,8 @@ import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheFuture; import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy; import org.apache.ignite.internal.processors.cache.KeyCacheObject; -import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.future.GridCompoundIdentityFuture; import org.apache.ignite.internal.util.typedef.F; -import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.lang.IgniteUuid; import org.jetbrains.annotations.Nullable; http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java index ad4943e..3761d77 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java @@ -219,7 +219,13 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { } /** {@inheritDoc} */ - @Override public void beforeExchange(GridDhtPartitionsExchangeFuture exchFut) throws IgniteCheckedException { + @Override public void initPartitions(GridDhtPartitionsExchangeFuture exchFut) { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void beforeExchange(GridDhtPartitionsExchangeFuture exchFut, boolean initParts) + throws IgniteCheckedException { ClusterNode loc = cctx.localNode(); U.writeLock(lock); @@ -777,7 +783,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { node2part.put(nodeId, map = new GridDhtPartitionMap2(nodeId, updateSeq, topVer, Collections.<Integer, GridDhtPartitionState>emptyMap(), false)); - map.updateSequence(updateSeq); + map.updateSequence(updateSeq, topVer); map.put(p, state); http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java index 82450ad..e883614 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java @@ -18,10 +18,14 @@ package org.apache.ignite.internal.processors.cache.distributed.dht; import java.nio.ByteBuffer; +import java.util.ArrayList; import java.util.List; +import java.util.UUID; + import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.GridDirectTransient; +import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.GridCacheMessage; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; @@ -51,6 +55,20 @@ public class GridDhtAffinityAssignmentResponse extends GridCacheMessage { /** Affinity assignment bytes. */ private byte[] affAssignmentBytes; + /** */ + @GridDirectTransient + private List<List<UUID>> affAssignmentIds; + + /** */ + private byte[] affAssignmentIdsBytes; + + /** */ + @GridDirectTransient + private List<List<UUID>> idealAffAssignment; + + /** Affinity assignment bytes. */ + private byte[] idealAffAssignmentBytes; + /** * Empty constructor. */ @@ -62,12 +80,19 @@ public class GridDhtAffinityAssignmentResponse extends GridCacheMessage { * @param cacheId Cache ID. * @param topVer Topology version. * @param affAssignment Affinity assignment. + * @param sndNodeIds If {@code true} sends only node IDs instead of nodes. */ - public GridDhtAffinityAssignmentResponse(int cacheId, @NotNull AffinityTopologyVersion topVer, - List<List<ClusterNode>> affAssignment) { + public GridDhtAffinityAssignmentResponse(int cacheId, + @NotNull AffinityTopologyVersion topVer, + List<List<ClusterNode>> affAssignment, + boolean sndNodeIds) { this.cacheId = cacheId; this.topVer = topVer; - this.affAssignment = affAssignment; + + if (!sndNodeIds) + this.affAssignment = affAssignment; + else + affAssignmentIds = ids(affAssignment); } /** {@inheritDoc} */ @@ -83,12 +108,86 @@ public class GridDhtAffinityAssignmentResponse extends GridCacheMessage { } /** + * @param disco Discovery manager. * @return Affinity assignment. */ - public List<List<ClusterNode>> affinityAssignment() { + public List<List<ClusterNode>> affinityAssignment(GridDiscoveryManager disco) { + if (affAssignment != null) + return affAssignment; + + if (affAssignmentIds != null) + affAssignment = nodes(disco, affAssignmentIds); + return affAssignment; } + /** + * @return Ideal affinity assignment. + */ + public List<List<ClusterNode>> idealAffinityAssignment(GridDiscoveryManager disco) { + return nodes(disco, idealAffAssignment); + } + + /** + * @param disco Discovery manager. + * @param assignmentIds Assignment node IDs. + * @return Assignment nodes. + */ + private List<List<ClusterNode>> nodes(GridDiscoveryManager disco, List<List<UUID>> assignmentIds) { + if (assignmentIds != null) { + List<List<ClusterNode>> assignment = new ArrayList<>(assignmentIds.size()); + + for (int i = 0; i < assignmentIds.size(); i++) { + List<UUID> ids = assignmentIds.get(i); + List<ClusterNode> nodes = new ArrayList<>(ids.size()); + + for (int j = 0; j < ids.size(); j++) { + ClusterNode node = disco.node(topVer, ids.get(j)); + + assert node != null; + + nodes.add(node); + } + + assignment.add(nodes); + } + + return assignment; + } + + return null; + } + + /** + * @param idealAffAssignment Ideal affinity assignment. + */ + public void idealAffinityAssignment(List<List<ClusterNode>> idealAffAssignment) { + this.idealAffAssignment = ids(idealAffAssignment); + } + + /** + * @param assignments Assignment. + */ + private List<List<UUID>> ids(List<List<ClusterNode>> assignments) { + if (assignments != null) { + List<List<UUID>> assignment = new ArrayList<>(assignments.size()); + + for (int i = 0; i < assignments.size(); i++) { + List<ClusterNode> nodes = assignments.get(i); + List<UUID> ids = new ArrayList<>(nodes.size()); + + for (int j = 0; j < nodes.size(); j++) + ids.add(nodes.get(j).id()); + + assignment.add(ids); + } + + return assignment; + } + + return null; + } + /** {@inheritDoc} */ @Override public byte directType() { return 29; @@ -96,7 +195,7 @@ public class GridDhtAffinityAssignmentResponse extends GridCacheMessage { /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 5; + return 7; } /** @@ -105,34 +204,69 @@ public class GridDhtAffinityAssignmentResponse extends GridCacheMessage { @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException { super.prepareMarshal(ctx); + assert affAssignment != null ^ affAssignmentIds != null; + if (affAssignment != null && affAssignmentBytes == null) affAssignmentBytes = ctx.marshaller().marshal(affAssignment); + + if (affAssignmentIds != null && affAssignmentIdsBytes == null) + affAssignmentIdsBytes = ctx.marshaller().marshal(affAssignmentIds); + + if (idealAffAssignment != null && idealAffAssignmentBytes == null) + idealAffAssignmentBytes = ctx.marshaller().marshal(idealAffAssignment); } /** {@inheritDoc} */ - @SuppressWarnings("ForLoopReplaceableByForEach") @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException { super.finishUnmarshal(ctx, ldr); - if (affAssignmentBytes != null && affAssignment == null) { - affAssignment = ctx.marshaller().unmarshal(affAssignmentBytes, U.resolveClassLoader(ldr, ctx.gridConfig())); + assert affAssignmentBytes != null ^ affAssignmentIdsBytes != null; - // TODO IGNITE-2110: setting 'local' for nodes not needed when IGNITE-2110 is implemented. - int assignments = affAssignment.size(); + ldr = U.resolveClassLoader(ldr, ctx.gridConfig()); - for (int n = 0; n < assignments; n++) { - List<ClusterNode> nodes = affAssignment.get(n); + if (affAssignmentBytes != null && affAssignment == null) + affAssignment = unmarshalNodes(affAssignmentBytes, ctx, ldr); - int size = nodes.size(); + if (affAssignmentIdsBytes != null && affAssignmentIds == null) + affAssignmentIds = ctx.marshaller().unmarshal(affAssignmentIdsBytes, ldr); - for (int i = 0; i < size; i++) { - ClusterNode node = nodes.get(i); + if (idealAffAssignmentBytes != null && idealAffAssignment == null) + idealAffAssignment = ctx.marshaller().unmarshal(idealAffAssignmentBytes, ldr); + } - if (node instanceof TcpDiscoveryNode) - ((TcpDiscoveryNode)node).local(node.id().equals(ctx.localNodeId())); - } + /** + * @param bytes Assignment bytes. + * @param ctx Context. + * @param ldr Class loader. + * @return Assignment. + * @throws IgniteCheckedException If failed. + */ + @SuppressWarnings("ForLoopReplaceableByForEach") + private List<List<ClusterNode>> unmarshalNodes(byte[] bytes, + GridCacheSharedContext ctx, + ClassLoader ldr) + throws IgniteCheckedException + { + List<List<ClusterNode>> affAssignment = ctx.marshaller().unmarshal(bytes, + U.resolveClassLoader(ldr, ctx.gridConfig())); + + // TODO IGNITE-2110: setting 'local' for nodes not needed when IGNITE-2110 is implemented. + int assignments = affAssignment.size(); + + for (int n = 0; n < assignments; n++) { + List<ClusterNode> nodes = affAssignment.get(n); + + int size = nodes.size(); + + for (int i = 0; i < size; i++) { + ClusterNode node = nodes.get(i); + + if (node instanceof TcpDiscoveryNode) + ((TcpDiscoveryNode)node).local(node.id().equals(ctx.localNodeId())); } } + + return affAssignment; } /** {@inheritDoc} */ @@ -162,6 +296,18 @@ public class GridDhtAffinityAssignmentResponse extends GridCacheMessage { writer.incrementState(); case 4: + if (!writer.writeByteArray("affAssignmentIdsBytes", affAssignmentIdsBytes)) + return false; + + writer.incrementState(); + + case 5: + if (!writer.writeByteArray("idealAffAssignmentBytes", idealAffAssignmentBytes)) + return false; + + writer.incrementState(); + + case 6: if (!writer.writeMessage("topVer", topVer)) return false; @@ -192,6 +338,22 @@ public class GridDhtAffinityAssignmentResponse extends GridCacheMessage { reader.incrementState(); case 4: + affAssignmentIdsBytes = reader.readByteArray("affAssignmentIdsBytes"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 5: + idealAffAssignmentBytes = reader.readByteArray("idealAffAssignmentBytes"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 6: topVer = reader.readMessage("topVer"); if (!reader.isLastRead()) http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java index bb9f4ab..ab8e863 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java @@ -20,9 +20,9 @@ package org.apache.ignite.internal.processors.cache.distributed.dht; import java.util.Collection; import java.util.Collections; import java.util.LinkedList; -import java.util.List; import java.util.Queue; import java.util.UUID; +import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicReference; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; @@ -30,9 +30,12 @@ import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.GridNodeOrderComparator; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; -import org.apache.ignite.internal.processors.cache.GridCacheContext; -import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader; +import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.util.future.GridFutureAdapter; +import org.apache.ignite.internal.util.tostring.GridToStringInclude; +import org.apache.ignite.internal.util.typedef.T2; +import org.apache.ignite.internal.util.typedef.internal.CU; +import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.jetbrains.annotations.Nullable; @@ -41,7 +44,7 @@ import static org.apache.ignite.internal.managers.communication.GridIoPolicy.AFF /** * Future that fetches affinity assignment from remote cache nodes. */ -public class GridDhtAssignmentFetchFuture extends GridFutureAdapter<List<List<ClusterNode>>> { +public class GridDhtAssignmentFetchFuture extends GridFutureAdapter<GridDhtAffinityAssignmentResponse> { /** */ private static final long serialVersionUID = 0L; @@ -51,34 +54,39 @@ public class GridDhtAssignmentFetchFuture extends GridFutureAdapter<List<List<Cl /** Logger. */ private static IgniteLogger log; - /** Cache context. */ - private final GridCacheContext ctx; + /** */ + private final GridCacheSharedContext ctx; /** List of available nodes this future can fetch data from. */ + @GridToStringInclude private Queue<ClusterNode> availableNodes; - /** Topology version. */ - private final AffinityTopologyVersion topVer; - /** Pending node from which response is being awaited. */ private ClusterNode pendingNode; + /** */ + @GridToStringInclude + private final T2<Integer, AffinityTopologyVersion> key; + /** - * @param ctx Cache context. - * @param availableNodes Available nodes. + * @param ctx Context. + * @param cacheName Cache name. + * @param topVer Topology version. */ public GridDhtAssignmentFetchFuture( - GridCacheContext ctx, - AffinityTopologyVersion topVer, - Collection<ClusterNode> availableNodes + GridCacheSharedContext ctx, + String cacheName, + AffinityTopologyVersion topVer ) { this.ctx = ctx; - this.topVer = topVer; + this.key = new T2<>(CU.cacheId(cacheName), topVer); + + Collection<ClusterNode> availableNodes = ctx.discovery().cacheAffinityNodes(cacheName, topVer); LinkedList<ClusterNode> tmp = new LinkedList<>(); for (ClusterNode node : availableNodes) { - if (!node.isLocal()) + if (!node.isLocal() && ctx.discovery().alive(node)) tmp.add(node); } @@ -94,33 +102,40 @@ public class GridDhtAssignmentFetchFuture extends GridFutureAdapter<List<List<Cl * Initializes fetch future. */ public void init() { - ((GridDhtPreloader)ctx.preloader()).addDhtAssignmentFetchFuture(topVer, this); + ctx.affinity().addDhtAssignmentFetchFuture(this); requestFromNextNode(); } /** - * @param node Node. + * @return Future key. + */ + public T2<Integer, AffinityTopologyVersion> key() { + return key; + } + + /** + * @param nodeId Node ID. * @param res Response. */ - public void onResponse(ClusterNode node, GridDhtAffinityAssignmentResponse res) { - if (!res.topologyVersion().equals(topVer)) { + public void onResponse(UUID nodeId, GridDhtAffinityAssignmentResponse res) { + if (!res.topologyVersion().equals(key.get2())) { if (log.isDebugEnabled()) log.debug("Received affinity assignment for wrong topology version (will ignore) " + - "[node=" + node + ", res=" + res + ", topVer=" + topVer + ']'); + "[node=" + nodeId + ", res=" + res + ", topVer=" + key.get2() + ']'); return; } - List<List<ClusterNode>> assignment = null; + GridDhtAffinityAssignmentResponse res0 = null; synchronized (this) { - if (pendingNode != null && pendingNode.equals(node)) - assignment = res.affinityAssignment(); + if (pendingNode != null && pendingNode.id().equals(nodeId)) + res0 = res; } - if (assignment != null) - onDone(assignment); + if (res0 != null) + onDone(res); } /** @@ -139,9 +154,9 @@ public class GridDhtAssignmentFetchFuture extends GridFutureAdapter<List<List<Cl } /** {@inheritDoc} */ - @Override public boolean onDone(@Nullable List<List<ClusterNode>> res, @Nullable Throwable err) { + @Override public boolean onDone(@Nullable GridDhtAffinityAssignmentResponse res, @Nullable Throwable err) { if (super.onDone(res, err)) { - ((GridDhtPreloader)ctx.preloader()).removeDhtAssignmentFetchFuture(topVer, this); + ctx.affinity().removeDhtAssignmentFetchFuture(this); return true; } @@ -167,7 +182,7 @@ public class GridDhtAssignmentFetchFuture extends GridFutureAdapter<List<List<Cl log0.debug("Sending affinity fetch request to remote node [locNodeId=" + ctx.localNodeId() + ", node=" + node + ']'); - ctx.io().send(node, new GridDhtAffinityAssignmentRequest(ctx.cacheId(), topVer), + ctx.io().send(node, new GridDhtAffinityAssignmentRequest(key.get1(), key.get2()), AFFINITY_POOL); // Close window for listener notification. @@ -198,6 +213,11 @@ public class GridDhtAssignmentFetchFuture extends GridFutureAdapter<List<List<Cl // No more nodes left, complete future with null outside of synchronization. // Affinity should be calculated from scratch. if (complete) - onDone((List<List<ClusterNode>>)null); + onDone((GridDhtAffinityAssignmentResponse)null); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(GridDhtAssignmentFetchFuture.class, this); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java index ee9525a..faa980e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java @@ -35,6 +35,7 @@ import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.processors.cache.CacheOperationContext; @@ -427,31 +428,6 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap } /** - * Gets or creates entry for given key. If key belongs to local node, dht entry will be returned, otherwise - * if {@code allowDetached} is {@code true}, detached entry will be returned, otherwise exception will be - * thrown. - * - * @param key Key for which entry should be returned. - * @param allowDetached Whether to allow detached entries. - * @param touch {@code True} if entry should be passed to eviction policy. - * @return Cache entry. - * @throws GridDhtInvalidPartitionException if entry does not belong to this node and - * {@code allowDetached} is {@code false}. - */ - public GridCacheEntryEx entryExx(KeyCacheObject key, AffinityTopologyVersion topVer, boolean allowDetached, boolean touch) { - try { - return allowDetached && !ctx.affinity().localNode(key, topVer) ? - createEntry(key) : entryEx(key, touch); - } - catch (GridDhtInvalidPartitionException e) { - if (!allowDetached) - throw e; - - return createEntry(key); - } - } - - /** * @param key Key for which entry should be returned. * @return Cache entry. */ @@ -934,7 +910,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap Map<ClusterNode, GridCacheTtlUpdateRequest> reqMap = new HashMap<>(); - AffinityTopologyVersion topVer = new AffinityTopologyVersion(ctx.discovery().topologyVersion()); + AffinityTopologyVersion topVer = ctx.shared().exchange().readyAffinityVersion(); for (Map.Entry<KeyCacheObject, GridCacheVersion> e : entries.entrySet()) { List<ClusterNode> nodes = ctx.affinity().nodes(e.getKey(), topVer); @@ -984,7 +960,12 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap ctx.io().send(req.getKey(), req.getValue(), ctx.ioPolicy()); } catch (IgniteCheckedException e) { - U.error(log, "Failed to send TTL update request.", e); + if (e instanceof ClusterTopologyCheckedException) { + if (log.isDebugEnabled()) + log.debug("Failed to send TTC update request, node left: " + req.getKey()); + } + else + U.error(log, "Failed to send TTL update request.", e); } } } @@ -1050,6 +1031,12 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap if (log.isDebugEnabled()) log.debug("Got removed entry: " + entry); } + catch (GridDhtInvalidPartitionException e) { + if (log.isDebugEnabled()) + log.debug("Got GridDhtInvalidPartitionException: " + e); + + break; + } } } finally { @@ -1153,7 +1140,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap /** {@inheritDoc} */ @Override public int size() { GridDhtLocalPartition part = ctx.topology().localPartition(partId, - new AffinityTopologyVersion(ctx.discovery().topologyVersion()), false); + ctx.discovery().topologyVersionEx(), false); return part != null ? part.publicSize() : 0; } @@ -1201,7 +1188,18 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap Collection<ClusterNode> cacheNodes0 = ctx.discovery().cacheAffinityNodes(ctx.name(), expVer); Collection<ClusterNode> cacheNodes1 = ctx.discovery().cacheAffinityNodes(ctx.name(), curVer); - return !cacheNodes0.equals(cacheNodes1); + if (!cacheNodes0.equals(cacheNodes1) || ctx.affinity().affinityTopologyVersion().compareTo(curVer) < 0) + return true; + + try { + List<List<ClusterNode>> aff1 = ctx.affinity().assignments(expVer); + List<List<ClusterNode>> aff2 = ctx.affinity().assignments(curVer); + + return !aff1.equals(aff2); + } + catch (IllegalStateException e) { + return true; + } } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java index fa753b0..fbfca82 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java @@ -406,6 +406,7 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col } else { fut = tx.getAllAsync(cctx, + null, keys.keySet(), /*deserialize binary*/false, skipVals, @@ -437,6 +438,7 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col } else { return tx.getAllAsync(cctx, + null, keys.keySet(), /*deserialize binary*/false, skipVals, http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java index d9851c7..2de92b1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java @@ -356,6 +356,7 @@ public final class GridDhtGetSingleFuture<K, V> extends GridFutureAdapter<GridCa } else { fut = tx.getAllAsync(cctx, + null, Collections.singleton(key), /*deserialize binary*/false, skipVals, @@ -390,6 +391,7 @@ public final class GridDhtGetSingleFuture<K, V> extends GridFutureAdapter<GridCa } else { fut0 = tx.getAllAsync(cctx, + null, Collections.singleton(key), /*deserialize binary*/false, skipVals, http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java index a33f01f..a3e94a4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java @@ -303,13 +303,6 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean> return futId; } - /** - * @return Near lock version. - */ - public GridCacheVersion nearLockVersion() { - return nearLockVer; - } - /** {@inheritDoc} */ @Nullable @Override public GridCacheVersion mappedVersion() { return tx == null ? nearLockVer : null; http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java index 84889f8..7fba45d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java @@ -87,9 +87,17 @@ public interface GridDhtPartitionTopology { * Pre-initializes this topology. * * @param exchFut Exchange future. + * @param affReady Affinity ready flag. * @throws IgniteCheckedException If failed. */ - public void beforeExchange(GridDhtPartitionsExchangeFuture exchFut) throws IgniteCheckedException; + public void beforeExchange(GridDhtPartitionsExchangeFuture exchFut, boolean affReady) + throws IgniteCheckedException; + + /** + * @param exchFut Exchange future. + * @throws IgniteInterruptedCheckedException If interrupted. + */ + public void initPartitions(GridDhtPartitionsExchangeFuture exchFut) throws IgniteInterruptedCheckedException; /** * Post-initializes this topology. http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java index b3786cd..f0ce6d1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java @@ -267,13 +267,144 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { } /** {@inheritDoc} */ - @Override public void beforeExchange(GridDhtPartitionsExchangeFuture exchFut) throws IgniteCheckedException { - waitForRent(); + @Override public void initPartitions(GridDhtPartitionsExchangeFuture exchFut) throws IgniteInterruptedCheckedException { + U.writeLock(lock); + + try { + if (stopping) + return; + + long updateSeq = this.updateSeq.incrementAndGet(); + + initPartitions0(exchFut, updateSeq); + + consistencyCheck(); + } + finally { + lock.writeLock().unlock(); + } + } + /** + * @param exchFut Exchange future. + * @param updateSeq Update sequence. + */ + private void initPartitions0(GridDhtPartitionsExchangeFuture exchFut, long updateSeq) { ClusterNode loc = cctx.localNode(); + ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx.shared(), topVer); + + assert oldest != null || cctx.kernalContext().clientNode(); + + GridDhtPartitionExchangeId exchId = exchFut.exchangeId(); + + assert topVer.equals(exchFut.topologyVersion()) : + "Invalid topology [topVer=" + topVer + + ", cache=" + cctx.name() + + ", futVer=" + exchFut.topologyVersion() + ']'; + assert cctx.affinity().affinityTopologyVersion().equals(exchFut.topologyVersion()) : + "Invalid affinity [topVer=" + cctx.affinity().affinityTopologyVersion() + + ", cache=" + cctx.name()+ + ", futVer=" + exchFut.topologyVersion() + ']'; + + List<List<ClusterNode>> aff = cctx.affinity().assignments(exchFut.topologyVersion()); + int num = cctx.affinity().partitions(); + if (cctx.rebalanceEnabled()) { + boolean added = exchFut.isCacheAdded(cctx.cacheId(), exchId.topologyVersion()); + + boolean first = (loc.equals(oldest) && loc.id().equals(exchId.nodeId()) && exchId.isJoined()) || added; + + if (first) { + assert exchId.isJoined() || added; + + for (int p = 0; p < num; p++) { + if (localNode(p, aff)) { + GridDhtLocalPartition locPart = createPartition(p); + + boolean owned = locPart.own(); + + assert owned : "Failed to own partition for oldest node [cacheName" + cctx.name() + + ", part=" + locPart + ']'; + + if (log.isDebugEnabled()) + log.debug("Owned partition for oldest node: " + locPart); + + updateLocal(p, loc.id(), locPart.state(), updateSeq); + } + } + } + else + createPartitions(aff, updateSeq); + } + else { + // If preloader is disabled, then we simply clear out + // the partitions this node is not responsible for. + for (int p = 0; p < num; p++) { + GridDhtLocalPartition locPart = localPartition(p, topVer, false, false); + + boolean belongs = localNode(p, aff); + + if (locPart != null) { + if (!belongs) { + GridDhtPartitionState state = locPart.state(); + + if (state.active()) { + locPart.rent(false); + + updateLocal(p, loc.id(), locPart.state(), updateSeq); + + if (log.isDebugEnabled()) + log.debug("Evicting partition with rebalancing disabled " + + "(it does not belong to affinity): " + locPart); + } + } + } + else if (belongs) + createPartition(p); + } + } + + if (node2part != null && node2part.valid()) + checkEvictions(updateSeq, aff); + + updateRebalanceVersion(aff); + } + + /** + * @param aff Affinity assignments. + * @param updateSeq Update sequence. + */ + private void createPartitions(List<List<ClusterNode>> aff, long updateSeq) { + ClusterNode loc = cctx.localNode(); + + int num = cctx.affinity().partitions(); + + for (int p = 0; p < num; p++) { + if (node2part != null && node2part.valid()) { + if (localNode(p, aff)) { + // This will make sure that all non-existing partitions + // will be created in MOVING state. + GridDhtLocalPartition locPart = createPartition(p); + + updateLocal(p, loc.id(), locPart.state(), updateSeq); + } + } + // If this node's map is empty, we pre-create local partitions, + // so local map will be sent correctly during exchange. + else if (localNode(p, aff)) + createPartition(p); + } + } + + /** {@inheritDoc} */ + @Override public void beforeExchange(GridDhtPartitionsExchangeFuture exchFut, boolean affReady) + throws IgniteCheckedException { + waitForRent(); + + ClusterNode loc = cctx.localNode(); + U.writeLock(lock); try { @@ -291,7 +422,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { // In case if node joins, get topology at the time of joining node. ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx.shared(), topVer); - assert oldest != null; + assert oldest != null || cctx.kernalContext().clientNode(); if (log.isDebugEnabled()) log.debug("Partition map beforeExchange [exchId=" + exchId + ", fullMap=" + fullMapString() + ']'); @@ -301,7 +432,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { cntrMap.clear(); // If this is the oldest node. - if (oldest.id().equals(loc.id()) || exchFut.isCacheAdded(cctx.cacheId(), exchId.topologyVersion())) { + if (oldest != null && (loc.equals(oldest) || exchFut.isCacheAdded(cctx.cacheId(), exchId.topologyVersion()))) { if (node2part == null) { node2part = new GridDhtPartitionFullMap(oldest.id(), oldest.order(), updateSeq); @@ -325,110 +456,14 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { } } - if (cctx.rebalanceEnabled()) { - for (int p = 0; p < num; p++) { - // If this is the first node in grid. - boolean added = exchFut.isCacheAdded(cctx.cacheId(), exchId.topologyVersion()); - - if ((oldest.id().equals(loc.id()) && oldest.id().equals(exchId.nodeId()) && exchId.isJoined()) || added) { - assert exchId.isJoined() || added; - - try { - GridDhtLocalPartition locPart = localPartition(p, topVer, true, false); - - assert locPart != null; - - boolean owned = locPart.own(); - - assert owned : "Failed to own partition for oldest node [cacheName" + cctx.name() + - ", part=" + locPart + ']'; - - if (log.isDebugEnabled()) - log.debug("Owned partition for oldest node: " + locPart); - - updateLocal(p, loc.id(), locPart.state(), updateSeq); - } - catch (GridDhtInvalidPartitionException e) { - if (log.isDebugEnabled()) - log.debug("Ignoring invalid partition on oldest node (no need to create a partition " + - "if it no longer belongs to local node: " + e.partition()); - } - } - // If this is not the first node in grid. - else { - if (node2part != null && node2part.valid()) { - if (cctx.affinity().localNode(p, topVer)) { - try { - // This will make sure that all non-existing partitions - // will be created in MOVING state. - GridDhtLocalPartition locPart = localPartition(p, topVer, true, false); - - updateLocal(p, loc.id(), locPart.state(), updateSeq); - } - catch (GridDhtInvalidPartitionException e) { - if (log.isDebugEnabled()) - log.debug("Ignoring invalid partition (no need to create a partition if it " + - "no longer belongs to local node: " + e.partition()); - } - } - } - // If this node's map is empty, we pre-create local partitions, - // so local map will be sent correctly during exchange. - else if (cctx.affinity().localNode(p, topVer)) { - try { - localPartition(p, topVer, true, false); - } - catch (GridDhtInvalidPartitionException e) { - if (log.isDebugEnabled()) - log.debug("Ignoring invalid partition (no need to pre-create a partition if it " + - "no longer belongs to local node: " + e.partition()); - } - } - } - } - } + if (affReady) + initPartitions0(exchFut, updateSeq); else { - // If preloader is disabled, then we simply clear out - // the partitions this node is not responsible for. - for (int p = 0; p < num; p++) { - GridDhtLocalPartition locPart = localPartition(p, topVer, false, false); - - boolean belongs = cctx.affinity().localNode(p, topVer); + List<List<ClusterNode>> aff = cctx.affinity().idealAssignment(); - if (locPart != null) { - if (!belongs) { - GridDhtPartitionState state = locPart.state(); - - if (state.active()) { - locPart.rent(false); - - updateLocal(p, loc.id(), locPart.state(), updateSeq); - - if (log.isDebugEnabled()) - log.debug("Evicting partition with rebalancing disabled " + - "(it does not belong to affinity): " + locPart); - } - } - } - else if (belongs) { - try { - // Pre-create partitions. - localPartition(p, topVer, true, false); - } - catch (GridDhtInvalidPartitionException e) { - if (log.isDebugEnabled()) - log.debug("Ignoring invalid partition with disabled rebalancer (no need to " + - "pre-create a partition if it no longer belongs to local node: " + e.partition()); - } - } - } + createPartitions(aff, updateSeq); } - if (node2part != null && node2part.valid()) - checkEvictions(updateSeq); - - updateRebalanceVersion(); - consistencyCheck(); if (log.isDebugEnabled()) @@ -453,6 +488,11 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { AffinityTopologyVersion topVer = exchFut.topologyVersion(); + assert cctx.affinity().affinityTopologyVersion().equals(topVer) : "Affinity is not initialized " + + "[topVer=" + topVer + + ", affVer=" + cctx.affinity().affinityTopologyVersion() + + ", fut=" + exchFut + ']'; + lock.writeLock().lock(); try { @@ -535,7 +575,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { } } - updateRebalanceVersion(); + updateRebalanceVersion(cctx.affinity().assignments(topVer)); consistencyCheck(); } @@ -554,6 +594,30 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { /** * @param p Partition number. + * @return Partition. + */ + private GridDhtLocalPartition createPartition(int p) { + GridDhtLocalPartition loc = locParts.get(p); + + if (loc != null && loc.state() == EVICTED) { + boolean rmv = locParts.remove(p, loc); + + assert rmv; + + loc = null; + } + + if (loc == null) { + GridDhtLocalPartition old = locParts.putIfAbsent(p, loc = new GridDhtLocalPartition(cctx, p)); + + assert old == null : old; + } + + return loc; + } + + /** + * @param p Partition number. * @param topVer Topology version. * @param create Create flag. * @param updateSeq Update sequence. @@ -714,6 +778,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { try { assert node2part != null && node2part.valid() : "Invalid node-to-partitions map [topVer1=" + topVer + ", topVer2=" + this.topVer + + ", node=" + cctx.gridName() + ", cache=" + cctx.name() + ", node2part=" + node2part + ']'; @@ -959,9 +1024,17 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { part2node = p2n; - boolean changed = checkEvictions(updateSeq); + boolean changed = false; + + AffinityTopologyVersion affVer = cctx.affinity().affinityTopologyVersion(); - updateRebalanceVersion(); + if (!affVer.equals(AffinityTopologyVersion.NONE) && affVer.compareTo(topVer) >= 0) { + List<List<ClusterNode>> aff = cctx.affinity().assignments(topVer); + + changed = checkEvictions(updateSeq, aff); + + updateRebalanceVersion(aff); + } consistencyCheck(); @@ -978,7 +1051,8 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { /** {@inheritDoc} */ @SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"}) @Nullable @Override public GridDhtPartitionMap2 update(@Nullable GridDhtPartitionExchangeId exchId, - GridDhtPartitionMap2 parts, @Nullable Map<Integer, Long> cntrMap) { + GridDhtPartitionMap2 parts, + @Nullable Map<Integer, Long> cntrMap) { if (log.isDebugEnabled()) log.debug("Updating single partition map [exchId=" + exchId + ", parts=" + mapString(parts) + ']'); @@ -1072,9 +1146,15 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { } } - changed |= checkEvictions(updateSeq); + AffinityTopologyVersion affVer = cctx.affinity().affinityTopologyVersion(); + + if (!affVer.equals(AffinityTopologyVersion.NONE) && affVer.compareTo(topVer) >= 0) { + List<List<ClusterNode>> aff = cctx.affinity().assignments(topVer); - updateRebalanceVersion(); + changed |= checkEvictions(updateSeq, aff); + + updateRebalanceVersion(aff); + } consistencyCheck(); @@ -1090,9 +1170,10 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { /** * @param updateSeq Update sequence. + * @param aff Affinity assignments. * @return Checks if any of the local partitions need to be evicted. */ - private boolean checkEvictions(long updateSeq) { + private boolean checkEvictions(long updateSeq, List<List<ClusterNode>> aff) { boolean changed = false; UUID locId = cctx.nodeId(); @@ -1103,7 +1184,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { if (state.active()) { int p = part.id(); - List<ClusterNode> affNodes = cctx.affinity().nodes(p, topVer); + List<ClusterNode> affNodes = aff.get(p); if (!affNodes.contains(cctx.localNode())) { Collection<UUID> nodeIds = F.nodeIds(nodes(p, topVer, OWNING)); @@ -1172,10 +1253,10 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { // In case if node joins, get topology at the time of joining node. ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx.shared(), topVer); - assert oldest != null; + assert oldest != null || cctx.kernalContext().clientNode(); // If this node became the oldest node. - if (oldest.id().equals(cctx.nodeId())) { + if (cctx.localNode().equals(oldest)) { long seq = node2part.updateSequence(); if (seq != updateSeq) { @@ -1203,7 +1284,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { node2part.put(nodeId, map = new GridDhtPartitionMap2(nodeId, updateSeq, topVer, Collections.<Integer, GridDhtPartitionState>emptyMap(), false)); - map.updateSequence(updateSeq); + map.updateSequence(updateSeq, topVer); map.put(p, state); @@ -1221,7 +1302,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { private void removeNode(UUID nodeId) { assert nodeId != null; - ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx.shared(), topVer); + ClusterNode oldest = CU.oldest(cctx.discovery().serverNodes(topVer)); assert oldest != null; @@ -1357,15 +1438,24 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { } /** - * + * @param part Partition. + * @param aff Affinity assignments. + * @return {@code True} if given partition belongs to local node. + */ + private boolean localNode(int part, List<List<ClusterNode>> aff) { + return aff.get(part).contains(cctx.localNode()); + } + + /** + * @param aff Affinity assignments. */ - private void updateRebalanceVersion() { + private void updateRebalanceVersion(List<List<ClusterNode>> aff) { if (!rebalancedTopVer.equals(topVer)) { if (node2part == null || !node2part.valid()) return; for (int i = 0; i < cctx.affinity().partitions(); i++) { - List<ClusterNode> affNodes = cctx.affinity().nodes(i, topVer); + List<ClusterNode> affNodes = aff.get(i); // Topology doesn't contain server nodes (just clients). if (affNodes.isEmpty()) http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java index ae24ed1..b6639f6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java @@ -867,6 +867,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach tx = new GridDhtTxLocal( ctx.shared(), + req.topologyVersion(), nearNode.id(), req.version(), req.futureId(), http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java index 8c295ce..b9afbed 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java @@ -126,7 +126,7 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur MiniFuture f = (MiniFuture)fut; if (f.node().id().equals(nodeId)) { - f.onResult(new ClusterTopologyCheckedException("Remote node left grid (will retry): " + nodeId)); + f.onNodeLeft(new ClusterTopologyCheckedException("Remote node left grid: " + nodeId), true); return true; } @@ -327,7 +327,7 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur catch (IgniteCheckedException e) { // Fail the whole thing. if (e instanceof ClusterTopologyCheckedException) - fut.onResult((ClusterTopologyCheckedException)e); + fut.onNodeLeft((ClusterTopologyCheckedException)e); else fut.onResult(e); } @@ -413,7 +413,7 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur catch (IgniteCheckedException e) { // Fail the whole thing. if (e instanceof ClusterTopologyCheckedException) - fut.onResult((ClusterTopologyCheckedException)e); + fut.onNodeLeft((ClusterTopologyCheckedException)e); else fut.onResult(e); } @@ -467,7 +467,7 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur catch (IgniteCheckedException e) { // Fail the whole thing. if (e instanceof ClusterTopologyCheckedException) - fut.onResult((ClusterTopologyCheckedException)e); + fut.onNodeLeft((ClusterTopologyCheckedException)e); else fut.onResult(e); } @@ -563,7 +563,15 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur /** * @param e Node failure. */ - void onResult(ClusterTopologyCheckedException e) { + void onNodeLeft(ClusterTopologyCheckedException e) { + onNodeLeft(e, false); + } + + /** + * @param e Node failure. + * @param discoThread {@code True} if executed from discovery thread. + */ + void onNodeLeft(ClusterTopologyCheckedException e, boolean discoThread) { if (log.isDebugEnabled()) log.debug("Remote node left grid while sending or waiting for reply (will ignore): " + this); http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java index ebf1002..acd5017 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java @@ -118,6 +118,7 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa */ public GridDhtTxLocal( GridCacheSharedContext cctx, + AffinityTopologyVersion topVer, UUID nearNodeId, GridCacheVersion nearXidVer, IgniteUuid nearFutId, @@ -157,7 +158,6 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa subjId, taskNameHash); - assert cctx != null; assert nearNodeId != null; assert nearFutId != null; assert nearMiniId != null; @@ -174,6 +174,10 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa assert !F.eq(xidVer, nearXidVer); initResult(); + + assert topVer != null && topVer.topologyVersion() > 0 : topVer; + + topologyVersion(topVer); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java index 534a560..34ba87b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java @@ -283,7 +283,7 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter { continue; if (e.cached().obsolete()) { - GridCacheEntryEx cached = cacheCtx.cache().entryEx(e.key()); + GridCacheEntryEx cached = cacheCtx.cache().entryEx(e.key(), topologyVersion()); e.cached(cached); } @@ -312,7 +312,7 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter { break; } catch (GridCacheEntryRemovedException ignore) { - GridCacheEntryEx cached = cacheCtx.cache().entryEx(e.key()); + GridCacheEntryEx cached = cacheCtx.cache().entryEx(e.key(), topologyVersion()); e.cached(cached); } http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java index 445c70a..0541c8a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java @@ -549,7 +549,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter GridDistributedCacheEntry entry = (GridDistributedCacheEntry)txEntry.cached(); if (entry == null) { - entry = (GridDistributedCacheEntry)cacheCtx.cache().entryEx(txEntry.key()); + entry = (GridDistributedCacheEntry)cacheCtx.cache().entryEx(txEntry.key(), tx.topologyVersion()); txEntry.cached(entry); } @@ -576,7 +576,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter if (log.isDebugEnabled()) log.debug("Got removed entry in future onAllReplies method (will retry): " + txEntry); - entry = (GridDistributedCacheEntry)cacheCtx.cache().entryEx(txEntry.key()); + entry = (GridDistributedCacheEntry)cacheCtx.cache().entryEx(txEntry.key(), tx.topologyVersion()); txEntry.cached(entry); } @@ -817,7 +817,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter } catch (GridCacheEntryRemovedException ignored) { // Retry. - txEntry.cached(cacheCtx.cache().entryEx(txEntry.key())); + txEntry.cached(cacheCtx.cache().entryEx(txEntry.key(), tx.topologyVersion())); } } } @@ -847,7 +847,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter } catch (GridCacheEntryRemovedException ignored) { // Retry. - txEntry.cached(cacheCtx.cache().entryEx(txEntry.key())); + txEntry.cached(cacheCtx.cache().entryEx(txEntry.key(), tx.topologyVersion())); } } } @@ -1317,7 +1317,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter break; } catch (GridCacheEntryRemovedException ignore) { - cached = dht.entryExx(entry.key()); + cached = dht.entryExx(entry.key(), tx.topologyVersion()); entry.cached(cached); } http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java index 343515d..f509e27 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java @@ -140,6 +140,8 @@ public class GridDhtTxRemote extends GridDistributedTxRemoteAdapter { Collections.<IgniteTxKey, IgniteTxEntry>emptyMap(), new ConcurrentLinkedHashMap<IgniteTxKey, IgniteTxEntry>(U.capacity(txSize), 0.75f, 1)); + assert topVer != null && topVer.topologyVersion() > 0 : topVer; + topologyVersion(topVer); } @@ -207,6 +209,8 @@ public class GridDhtTxRemote extends GridDistributedTxRemoteAdapter { Collections.<IgniteTxKey, IgniteTxEntry>emptyMap(), new ConcurrentLinkedHashMap<IgniteTxKey, IgniteTxEntry>(U.capacity(txSize), 0.75f, 1)); + assert topVer != null && topVer.topologyVersion() > 0 : topVer; + topologyVersion(topVer); }
