IGNITE-3477 - Remove waitForRent() in exchange future
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/7e956ed0 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/7e956ed0 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/7e956ed0 Branch: refs/heads/ignite-gg-8.0.3.ea6-clients-test Commit: 7e956ed07c70c6d4de0ccb60a222529ecef3ea9b Parents: e4b05d5 Author: Alexey Goncharuk <[email protected]> Authored: Thu Mar 9 15:09:42 2017 +0300 Committer: Alexey Goncharuk <[email protected]> Committed: Thu Mar 9 15:09:42 2017 +0300 ---------------------------------------------------------------------- .../pagemem/wal/IgniteWriteAheadLogManager.java | 27 ++- .../GridCachePartitionExchangeManager.java | 27 ++- .../cache/IgniteCacheOffheapManager.java | 3 +- .../cache/IgniteCacheOffheapManagerImpl.java | 57 +++-- .../IgniteCacheDatabaseSharedManager.java | 44 +++- .../dht/GridClientPartitionTopology.java | 33 ++- .../distributed/dht/GridDhtCacheEntry.java | 2 +- .../distributed/dht/GridDhtLocalPartition.java | 60 +++--- .../distributed/dht/GridDhtLockFuture.java | 29 ++- .../dht/GridDhtPartitionTopology.java | 22 +- .../dht/GridDhtPartitionTopologyImpl.java | 215 +++++++++---------- .../distributed/dht/GridDhtTxLocalAdapter.java | 2 +- .../GridDhtPartitionDemandMessage.java | 73 +++++-- .../dht/preloader/GridDhtPartitionSupplier.java | 10 +- .../GridDhtPartitionSupplyMessageV2.java | 1 - .../GridDhtPartitionsExchangeFuture.java | 153 +++++++++++-- .../preloader/GridDhtPartitionsFullMessage.java | 120 +++++++++-- .../GridDhtPartitionsSingleMessage.java | 77 ++++++- .../dht/preloader/GridDhtPreloader.java | 106 ++++++--- .../IgniteDhtPartitionCountersMap.java | 64 ++++++ .../IgniteDhtPartitionHistorySuppliersMap.java | 107 +++++++++ .../IgniteDhtPartitionsToReloadMap.java | 88 ++++++++ 22 files changed, 1043 insertions(+), 277 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/ignite/blob/7e956ed0/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/IgniteWriteAheadLogManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/IgniteWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/IgniteWriteAheadLogManager.java index fb7a39b..ac785b6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/IgniteWriteAheadLogManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/IgniteWriteAheadLogManager.java @@ -59,8 +59,9 @@ public interface IgniteWriteAheadLogManager extends GridCacheSharedManager, Igni * the underlying storage. * * @param ptr Optional pointer to sync. If {@code null}, will sync up to the latest record. - * @throws IgniteCheckedException If - * @throws StorageException + * @throws IgniteCheckedException If failed to fsync. + * @throws StorageException If IO exception occurred during the write. If an exception is thrown from this + * method, the WAL will be invalidated and the node will be stopped. */ public void fsync(WALPointer ptr) throws IgniteCheckedException, StorageException; @@ -75,6 +76,22 @@ public interface IgniteWriteAheadLogManager extends GridCacheSharedManager, Igni public WALIterator replay(WALPointer start) throws IgniteCheckedException, StorageException; /** + * Invoke this method to reserve WAL history starting from the provided pointer and prevent its deletion. + * + * @param start WAL pointer. + * @throws IgniteCheckedException If failed to reserve. + */ + public boolean reserve(WALPointer start) throws IgniteCheckedException; + + /** + * Invoke this method to release previously reserved WAL history starting from the provided pointer. + * + * @param start WAL pointer. + * @throws IgniteCheckedException If failed to release.
+ */ + public void release(WALPointer start) throws IgniteCheckedException; + + /** * Gives a hint to WAL manager to clear entries logged before the given pointer. Some entries before the * the given pointer will be kept because there is a configurable WAL history size. Those entries may be used * for partial partition rebalancing. @@ -83,4 +100,10 @@ public interface IgniteWriteAheadLogManager extends GridCacheSharedManager, Igni * @return Number of deleted WAL segments. */ public int truncate(WALPointer ptr); + + /** + * @param ptr Pointer. + * @return True if given pointer is located in reserved segment. + */ + public boolean reserved(WALPointer ptr); } http://git-wip-us.apache.org/repos/asf/ignite/blob/7e956ed0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index 232dc83..695872f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -73,6 +73,8 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.Gri import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloaderAssignments; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtPartitionHistorySuppliersMap; +import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtPartitionsToReloadMap; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; @@ -822,7 +824,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana * @return {@code True} if message was sent, {@code false} if node left grid. */ private boolean sendAllPartitions(Collection<ClusterNode> nodes) { - GridDhtPartitionsFullMessage m = createPartitionsFullMessage(nodes, null, null, true); + GridDhtPartitionsFullMessage m = createPartitionsFullMessage(nodes, null, null, null, null, true); if (log.isDebugEnabled()) log.debug("Sending all partitions [nodeIds=" + U.nodeIds(nodes) + ", msg=" + m + ']'); @@ -854,12 +856,17 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana * @return Message. */ public GridDhtPartitionsFullMessage createPartitionsFullMessage(Collection<ClusterNode> nodes, - final @Nullable GridDhtPartitionExchangeId exchId, + @Nullable final GridDhtPartitionExchangeId exchId, @Nullable GridCacheVersion lastVer, + @Nullable IgniteDhtPartitionHistorySuppliersMap partHistSuppliers, + @Nullable IgniteDhtPartitionsToReloadMap partsToReload, final boolean compress) { final GridDhtPartitionsFullMessage m = new GridDhtPartitionsFullMessage(exchId, lastVer, - exchId != null ? exchId.topologyVersion() : AffinityTopologyVersion.NONE); + exchId != null ? 
exchId.topologyVersion() : AffinityTopologyVersion.NONE, + partHistSuppliers, + partsToReload + ); m.compress(compress); @@ -1239,11 +1246,23 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana top = cacheCtx.topology(); if (top != null) - updated |= top.update(null, entry.getValue(), null) != null; + updated |= top.update(null, entry.getValue(), null, msg.partsToReload(cctx.localNodeId(), cacheId)) != null; } if (!cctx.kernalContext().clientNode() && updated) refreshPartitions(); + + boolean hasMovingParts = false; + + for (GridCacheContext cacheCtx : cctx.cacheContexts()) { + if (!cacheCtx.isLocal() && cacheCtx.started() && cacheCtx.topology().hasMovingPartitions()) { + hasMovingParts = true; + break; + } + } + + if (!hasMovingParts) + cctx.database().releaseHistoryForPreloading(); } else exchangeFuture(msg.exchangeId(), null, null, null).onReceive(node, msg); http://git-wip-us.apache.org/repos/asf/ignite/blob/7e956ed0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java index 4b85abe..d1ab787 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java @@ -82,10 +82,9 @@ public interface IgniteCacheOffheapManager extends GridCacheManager { public Iterable<CacheDataStore> cacheDataStores(); /** - * @param p Partition ID. * @param store Data store. 
*/ - public void destroyCacheDataStore(int p, CacheDataStore store) throws IgniteCheckedException; + public void destroyCacheDataStore(CacheDataStore store) throws IgniteCheckedException; /** * TODO: GG-10884, used on only from initialValue. http://git-wip-us.apache.org/repos/asf/ignite/blob/7e956ed0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java index 05eaf0a..5d1f7b9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java @@ -59,6 +59,7 @@ import org.apache.ignite.internal.util.GridAtomicLong; import org.apache.ignite.internal.util.GridCloseableIteratorAdapter; import org.apache.ignite.internal.util.GridEmptyCloseableIterator; import org.apache.ignite.internal.util.GridSpinBusyLock; +import org.apache.ignite.internal.util.GridStripedLock; import org.apache.ignite.internal.util.GridUnsafe; import org.apache.ignite.internal.util.lang.GridCloseableIterator; import org.apache.ignite.internal.util.lang.GridCursor; @@ -92,9 +93,6 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple protected final ConcurrentMap<Integer, CacheDataStore> partDataStores = new ConcurrentHashMap<>(); /** */ - protected final CacheDataStore rmvdStore = new CacheDataStoreImpl(-1, null, null, null); - - /** */ protected PendingEntriesTree pendingEntries; /** */ @@ -112,6 +110,9 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple /** */ private int updateValSizeThreshold; + /** */ + private GridStripedLock 
partStoreLock = new GridStripedLock(Runtime.getRuntime().availableProcessors()); + /** {@inheritDoc} */ @Override public GridAtomicLong globalRemoveId() { return globalRmvId; @@ -172,7 +173,7 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple super.stop0(cancel, destroy); if (destroy && cctx.affinityNode()) - destroyCacheDataStructures(destroy); + destroyCacheDataStructures(); } /** {@inheritDoc} */ @@ -185,7 +186,7 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple /** * */ - protected void destroyCacheDataStructures(boolean destroy) { + protected void destroyCacheDataStructures() { assert cctx.affinityNode(); try { @@ -196,7 +197,7 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple pendingEntries.destroy(); for (CacheDataStore store : partDataStores.values()) - store.destroy(); + destroyCacheDataStore(store); } catch (IgniteCheckedException e) { throw new IgniteException(e.getMessage(), e); @@ -238,7 +239,7 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple if (cctx.isLocal()) return locCacheDataStore; else { - GridDhtLocalPartition part = cctx.topology().localPartition(p, AffinityTopologyVersion.NONE, false); + GridDhtLocalPartition part = cctx.topology().localPartition(p, AffinityTopologyVersion.NONE, false, true); return part != null ? 
part.dataStore() : null; } @@ -278,7 +279,7 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple /** {@inheritDoc} */ @Override public long entriesCount(int part) { - if (cctx.isLocal()){ + if (cctx.isLocal()) { assert part == 0; return locCacheDataStore.size(); @@ -709,15 +710,20 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple /** {@inheritDoc} */ @Override public final CacheDataStore createCacheDataStore(int p) throws IgniteCheckedException { - CacheDataStore dataStore = null; - CacheDataStore oldStore = null; + CacheDataStore dataStore; + + partStoreLock.lock(p); + + try { + assert !partDataStores.containsKey(p); - do { dataStore = createCacheDataStore0(p); - oldStore = partDataStores.putIfAbsent(p, dataStore); + partDataStores.put(p, dataStore); + } + finally { + partStoreLock.unlock(p); } - while (oldStore != null); return dataStore; } @@ -763,15 +769,32 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple } /** {@inheritDoc} */ - @Override public void destroyCacheDataStore(int p, CacheDataStore store) throws IgniteCheckedException { + @Override public final void destroyCacheDataStore(CacheDataStore store) throws IgniteCheckedException { + int p = store.partId(); + + partStoreLock.lock(p); + try { - partDataStores.remove(p, store); + boolean removed = partDataStores.remove(p, store); + + assert removed; - store.destroy(); + destroyCacheDataStore0(store); } catch (IgniteCheckedException e) { throw new IgniteException(e); } + finally { + partStoreLock.unlock(p); + } + } + + /** + * @param store Cache data store. + * @throws IgniteCheckedException If failed. 
+ */ + protected void destroyCacheDataStore0(CacheDataStore store) throws IgniteCheckedException { + store.destroy(); } /** @@ -1155,7 +1178,7 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple KeyCacheObject key, CacheObject oldVal, CacheObject newVal - ) throws IgniteCheckedException { + ) { // In case we deal with IGFS cache, count updated data if (cctx.cache().isIgfsDataCache() && !cctx.isNear() && http://git-wip-us.apache.org/repos/asf/ignite/blob/7e956ed0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/IgniteCacheDatabaseSharedManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/IgniteCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/IgniteCacheDatabaseSharedManager.java index 95ed3ae..743cd77 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/IgniteCacheDatabaseSharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/IgniteCacheDatabaseSharedManager.java @@ -19,9 +19,10 @@ package org.apache.ignite.internal.processors.cache.database; import java.io.File; import java.util.Collection; +import java.util.Collections; +import java.util.Map; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; -import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.MemoryConfiguration; import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.IgniteInternalFuture; @@ -95,7 +96,7 @@ public class IgniteCacheDatabaseSharedManager extends GridCacheSharedManagerAdap /** * @throws IgniteCheckedException If failed. */ - public void initDataBase() throws IgniteCheckedException{ + public void initDataBase() throws IgniteCheckedException { // No-op. 
} @@ -152,7 +153,7 @@ public class IgniteCacheDatabaseSharedManager extends GridCacheSharedManagerAdap /** * */ - public void unLock(){ + public void unLock() { } @@ -212,7 +213,8 @@ public class IgniteCacheDatabaseSharedManager extends GridCacheSharedManagerAdap * @return Snapshot creation init future or {@code null} if snapshot is not available. * @throws IgniteCheckedException If failed. */ - @Nullable public IgniteInternalFuture startLocalSnapshotOperation(StartSnapshotOperationAckDiscoveryMessage snapshotMsg) + @Nullable public IgniteInternalFuture startLocalSnapshotOperation( + StartSnapshotOperationAckDiscoveryMessage snapshotMsg) throws IgniteCheckedException { return null; } @@ -225,6 +227,40 @@ public class IgniteCacheDatabaseSharedManager extends GridCacheSharedManagerAdap } /** + * Reserve update history for exchange. + * + * @return Reserved update counters per cache and partition. + */ + public Map<Integer, Map<Integer, Long>> reserveHistoryForExchange() { + return Collections.emptyMap(); + } + + /** + * Release reserved update history. + */ + public void releaseHistoryForExchange() { + // No-op + } + + /** + * Reserve update history for preloading. + * @param cacheId Cache ID. + * @param partId Partition Id. + * @param cntr Update counter. + * @return True if successfully reserved. + */ + public boolean reserveHistoryForPreloading(int cacheId, int partId, long cntr) { + return false; + } + + /** + * Release reserved update history. + */ + public void releaseHistoryForPreloading() { + // No-op + } + + /** * @param dbCfg Database configuration. * @return Page memory instance. 
*/ http://git-wip-us.apache.org/repos/asf/ignite/blob/7e956ed0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java index ca71f51..e63379a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java @@ -21,6 +21,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -51,6 +52,7 @@ import org.jetbrains.annotations.Nullable; import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.EVICTED; import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.MOVING; import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.OWNING; +import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.RENTING; /** * Partition topology for node which does not have any local partitions. 
@@ -353,6 +355,12 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { } /** {@inheritDoc} */ + @Nullable @Override public GridDhtLocalPartition localPartition(int p, AffinityTopologyVersion topVer, + boolean create, boolean showRenting) throws GridDhtInvalidPartitionException { + return localPartition(p, topVer, create); + } + + /** {@inheritDoc} */ @Override public GridDhtLocalPartition localPartition(Object key, boolean create) { return localPartition(1, AffinityTopologyVersion.NONE, create); } @@ -550,9 +558,11 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { /** {@inheritDoc} */ @SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"}) - @Nullable @Override public GridDhtPartitionMap2 update(@Nullable GridDhtPartitionExchangeId exchId, - GridDhtPartitionFullMap partMap, - Map<Integer, T2<Long, Long>> cntrMap) { + @Nullable @Override public GridDhtPartitionMap2 update(@Nullable GridDhtPartitionsExchangeFuture exchFut, + GridDhtPartitionFullMap partMap, Map<Integer, T2<Long, Long>> cntrMap, Set<Integer> partsToReload) { + + GridDhtPartitionExchangeId exchId = exchFut != null ? exchFut.exchangeId() : null; + if (log.isDebugEnabled()) log.debug("Updating full partition map [exchId=" + exchId + ", parts=" + fullMapString() + ']'); @@ -910,7 +920,9 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { } /** {@inheritDoc} */ - @Override public void setOwners(int p, Set<UUID> owners, boolean updateSeq) { + @Override public Set<UUID> setOwners(int p, Set<UUID> owners, boolean haveHistory, boolean updateSeq) { + Set<UUID> result = haveHistory ? 
Collections.<UUID>emptySet() : new HashSet<UUID>(); + lock.writeLock().lock(); try { @@ -918,8 +930,15 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { if (!e.getValue().containsKey(p)) continue; - if (e.getValue().get(p) == OWNING && !owners.contains(e.getKey())) - e.getValue().put(p, MOVING); + if (e.getValue().get(p) == OWNING && !owners.contains(e.getKey())) { + if (haveHistory) + e.getValue().put(p, MOVING); + else { + e.getValue().put(p, RENTING); + + result.add(e.getKey()); + } + } else if (owners.contains(e.getKey())) e.getValue().put(p, OWNING); } @@ -932,6 +951,8 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { finally { lock.writeLock().unlock(); } + + return result; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/7e956ed0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java index 5b5cc3b..f1f4376 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java @@ -91,7 +91,7 @@ public class GridDhtCacheEntry extends GridDistributedCacheEntry { // Record this entry with partition. 
int p = cctx.affinity().partition(key); - locPart = ctx.topology().localPartition(p, topVer, true); + locPart = ctx.topology().localPartition(p, topVer, true, true); assert locPart != null : p; } http://git-wip-us.apache.org/repos/asf/ignite/blob/7e956ed0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java index 4418f53..aeb40f8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java @@ -138,13 +138,16 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>, * reservation is released. */ private volatile boolean shouldBeRenting; + /** Set if partition must be re-created and preloaded after eviction. */ + private boolean reload; + /** * @param cctx Context. * @param id Partition ID. * @param entryFactory Entry factory. */ - @SuppressWarnings("ExternalizableWithoutPublicNoArgConstructor") - GridDhtLocalPartition(GridCacheContext cctx, int id, GridCacheMapEntryFactory entryFactory) { + @SuppressWarnings("ExternalizableWithoutPublicNoArgConstructor") GridDhtLocalPartition(GridCacheContext cctx, + int id, GridCacheMapEntryFactory entryFactory) { assert cctx != null; this.id = id; @@ -373,12 +376,7 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>, map.removeEntry(entry); // Attempt to evict. - try { - tryEvict(); - } - catch (NodeStoppingException ignore) { - // No-op. 
- } + tryEvictAsync(false); } /** @@ -514,12 +512,7 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>, if ((reservations & 0xFFFF) == 0 && shouldBeRenting) rent(true); - try { - tryEvict(); - } - catch (NodeStoppingException ignore) { - // No-op. - } + tryEvictAsync(false); break; } @@ -530,7 +523,7 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>, * @param stateToRestore State to restore. */ public void restoreState(GridDhtPartitionState stateToRestore) { - state.set(((long)stateToRestore.ordinal()) << 32); + state.set(((long)stateToRestore.ordinal()) << 32); } /** @@ -632,10 +625,24 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>, } /** + * @return {@code True} if partition should be re-created after it is cleared. + */ + public boolean reload() { + return reload; + } + + /** + * @param value {@code reload} flag value. + */ + public void reload(boolean value) { + reload = value; + } + + /** * @param updateSeq Update sequence. * @return Future to signal that this node is no longer an owner or backup. */ - IgniteInternalFuture<?> rent(boolean updateSeq) { + public IgniteInternalFuture<?> rent(boolean updateSeq) { long reservations = state.get(); int ord = (int)(reservations >> 32); @@ -663,8 +670,6 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>, * @param updateSeq Update sequence. 
*/ void tryEvictAsync(boolean updateSeq) { - assert cctx.kernalContext().state().active(); - long reservations = state.get(); int ord = (int)(reservations >> 32); @@ -713,7 +718,7 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>, * */ private void clearEvicting() { - boolean free; + boolean free; while (true) { int cnt = evictGuard.get(); @@ -804,9 +809,7 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>, */ private void destroyCacheDataStore() { try { - CacheDataStore store = dataStore(); - - cctx.offheap().destroyCacheDataStore(id, store); + cctx.offheap().destroyCacheDataStore(dataStore()); } catch (IgniteCheckedException e) { log.error("Unable to destroy cache data store on partition eviction [id=" + id + "]", e); @@ -817,12 +820,7 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>, * */ void onUnlock() { - try { - tryEvict(); - } - catch (NodeStoppingException ignore) { - // No-op. - } + tryEvictAsync(false); } /** @@ -949,7 +947,11 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>, try { CacheDataRow row = it0.next(); - GridDhtCacheEntry cached = (GridDhtCacheEntry)cctx.cache().entryEx(row.key()); + GridDhtCacheEntry cached = (GridDhtCacheEntry) getEntry(row.key()); + + if (cached == null || cached.obsolete()) + cached = (GridDhtCacheEntry) putEntryIfObsoleteOrAbsent( + cctx.affinity().affinityTopologyVersion(), row.key(), null, true, false); if (cached.clearInternal(clearVer, extras)) { if (rec) { http://git-wip-us.apache.org/repos/asf/ignite/blob/7e956ed0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java index b57ca57..7b5dee3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java @@ -1262,17 +1262,24 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean> try { GridCacheEntryEx entry = cache0.entryEx(info.key(), topVer); - if (entry.initialValue(info.value(), - info.version(), - info.ttl(), - info.expireTime(), - true, topVer, - replicate ? DR_PRELOAD : DR_NONE, - false)) { - if (rec && !entry.isInternal()) - cctx.events().addEvent(entry.partition(), entry.key(), cctx.localNodeId(), - (IgniteUuid)null, null, EVT_CACHE_REBALANCE_OBJECT_LOADED, info.value(), true, null, - false, null, null, null, false); + cctx.shared().database().checkpointReadLock(); + + try { + if (entry.initialValue(info.value(), + info.version(), + info.ttl(), + info.expireTime(), + true, topVer, + replicate ? 
DR_PRELOAD : DR_NONE, + false)) { + if (rec && !entry.isInternal()) + cctx.events().addEvent(entry.partition(), entry.key(), cctx.localNodeId(), + (IgniteUuid)null, null, EVT_CACHE_REBALANCE_OBJECT_LOADED, info.value(), true, null, + false, null, null, null, false); + } + } + finally { + cctx.shared().database().checkpointReadUnlock(); } } catch (IgniteCheckedException e) { http://git-wip-us.apache.org/repos/asf/ignite/blob/7e956ed0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java index ac3e2c8..37c3af9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java @@ -128,6 +128,18 @@ public interface GridDhtPartitionTopology { throws GridDhtInvalidPartitionException; /** + * @param topVer Topology version at the time of creation. + * @param p Partition ID. + * @param create If {@code true}, then partition will be created if it's not there. + * @return Local partition. + * @throws GridDhtInvalidPartitionException If partition is evicted or absent and + * does not belong to this node. + */ + @Nullable public GridDhtLocalPartition localPartition(int p, AffinityTopologyVersion topVer, boolean create, + boolean showRenting) + throws GridDhtInvalidPartitionException; + + /** * @param parts Partitions to release (should be reserved before). */ public void releasePartitions(int... 
parts); @@ -213,14 +225,15 @@ public interface GridDhtPartitionTopology { public void onRemoved(GridDhtCacheEntry e); /** - * @param exchId Exchange ID. + * @param exchFut Exchange future. * @param partMap Update partition map. * @param cntrMap Partition update counters. * @return Local partition map if there were evictions or {@code null} otherwise. */ - public GridDhtPartitionMap2 update(@Nullable GridDhtPartitionExchangeId exchId, + public GridDhtPartitionMap2 update(@Nullable GridDhtPartitionsExchangeFuture exchFut, GridDhtPartitionFullMap partMap, - @Nullable Map<Integer, T2<Long, Long>> cntrMap); + @Nullable Map<Integer, T2<Long, Long>> cntrMap, + Set<Integer> partsToReload); /** * @param exchId Exchange ID. @@ -301,6 +314,7 @@ public interface GridDhtPartitionTopology { * @param p Partition ID. * @param updateSeq If should increment sequence when updated. * @param owners Set of new owners. + * @return Set of node IDs that should reload partitions. */ - public void setOwners(int p, Set<UUID> owners, boolean updateSeq); + public Set<UUID> setOwners(int p, Set<UUID> owners, boolean haveHistory, boolean updateSeq); } http://git-wip-us.apache.org/repos/asf/ignite/blob/7e956ed0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java index 66e5be3..553aa2a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java @@ -32,12 +32,10 @@ import 
java.util.concurrent.atomic.AtomicReferenceArray; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; -import org.apache.ignite.IgniteSystemProperties; import org.apache.ignite.cache.PartitionLossPolicy; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.events.DiscoveryEvent; import org.apache.ignite.events.EventType; -import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException; import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.processors.affinity.AffinityAssignment; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; @@ -59,7 +57,6 @@ import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.U; import org.jetbrains.annotations.Nullable; -import static org.apache.ignite.IgniteSystemProperties.IGNITE_THREAD_DUMP_ON_EXCHANGE_TIMEOUT; import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST; import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.EVICTED; import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.LOST; @@ -70,8 +67,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh /** * Partition topology. */ -@GridToStringExclude -class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { +@GridToStringExclude class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { /** If true, then check consistency. */ private static final boolean CONSISTENCY_CHECK = false; @@ -189,93 +185,6 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { return map == null ? "null" : FULL_MAP_DEBUG ? map.toFullString() : map.toString(); } - /** - * Waits for renting partitions. - * - * @return {@code True} if mapping was changed. 
- * @throws IgniteCheckedException If failed. - */ - private boolean waitForRent() throws IgniteCheckedException { - final long longOpDumpTimeout = - IgniteSystemProperties.getLong(IgniteSystemProperties.IGNITE_LONG_OPERATIONS_DUMP_TIMEOUT, 60_000); - - int dumpCnt = 0; - - GridDhtLocalPartition part; - - for (int i = 0; i < locParts.length(); i++) { - part = locParts.get(i); - - if (part == null) - continue; - - GridDhtPartitionState state = part.state(); - - if (state == RENTING || state == EVICTED) { - if (log.isDebugEnabled()) - log.debug("Waiting for renting partition: " + part); - - part.tryEvictAsync(false); - - // Wait for partition to empty out. - if (longOpDumpTimeout > 0) { - while (true) { - try { - part.rent(true).get(longOpDumpTimeout); - - break; - } - catch (IgniteFutureTimeoutCheckedException e) { - if (dumpCnt++ < GridDhtPartitionsExchangeFuture.DUMP_PENDING_OBJECTS_THRESHOLD) { - U.warn(log, "Failed to wait for partition eviction [" + - "topVer=" + topVer + - ", cache=" + cctx.name() + - ", part=" + part.id() + - ", partState=" + part.state() + - ", size=" + part.size() + - ", reservations=" + part.reservations() + - ", grpReservations=" + part.groupReserved() + - ", node=" + cctx.localNodeId() + "]"); - - if (IgniteSystemProperties.getBoolean(IGNITE_THREAD_DUMP_ON_EXCHANGE_TIMEOUT, false)) - U.dumpThreads(log); - } - } - } - } - else - part.rent(true).get(); - - if (log.isDebugEnabled()) - log.debug("Finished waiting for renting partition: " + part); - } - } - - // Remove evicted partition. 
- lock.writeLock().lock(); - - try { - boolean changed = false; - - for (int i = 0; i < locParts.length(); i++) { - part = locParts.get(i); - - if (part == null) - continue; - - if (part.state() == EVICTED) { - locParts.set(i, null); - changed = true; - } - } - - return changed; - } - finally { - lock.writeLock().unlock(); - } - } - /** {@inheritDoc} */ @SuppressWarnings({"LockAcquiredButNotSafelyReleased"}) @Override public void readLock() { @@ -422,7 +331,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { // If preloader is disabled, then we simply clear out // the partitions this node is not responsible for. for (int p = 0; p < num; p++) { - GridDhtLocalPartition locPart = localPartition(p, topVer, false, false); + GridDhtLocalPartition locPart = localPartition0(p, topVer, false, true, false); boolean belongs = localNode(p, aff); @@ -498,9 +407,6 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { && !cctx.kernalContext().clientNode() ); - // Wait for rent outside of checkpoint lock. - waitForRent(); - ClusterNode loc = cctx.localNode(); cctx.shared().database().checkpointReadLock(); @@ -580,16 +486,13 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { finally { cctx.shared().database().checkpointReadUnlock(); } - - // Wait for evictions. 
- waitForRent(); } /** {@inheritDoc} */ @Override public boolean afterExchange(GridDhtPartitionsExchangeFuture exchFut) throws IgniteCheckedException { treatAllPartAsLoc = false; - boolean changed = waitForRent(); + boolean changed = false; int num = cctx.affinity().partitions(); @@ -616,7 +519,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { long updateSeq = this.updateSeq.incrementAndGet(); for (int p = 0; p < num; p++) { - GridDhtLocalPartition locPart = localPartition(p, topVer, false, false); + GridDhtLocalPartition locPart = localPartition0(p, topVer, false, false, false); if (cctx.affinity().partitionLocalNode(p, topVer)) { // This partition will be created during next topology event, @@ -697,7 +600,13 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { @Nullable @Override public GridDhtLocalPartition localPartition(int p, AffinityTopologyVersion topVer, boolean create) throws GridDhtInvalidPartitionException { - return localPartition(p, topVer, create, true); + return localPartition0(p, topVer, create, false, true); + } + + /** {@inheritDoc} */ + @Nullable @Override public GridDhtLocalPartition localPartition(int p, AffinityTopologyVersion topVer, + boolean create, boolean showRenting) throws GridDhtInvalidPartitionException { + return localPartition0(p, topVer, create, showRenting, true); } /** @@ -734,9 +643,10 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { * @return Local partition. */ @SuppressWarnings("TooBroadScope") - private GridDhtLocalPartition localPartition(int p, + private GridDhtLocalPartition localPartition0(int p, AffinityTopologyVersion topVer, boolean create, + boolean showRenting, boolean updateSeq) { GridDhtLocalPartition loc; @@ -744,7 +654,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { GridDhtPartitionState state = loc != null ? 
loc.state() : null; - if (loc != null && state != EVICTED && (state != RENTING || !cctx.allowFastEviction())) + if (loc != null && state != EVICTED && (state != RENTING || showRenting)) return loc; if (!create) @@ -769,8 +679,9 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { "(often may be caused by inconsistent 'key.hashCode()' implementation) " + "[part=" + p + ", topVer=" + topVer + ", this.topVer=" + this.topVer + ']'); } - else if (loc != null && state == RENTING && cctx.allowFastEviction()) - throw new GridDhtInvalidPartitionException(p, "Adding entry to partition that is concurrently evicted."); + else if (loc != null && state == RENTING && !showRenting) + throw new GridDhtInvalidPartitionException(p, "Adding entry to partition that is concurrently evicted " + + "[part=" + p + ", shouldBeMoving=" + loc.reload() + "]"); if (loc == null) { if (!treatAllPartAsLoc && !belongs) @@ -1062,10 +973,13 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { /** {@inheritDoc} */ @SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"}) @Nullable @Override public GridDhtPartitionMap2 update( - @Nullable GridDhtPartitionExchangeId exchId, + @Nullable GridDhtPartitionsExchangeFuture exchFut, GridDhtPartitionFullMap partMap, - @Nullable Map<Integer, T2<Long, Long>> cntrMap + @Nullable Map<Integer, T2<Long, Long>> cntrMap, + Set<Integer> partsToReload ) { + GridDhtPartitionExchangeId exchId = exchFut != null ? 
exchFut.exchangeId() : null; + if (log.isDebugEnabled()) log.debug("Updating full partition map [exchId=" + exchId + ", parts=" + fullMapString() + ']'); @@ -1117,8 +1031,6 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { return null; } - long updateSeq = this.updateSeq.incrementAndGet(); - if (exchId != null) lastExchangeId = exchId; @@ -1186,11 +1098,32 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { int p = e.getKey(); GridDhtPartitionState state = e.getValue(); - if (state == MOVING) { + if (state == OWNING) { GridDhtLocalPartition locPart = locParts.get(p); assert locPart != null; + if (cntrMap != null) { + T2<Long, Long> cntr = cntrMap.get(p); + + if (cntr != null && cntr.get2() > locPart.updateCounter()) + locPart.updateCounter(cntr.get2()); + } + + if (locPart.state() == MOVING) { + boolean success = locPart.own(); + + assert success : locPart; + + changed |= success; + } + } + else if (state == MOVING) { + GridDhtLocalPartition locPart = locParts.get(p); + + if (locPart == null || locPart.state() == EVICTED) + locPart = createPartition(p); + if (locPart.state() == OWNING) { locPart.moving(); @@ -1204,9 +1137,30 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { locPart.updateCounter(cntr.get2()); } } + else if (state == RENTING && partsToReload.contains(p)) { + GridDhtLocalPartition locPart = locParts.get(p); + + if (locPart == null || locPart.state() == EVICTED) { + createPartition(p); + + changed = true; + } + else if (locPart.state() == OWNING || locPart.state() == MOVING) { + locPart.reload(true); + + locPart.rent(false); + + changed = true; + } + else { + locPart.reload(true); + } + } } } + long updateSeq = this.updateSeq.incrementAndGet(); + if (!affVer.equals(AffinityTopologyVersion.NONE) && affVer.compareTo(topVer) >= 0) { List<List<ClusterNode>> aff = cctx.affinity().assignments(topVer); @@ -1536,24 +1490,42 @@ class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology { } /** {@inheritDoc} */ - @Override public void setOwners(int p, Set<UUID> owners, boolean updateSeq) { + @Override public Set<UUID> setOwners(int p, Set<UUID> owners, boolean haveHistory, boolean updateSeq) { + Set<UUID> result = haveHistory ? Collections.<UUID>emptySet() : new HashSet<UUID>(); + lock.writeLock().lock(); try { - GridDhtLocalPartition locPart = locParts.get(p); if (locPart != null) { - if (locPart.state() == OWNING && !owners.contains(cctx.localNodeId())) - locPart.moving(); + if (locPart.state() == OWNING && !owners.contains(cctx.localNodeId())) { + if (haveHistory) + locPart.moving(); + else { + locPart.rent(false); + + locPart.reload(true); + + result.add(cctx.localNodeId()); + } + + } } for (Map.Entry<UUID, GridDhtPartitionMap2> e : node2part.entrySet()) { if (!e.getValue().containsKey(p)) continue; - if (e.getValue().get(p) == OWNING && !owners.contains(e.getKey())) - e.getValue().put(p, MOVING); + if (e.getValue().get(p) == OWNING && !owners.contains(e.getKey())) { + if (haveHistory) + e.getValue().put(p, MOVING); + else { + e.getValue().put(p, RENTING); + + result.add(e.getKey()); + } + } } if (updateSeq) @@ -1562,6 +1534,8 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { finally { lock.writeLock().unlock(); } + + return result; } /** @@ -1629,6 +1603,8 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { // If all affinity nodes are owners, then evict partition from local node. 
if (nodeIds.containsAll(F.nodeIds(affNodes))) { + part.reload(false); + part.rent(false); updateSeq = updateLocal(part.id(), part.state(), updateSeq); @@ -1654,6 +1630,8 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { ClusterNode n = sorted.get(i); if (locId.equals(n.id())) { + part.reload(false); + part.rent(false); updateSeq = updateLocal(part.id(), part.state(), updateSeq); @@ -1833,6 +1811,9 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { long seq = updateSeq ? this.updateSeq.incrementAndGet() : this.updateSeq.get(); + if (part.reload()) + part = createPartition(part.id()); + updateLocal(part.id(), part.state(), seq); consistencyCheck(); http://git-wip-us.apache.org/repos/asf/ignite/blob/7e956ed0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java index debf8b6..5169f27 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java @@ -147,7 +147,7 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter { storeEnabled, onePhaseCommit, txSize, - subjId, + subjId, taskNameHash ); http://git-wip-us.apache.org/repos/asf/ignite/blob/7e956ed0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java ---------------------------------------------------------------------- diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java index c9dcaaf..377c274 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java @@ -50,7 +50,11 @@ public class GridDhtPartitionDemandMessage extends GridCacheMessage { @GridDirectCollection(int.class) private Collection<Integer> parts; - /** Partition. */ + /** Partitions that must be restored from history. */ + @GridDirectCollection(int.class) + private Collection<Integer> historicalParts; + + /** Partition counters. */ @GridDirectMap(keyType = int.class, valueType = long.class) private Map<Integer, Long> partsCntrs; @@ -85,7 +89,8 @@ public class GridDhtPartitionDemandMessage extends GridCacheMessage { * @param cp Message to copy from. * @param parts Partitions. */ - GridDhtPartitionDemandMessage(GridDhtPartitionDemandMessage cp, Collection<Integer> parts, Map<Integer, Long> partsCntrs) { + GridDhtPartitionDemandMessage(GridDhtPartitionDemandMessage cp, Collection<Integer> parts, + Map<Integer, Long> partsCntrs) { cacheId = cp.cacheId; updateSeq = cp.updateSeq; topic = cp.topic; @@ -96,6 +101,9 @@ public class GridDhtPartitionDemandMessage extends GridCacheMessage { // Create a copy of passed in collection since it can be modified when this message is being sent. this.parts = new HashSet<>(parts); this.partsCntrs = partsCntrs; + + if (cp.historicalParts != null) + historicalParts = new HashSet<>(cp.historicalParts); } /** @@ -108,13 +116,19 @@ public class GridDhtPartitionDemandMessage extends GridCacheMessage { /** * @param p Partition. 
*/ - void addPartition(int p) { + void addPartition(int p, boolean historical) { if (parts == null) parts = new HashSet<>(); parts.add(p); - } + if (historical) { + if (historicalParts == null) + historicalParts = new HashSet<>(); + + historicalParts.add(p); + } + } /** * @return Partition. @@ -124,6 +138,17 @@ public class GridDhtPartitionDemandMessage extends GridCacheMessage { } /** + * @param p Partition to check. + * @return {@code True} if historical. + */ + boolean isHistorical(int p) { + if (historicalParts == null) + return false; + + return historicalParts.contains(p); + } + + /** * @param updateSeq Update sequence. */ void updateSequence(long updateSeq) { @@ -232,42 +257,48 @@ public class GridDhtPartitionDemandMessage extends GridCacheMessage { switch (writer.state()) { case 3: - if (!writer.writeCollection("parts", parts, MessageCollectionItemType.INT)) + if (!writer.writeCollection("historicalParts", historicalParts, MessageCollectionItemType.INT)) return false; writer.incrementState(); case 4: - if (!writer.writeMap("partsCntrs", partsCntrs, MessageCollectionItemType.INT, MessageCollectionItemType.LONG)) + if (!writer.writeCollection("parts", parts, MessageCollectionItemType.INT)) return false; writer.incrementState(); case 5: - if (!writer.writeLong("timeout", timeout)) + if (!writer.writeMap("partsCntrs", partsCntrs, MessageCollectionItemType.INT, MessageCollectionItemType.LONG)) return false; writer.incrementState(); case 6: - if (!writer.writeMessage("topVer", topVer)) + if (!writer.writeLong("timeout", timeout)) return false; writer.incrementState(); case 7: - if (!writer.writeByteArray("topicBytes", topicBytes)) + if (!writer.writeMessage("topVer", topVer)) return false; writer.incrementState(); case 8: - if (!writer.writeLong("updateSeq", updateSeq)) + if (!writer.writeByteArray("topicBytes", topicBytes)) return false; writer.incrementState(); case 9: + if (!writer.writeLong("updateSeq", updateSeq)) + return false; + + writer.incrementState(); 
+ + case 10: if (!writer.writeInt("workerId", workerId)) return false; @@ -290,7 +321,7 @@ public class GridDhtPartitionDemandMessage extends GridCacheMessage { switch (reader.state()) { case 3: - parts = reader.readCollection("parts", MessageCollectionItemType.INT); + historicalParts = reader.readCollection("historicalParts", MessageCollectionItemType.INT); if (!reader.isLastRead()) return false; @@ -298,7 +329,7 @@ public class GridDhtPartitionDemandMessage extends GridCacheMessage { reader.incrementState(); case 4: - partsCntrs = reader.readMap("partsCntrs", MessageCollectionItemType.INT, MessageCollectionItemType.LONG, false); + parts = reader.readCollection("parts", MessageCollectionItemType.INT); if (!reader.isLastRead()) return false; @@ -306,7 +337,7 @@ public class GridDhtPartitionDemandMessage extends GridCacheMessage { reader.incrementState(); case 5: - timeout = reader.readLong("timeout"); + partsCntrs = reader.readMap("partsCntrs", MessageCollectionItemType.INT, MessageCollectionItemType.LONG, false); if (!reader.isLastRead()) return false; @@ -314,7 +345,7 @@ public class GridDhtPartitionDemandMessage extends GridCacheMessage { reader.incrementState(); case 6: - topVer = reader.readMessage("topVer"); + timeout = reader.readLong("timeout"); if (!reader.isLastRead()) return false; @@ -322,7 +353,7 @@ public class GridDhtPartitionDemandMessage extends GridCacheMessage { reader.incrementState(); case 7: - topicBytes = reader.readByteArray("topicBytes"); + topVer = reader.readMessage("topVer"); if (!reader.isLastRead()) return false; @@ -330,7 +361,7 @@ public class GridDhtPartitionDemandMessage extends GridCacheMessage { reader.incrementState(); case 8: - updateSeq = reader.readLong("updateSeq"); + topicBytes = reader.readByteArray("topicBytes"); if (!reader.isLastRead()) return false; @@ -338,6 +369,14 @@ public class GridDhtPartitionDemandMessage extends GridCacheMessage { reader.incrementState(); case 9: + updateSeq = reader.readLong("updateSeq"); + + 
if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 10: workerId = reader.readInt("workerId"); if (!reader.isLastRead()) @@ -357,7 +396,7 @@ public class GridDhtPartitionDemandMessage extends GridCacheMessage { /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 10; + return 11; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/7e956ed0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java index b80ad04..c04a7b5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java @@ -278,10 +278,16 @@ class GridDhtPartitionSupplier { IgniteRebalanceIterator iter; if (sctx == null || sctx.entryIt == null) { - iter = cctx.offheap().rebalanceIterator(part, d.topologyVersion(), d.partitionCounter(part)); + iter = cctx.offheap().rebalanceIterator(part, d.topologyVersion(), + d.isHistorical(part) ? 
d.partitionCounter(part) : null); + + if (!iter.historical()) { + assert !cctx.shared().database().persistenceEnabled() || !d.isHistorical(part); - if (!iter.historical()) s.clean(part); + } + else + assert cctx.shared().database().persistenceEnabled() && d.isHistorical(part); } else iter = (IgniteRebalanceIterator)sctx.entryIt; http://git-wip-us.apache.org/repos/asf/ignite/blob/7e956ed0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessageV2.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessageV2.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessageV2.java index f4c624d..5cfd6e5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessageV2.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessageV2.java @@ -235,7 +235,6 @@ public class GridDhtPartitionSupplyMessageV2 extends GridCacheMessage implements void addEntry0(int p, GridCacheEntryInfo info, GridCacheContext ctx) throws IgniteCheckedException { assert info != null; assert (info.key() != null || info.keyBytes() != null); - assert info.value() != null; // Need to call this method to initialize info properly. 
marshalInfo(info, ctx); http://git-wip-us.apache.org/repos/asf/ignite/blob/7e956ed0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index 4c179e6..6cba4f4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -218,11 +218,20 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT private boolean exchangeOnChangeGlobalState; /** */ - private final ConcurrentMap<UUID, GridDhtPartitionsAbstractMessage> msgs = new ConcurrentHashMap8<>(); + private ConcurrentMap<UUID, GridDhtPartitionsSingleMessage> msgs = new ConcurrentHashMap8<>(); + + /** */ + private volatile IgniteDhtPartitionHistorySuppliersMap partHistSuppliers = new IgniteDhtPartitionHistorySuppliersMap(); /** Forced Rebalance future. */ private GridFutureAdapter<Boolean> forcedRebFut; + /** */ + private volatile Map<Integer, Map<Integer, Long>> partHistReserved; + + /** */ + private volatile IgniteDhtPartitionsToReloadMap partsToReload = new IgniteDhtPartitionsToReloadMap(); + /** * Dummy future created to trigger reassignments if partition * topology changed while preloading. @@ -368,6 +377,15 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT } /** + * @param cacheId Cache ID. + * @param partId Partition ID. + * @return ID of history supplier node or null if it doesn't exist. 
+ */ + @Nullable public UUID partitionHistorySupplier(int cacheId, int partId) { + return partHistSuppliers.getSupplier(cacheId, partId); + } + + /** * @param cacheId Cache ID to check. * @param topVer Topology version. * @return {@code True} if cache was added during this exchange. @@ -509,7 +527,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT if (discoEvt.type() == EVT_DISCOVERY_CUSTOM_EVT) { DiscoveryCustomMessage msg = ((DiscoveryCustomEvent)discoEvt).customMessage(); - if (msg instanceof DynamicCacheChangeBatch){ + if (msg instanceof DynamicCacheChangeBatch) { assert !F.isEmpty(reqs); exchange = onCacheChangeRequest(crdNode); @@ -678,7 +696,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT exchId.topologyVersion().equals(cacheCtx.startTopologyVersion()); if (updateTop && clientTop != null) - cacheCtx.topology().update(exchId, clientTop.partitionMap(true), clientTop.updateCounters(false)); + cacheCtx.topology().update(this, clientTop.partitionMap(true), clientTop.updateCounters(false), Collections.<Integer>emptySet()); } top.updateTopologyVersion(exchId, this, updSeq, stopping(cacheCtx.cacheId())); @@ -798,9 +816,10 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT if (updateTop) { for (GridClientPartitionTopology top : cctx.exchange().clientTopologies()) { if (top.cacheId() == cacheCtx.cacheId()) { - cacheCtx.topology().update(exchId, + cacheCtx.topology().update(this, top.partitionMap(true), - top.updateCounters(false)); + top.updateCounters(false), + Collections.<Integer>emptySet()); break; } @@ -845,11 +864,15 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT cacheCtx.preloader().onTopologyChanged(this); } + cctx.database().releaseHistoryForPreloading(); + + // To correctly rebalance when persistence is enabled, it is necessary to reserve history within exchange. 
+ partHistReserved = cctx.database().reserveHistoryForExchange(); + waitPartitionRelease(); boolean topChanged = discoEvt.type() != EVT_DISCOVERY_CUSTOM_EVT || affChangeMsg != null; - //todo check for (GridCacheContext cacheCtx : cctx.cacheContexts()) { if (cacheCtx.isLocal() || stopping(cacheCtx.cacheId())) continue; @@ -1107,6 +1130,11 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT GridDhtPartitionsSingleMessage m = cctx.exchange().createPartitionsSingleMessage( node, exchangeId(), clientOnlyExchange, true); + Map<Integer, Map<Integer, Long>> partHistReserved0 = partHistReserved; + + if (partHistReserved0 != null) + m.partitionHistoryCounters(partHistReserved0); + if (exchangeOnChangeGlobalState && changeGlobalStateE != null) m.setException(changeGlobalStateE); @@ -1133,6 +1161,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT nodes, exchangeId(), last != null ? last : cctx.versions().last(), + partHistSuppliers, + partsToReload, compress); if (exchangeOnChangeGlobalState && !F.isEmpty(changeGlobalStateExceptions)) @@ -1252,6 +1282,22 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT if (exchangeOnChangeGlobalState && err == null) cctx.kernalContext().state().onExchangeDone(); + Map<T2<Integer, Integer>, Long> localReserved = partHistSuppliers.getReservations(cctx.localNodeId()); + + if (localReserved != null) { + for (Map.Entry<T2<Integer, Integer>, Long> e : localReserved.entrySet()) { + boolean success = cctx.database().reserveHistoryForPreloading( + e.getKey().get1(), e.getKey().get2(), e.getValue()); + + if (!success) { + // TODO: how to handle? 
+ err = new IgniteCheckedException("Could not reserve history"); + } + } + } + + cctx.database().releaseHistoryForExchange(); + if (super.onDone(res, err) && realExchange) { if (log.isDebugEnabled()) log.debug("Completed partition exchange [localNode=" + cctx.localNodeId() + ", exchange= " + this + @@ -1523,8 +1569,9 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT */ private void assignPartitionStates(GridDhtPartitionTopology top) { Map<Integer, CounterWithNodes> maxCntrs = new HashMap<>(); + Map<Integer, Long> minCntrs = new HashMap<>(); - for (Map.Entry<UUID, GridDhtPartitionsAbstractMessage> e : msgs.entrySet()) { + for (Map.Entry<UUID, GridDhtPartitionsSingleMessage> e : msgs.entrySet()) { assert e.getValue().partitionUpdateCounters(top.cacheId()) != null; for (Map.Entry<Integer, T2<Long, Long>> e0 : e.getValue().partitionUpdateCounters(top.cacheId()).entrySet()) { @@ -1534,12 +1581,20 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT GridDhtPartitionState state = top.partitionState(uuid, p); - if (state != GridDhtPartitionState.OWNING) + if (state != GridDhtPartitionState.OWNING && state != GridDhtPartitionState.MOVING) continue; - Long cntr = e0.getValue().get1(); + Long cntr = state == GridDhtPartitionState.MOVING ? e0.getValue().get1() : e0.getValue().get2(); if (cntr == null) + cntr = 0L; + + Long minCntr = minCntrs.get(p); + + if (minCntr == null || minCntr > cntr) + minCntrs.put(p, cntr); + + if (state != GridDhtPartitionState.OWNING) continue; CounterWithNodes maxCntr = maxCntrs.get(p); @@ -1555,29 +1610,86 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT for (GridDhtLocalPartition part : top.currentLocalPartitions()) { GridDhtPartitionState state = top.partitionState(cctx.localNodeId(), part.id()); + if (state != GridDhtPartitionState.OWNING && state != GridDhtPartitionState.MOVING) + continue; + + long cntr = state == GridDhtPartitionState.MOVING ? 
part.initialUpdateCounter() : part.updateCounter(); + + Long minCntr = minCntrs.get(part.id()); + + if (minCntr == null || minCntr > cntr) + minCntrs.put(part.id(), cntr); + if (state != GridDhtPartitionState.OWNING) continue; CounterWithNodes maxCntr = maxCntrs.get(part.id()); - if (maxCntr == null || part.initialUpdateCounter() > maxCntr.cnt) - maxCntrs.put(part.id(), new CounterWithNodes(part.updateCounter(), cctx.localNodeId())); - else if (part.initialUpdateCounter() == maxCntr.cnt) + if (maxCntr == null || cntr > maxCntr.cnt) + maxCntrs.put(part.id(), new CounterWithNodes(cntr, cctx.localNodeId())); + else if (cntr == maxCntr.cnt) maxCntr.nodes.add(cctx.localNodeId()); } int entryLeft = maxCntrs.size(); + Map<Integer, Map<Integer, Long>> partHistReserved0 = partHistReserved; + + Map<Integer, Long> localReserved = partHistReserved0 != null ? partHistReserved0.get(top.cacheId()) : null; + + Set<Integer> haveHistory = new HashSet<>(); + + for (Map.Entry<Integer, Long> e : minCntrs.entrySet()) { + int p = e.getKey(); + long minCntr = e.getValue(); + + CounterWithNodes maxCntrObj = maxCntrs.get(p); + + long maxCntr = maxCntrObj != null ? maxCntrObj.cnt : 0; + + // If minimal counter is zero, do clean preloading. 
+ if (minCntr == 0 || minCntr == maxCntr) + continue; + + if (localReserved != null) { + Long localCntr = localReserved.get(p); + + if (localCntr != null && localCntr <= minCntr && + maxCntrObj.nodes.contains(cctx.localNodeId())) { + partHistSuppliers.put(cctx.localNodeId(), top.cacheId(), p, minCntr); + + haveHistory.add(p); + + continue; + } + } + + for (Map.Entry<UUID, GridDhtPartitionsSingleMessage> e0 : msgs.entrySet()) { + Long histCntr = e0.getValue().partitionHistoryCounters(top.cacheId()).get(p); + + if (histCntr != null && histCntr <= minCntr && maxCntrObj.nodes.contains(e0.getKey())) { + partHistSuppliers.put(e0.getKey(), top.cacheId(), p, minCntr); + + haveHistory.add(p); + + break; + } + } + } + for (Map.Entry<Integer, CounterWithNodes> e : maxCntrs.entrySet()) { int p = e.getKey(); long maxCntr = e.getValue().cnt; - entryLeft --; + entryLeft--; if (entryLeft != 0 && maxCntr == 0) continue; - top.setOwners(p, e.getValue().nodes, entryLeft == 0); + Set<UUID> nodesToReload = top.setOwners(p, e.getValue().nodes, haveHistory.contains(p), entryLeft == 0); + + for (UUID nodeId : nodesToReload) + partsToReload.put(nodeId, top.cacheId(), p); } } @@ -1618,6 +1730,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT try { assert crd.isLocal(); + assert partHistSuppliers.isEmpty(); + if (!crd.equals(cctx.discovery().serverNodes(topologyVersion()).get(0))) { for (GridCacheContext cacheCtx : cctx.cacheContexts()) { if (!cacheCtx.isLocal()) @@ -1818,6 +1932,10 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT private void updatePartitionFullMap(GridDhtPartitionsFullMessage msg) { cctx.versions().onExchange(msg.lastVersion().order()); + assert partHistSuppliers.isEmpty(); + + partHistSuppliers.putAll(msg.partitionHistorySuppliers()); + for (Map.Entry<Integer, GridDhtPartitionFullMap> entry : msg.partitions().entrySet()) { Integer cacheId = entry.getKey(); @@ -1826,12 +1944,13 @@ public class 
GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT GridCacheContext cacheCtx = cctx.cacheContext(cacheId); if (cacheCtx != null) - cacheCtx.topology().update(exchId, entry.getValue(), cntrMap); + cacheCtx.topology().update(this, entry.getValue(), cntrMap, + msg.partsToReload(cctx.localNodeId(), cacheId)); else { ClusterNode oldest = cctx.discovery().oldestAliveCacheServerNode(AffinityTopologyVersion.NONE); if (oldest != null && oldest.isLocal()) - cctx.exchange().clientTopology(cacheId, this).update(exchId, entry.getValue(), cntrMap); + cctx.exchange().clientTopology(cacheId, this).update(this, entry.getValue(), cntrMap, Collections.<Integer>emptySet()); } } } @@ -2014,7 +2133,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT } if (crd0.isLocal()) { - if (exchangeOnChangeGlobalState && changeGlobalStateE !=null) + if (exchangeOnChangeGlobalState && changeGlobalStateE != null) changeGlobalStateExceptions.put(crd0.id(), changeGlobalStateE); if (allReceived) {
