http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysResponse.java index c4c57a7..977e9ba 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysResponse.java @@ -29,7 +29,7 @@ import org.apache.ignite.internal.GridDirectTransient; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheDeployable; import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo; -import org.apache.ignite.internal.processors.cache.GridCacheMessage; +import org.apache.ignite.internal.processors.cache.GridCacheIdMessage; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.util.tostring.GridToStringInclude; @@ -43,7 +43,7 @@ import org.apache.ignite.plugin.extensions.communication.MessageWriter; /** * Force keys response. Contains absent keys. */ -public class GridDhtForceKeysResponse extends GridCacheMessage implements GridCacheDeployable { +public class GridDhtForceKeysResponse extends GridCacheIdMessage implements GridCacheDeployable { /** */ private static final long serialVersionUID = 0L; @@ -168,7 +168,7 @@ public class GridDhtForceKeysResponse extends GridCacheMessage implements GridCa if (infos != null) { for (GridCacheEntryInfo info : infos) - info.marshal(cctx); + info.marshal(cctx.cacheObjectContext()); } if (err != null && errBytes == null) @@ -186,7 +186,7 @@ public class GridDhtForceKeysResponse extends GridCacheMessage implements GridCa if (infos != null) { for (GridCacheEntryInfo info : infos) - info.unmarshal(cctx, ldr); + info.unmarshal(cctx.cacheObjectContext(), ldr); } if (errBytes != null && err == null)
http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java index ef6a3b9..4a693bf 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java @@ -27,7 +27,7 @@ import org.apache.ignite.internal.GridDirectCollection; import org.apache.ignite.internal.GridDirectMap; import org.apache.ignite.internal.GridDirectTransient; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; -import org.apache.ignite.internal.processors.cache.GridCacheMessage; +import org.apache.ignite.internal.processors.cache.GridCacheGroupIdMessage; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; @@ -39,7 +39,7 @@ import org.jetbrains.annotations.NotNull; /** * Partition demand request. */ -public class GridDhtPartitionDemandMessage extends GridCacheMessage { +public class GridDhtPartitionDemandMessage extends GridCacheGroupIdMessage { /** */ private static final long serialVersionUID = 0L; @@ -77,10 +77,10 @@ public class GridDhtPartitionDemandMessage extends GridCacheMessage { /** * @param updateSeq Update sequence for this node. * @param topVer Topology version. - * @param cacheId Cache ID. + * @param grpId Cache group ID. */ - GridDhtPartitionDemandMessage(long updateSeq, @NotNull AffinityTopologyVersion topVer, int cacheId) { - this.cacheId = cacheId; + GridDhtPartitionDemandMessage(long updateSeq, @NotNull AffinityTopologyVersion topVer, int grpId) { + this.grpId = grpId; this.updateSeq = updateSeq; this.topVer = topVer; } @@ -91,7 +91,7 @@ public class GridDhtPartitionDemandMessage extends GridCacheMessage { */ GridDhtPartitionDemandMessage(GridDhtPartitionDemandMessage cp, Collection<Integer> parts, Map<Integer, Long> partsCntrs) { - cacheId = cp.cacheId; + grpId = cp.grpId; updateSeq = cp.updateSeq; topic = cp.topic; timeout = cp.timeout; http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java index 3c04617..cda24e5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java @@ -27,7 +27,6 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.UUID; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; @@ -38,13 +37,17 @@ import org.apache.ignite.events.DiscoveryEvent; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; +import org.apache.ignite.internal.processors.affinity.AffinityAssignment; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheEntryInfoCollection; +import org.apache.ignite.internal.processors.cache.CacheGroupContext; +import org.apache.ignite.internal.processors.cache.CacheMetricsImpl; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo; import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException; import org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager; +import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology; @@ -78,7 +81,10 @@ import static org.apache.ignite.internal.processors.dr.GridDrType.DR_PRELOAD; @SuppressWarnings("NonConstantFieldWithUpperCaseName") public class GridDhtPartitionDemander { /** */ - private final GridCacheContext<?, ?> cctx; + private final GridCacheSharedContext<?, ?> ctx; + + /** */ + private final CacheGroupContext grp; /** */ private final IgniteLogger log; @@ -104,30 +110,20 @@ public class GridDhtPartitionDemander { private final Map<Integer, Object> rebalanceTopics; /** - * Started event sent. - * Make sense for replicated cache only. + * @param grp Ccahe group. */ - private final AtomicBoolean startedEvtSent = new AtomicBoolean(); + public GridDhtPartitionDemander(CacheGroupContext grp) { + assert grp != null; - /** - * Stopped event sent. - * Make sense for replicated cache only. - */ - private final AtomicBoolean stoppedEvtSent = new AtomicBoolean(); + this.grp = grp; - /** - * @param cctx Cctx. - */ - public GridDhtPartitionDemander(GridCacheContext<?, ?> cctx) { - assert cctx != null; - - this.cctx = cctx; + ctx = grp.shared(); - log = cctx.logger(getClass()); + log = ctx.logger(getClass()); - boolean enabled = cctx.rebalanceEnabled() && !cctx.kernalContext().clientNode(); + boolean enabled = grp.rebalanceEnabled() && !ctx.kernalContext().clientNode(); - rebalanceFut = new RebalanceFuture();//Dummy. + rebalanceFut = new RebalanceFuture(); //Dummy. if (!enabled) { // Calling onDone() immediately since preloading is disabled. @@ -137,7 +133,7 @@ public class GridDhtPartitionDemander { Map<Integer, Object> tops = new HashMap<>(); - for (int idx = 0; idx < cctx.gridConfig().getRebalanceThreadPoolSize(); idx++) + for (int idx = 0; idx < grp.shared().kernalContext().config().getRebalanceThreadPoolSize(); idx++) tops.put(idx, GridCachePartitionExchangeManager.rebalanceTopic(idx)); rebalanceTopics = tops; @@ -196,7 +192,7 @@ public class GridDhtPartitionDemander { GridTimeoutObject obj = lastTimeoutObj.getAndSet(null); if (obj != null) - cctx.time().removeTimeoutObject(obj); + ctx.time().removeTimeoutObject(obj); final GridDhtPartitionsExchangeFuture exchFut = lastExchangeFut; @@ -208,7 +204,7 @@ public class GridDhtPartitionDemander { exchFut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() { @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) { - IgniteInternalFuture<Boolean> fut0 = cctx.shared().exchange().forceRebalance(exchFut); + IgniteInternalFuture<Boolean> fut0 = ctx.exchange().forceRebalance(exchFut); fut0.listen(new IgniteInClosure<IgniteInternalFuture<Boolean>>() { @Override public void apply(IgniteInternalFuture<Boolean> future) { @@ -237,7 +233,7 @@ public class GridDhtPartitionDemander { */ private boolean topologyChanged(RebalanceFuture fut) { return - !cctx.affinity().affinityTopologyVersion().equals(fut.topologyVersion()) || // Topology already changed. + !grp.affinity().lastVersion().equals(fut.topologyVersion()) || // Topology already changed. fut != rebalanceFut; // Same topology, but dummy exchange forced because of missing partitions. } @@ -268,12 +264,12 @@ public class GridDhtPartitionDemander { assert force == (forcedRebFut != null); - long delay = cctx.config().getRebalanceDelay(); + long delay = grp.config().getRebalanceDelay(); if (delay == 0 || force) { final RebalanceFuture oldFut = rebalanceFut; - final RebalanceFuture fut = new RebalanceFuture(assigns, cctx, log, startedEvtSent, stoppedEvtSent, cnt); + final RebalanceFuture fut = new RebalanceFuture(grp, assigns, log, cnt); if (!oldFut.isInitial()) oldFut.cancel(); @@ -302,16 +298,18 @@ public class GridDhtPartitionDemander { fut.sendRebalanceStartedEvent(); - final boolean statsEnabled = cctx.config().isStatisticsEnabled(); + for (GridCacheContext cctx : grp.caches()) { + if (cctx.config().isStatisticsEnabled()) { + final CacheMetricsImpl metrics = cctx.cache().metrics0(); - if (statsEnabled) { - cctx.cache().metrics0().clearRebalanceCounters(); + metrics.clearRebalanceCounters(); - rebalanceFut.listen(new IgniteInClosure<IgniteInternalFuture<Boolean>>() { - @Override public void apply(IgniteInternalFuture<Boolean> fut) { - cctx.cache().metrics0().clearRebalanceCounters(); - } - }); + rebalanceFut.listen(new IgniteInClosure<IgniteInternalFuture<Boolean>>() { + @Override public void apply(IgniteInternalFuture<Boolean> fut) { + metrics.clearRebalanceCounters(); + } + }); + } } if (assigns.cancelled()) { // Pending exchange. @@ -331,7 +329,7 @@ public class GridDhtPartitionDemander { fut.onDone(true); - ((GridFutureAdapter)cctx.preloader().syncFuture()).onDone(); + ((GridFutureAdapter)grp.preloader().syncFuture()).onDone(); fut.sendRebalanceFinishedEvent(); @@ -362,7 +360,7 @@ public class GridDhtPartitionDemander { GridTimeoutObject obj = lastTimeoutObj.get(); if (obj != null) - cctx.time().removeTimeoutObject(obj); + ctx.time().removeTimeoutObject(obj); final GridDhtPartitionsExchangeFuture exchFut = lastExchangeFut; @@ -372,7 +370,7 @@ public class GridDhtPartitionDemander { @Override public void onTimeout() { exchFut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() { @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> f) { - cctx.shared().exchange().forceRebalance(exchFut); + ctx.exchange().forceRebalance(exchFut); } }); } @@ -380,7 +378,7 @@ public class GridDhtPartitionDemander { lastTimeoutObj.set(obj); - cctx.time().addTimeoutObject(obj); + ctx.time().addTimeoutObject(obj); } return null; @@ -389,7 +387,6 @@ public class GridDhtPartitionDemander { /** * @param fut Rebalance future. * @param assigns Assignments. - * @throws IgniteCheckedException If failed. */ private void requestPartitions(final RebalanceFuture fut, GridDhtPreloaderAssignments assigns) { assert fut != null; @@ -411,17 +408,19 @@ public class GridDhtPartitionDemander { Collection<Integer> parts= e.getValue().partitions(); - assert parts != null : "Partitions are null [cache=" + cctx.name() + ", fromNode=" + nodeId + "]"; + assert parts != null : "Partitions are null [grp=" + grp.cacheOrGroupName() + ", fromNode=" + nodeId + "]"; fut.remaining.put(nodeId, new T2<>(U.currentTimeMillis(), parts)); } } + final CacheConfiguration cfg = grp.config(); + + int lsnrCnt = ctx.gridConfig().getRebalanceThreadPoolSize(); + for (Map.Entry<ClusterNode, GridDhtPartitionDemandMessage> e : assigns.entrySet()) { final ClusterNode node = e.getKey(); - final CacheConfiguration cfg = cctx.config(); - final Collection<Integer> parts = fut.remaining.get(node.id()).get2(); GridDhtPartitionDemandMessage d = e.getValue(); @@ -430,8 +429,6 @@ public class GridDhtPartitionDemander { ", fromNode=" + node.id() + ", partitionsCount=" + parts.size() + ", topology=" + fut.topologyVersion() + ", updateSeq=" + fut.updateSeq + "]"); - int lsnrCnt = cctx.gridConfig().getRebalanceThreadPoolSize(); - final List<Set<Integer>> sParts = new ArrayList<>(lsnrCnt); for (int cnt = 0; cnt < lsnrCnt; cnt++) @@ -451,16 +448,15 @@ public class GridDhtPartitionDemander { initD.topic(rebalanceTopics.get(cnt)); initD.updateSequence(fut.updateSeq); - initD.timeout(cctx.config().getRebalanceTimeout()); + initD.timeout(cfg.getRebalanceTimeout()); final int finalCnt = cnt; - cctx.kernalContext().closure().runLocalSafe(new Runnable() { + ctx.kernalContext().closure().runLocalSafe(new Runnable() { @Override public void run() { try { if (!fut.isDone()) { - cctx.io().sendOrderedMessage(node, - rebalanceTopics.get(finalCnt), initD, cctx.ioPolicy(), initD.timeout()); + ctx.io().sendOrderedMessage(node, rebalanceTopics.get(finalCnt), initD, grp.ioPolicy(), initD.timeout()); // Cleanup required in case partitions demanded in parallel with cancellation. synchronized (fut) { @@ -507,11 +503,11 @@ public class GridDhtPartitionDemander { for (Integer part : parts) { try { - if (cctx.shared().database().persistenceEnabled()) { + if (ctx.database().persistenceEnabled()) { if (partCntrs == null) partCntrs = new HashMap<>(parts.size(), 1.0f); - GridDhtLocalPartition p = cctx.topology().localPartition(part, old.topologyVersion(), false); + GridDhtLocalPartition p = grp.topology().localPartition(part, old.topologyVersion(), false); partCntrs.put(part, p.initialUpdateCounter()); } @@ -587,7 +583,7 @@ public class GridDhtPartitionDemander { final RebalanceFuture fut = rebalanceFut; - ClusterNode node = cctx.node(id); + ClusterNode node = ctx.node(id); if (node == null) return; @@ -611,23 +607,42 @@ public class GridDhtPartitionDemander { return; } - final GridDhtPartitionTopology top = cctx.dht().topology(); + final GridDhtPartitionTopology top = grp.topology(); + + if (grp.sharedGroup()) { + for (GridCacheContext cctx : grp.caches()) { + if (cctx.config().isStatisticsEnabled()) { + long keysCnt = supply.keysForCache(cctx.cacheId()); - final boolean statsEnabled = cctx.config().isStatisticsEnabled(); + if (keysCnt != -1) + cctx.cache().metrics0().onRebalancingKeysCountEstimateReceived(keysCnt); - if (statsEnabled) { - if (supply.estimatedKeysCount() != -1) - cctx.cache().metrics0().onRebalancingKeysCountEstimateReceived(supply.estimatedKeysCount()); + // Can not be calculated per cache. + cctx.cache().metrics0().onRebalanceBatchReceived(supply.messageSize()); + } + } + } + else { + GridCacheContext cctx = grp.singleCacheContext(); - cctx.cache().metrics0().onRebalanceBatchReceived(supply.messageSize()); + if (cctx.config().isStatisticsEnabled()) { + if (supply.estimatedKeysCount() != -1) + cctx.cache().metrics0().onRebalancingKeysCountEstimateReceived(supply.estimatedKeysCount()); + + cctx.cache().metrics0().onRebalanceBatchReceived(supply.messageSize()); + } } try { + AffinityAssignment aff = grp.affinity().cachedAffinity(topVer); + + GridCacheContext cctx = grp.sharedGroup() ? null : grp.singleCacheContext(); + // Preload. for (Map.Entry<Integer, CacheEntryInfoCollection> e : supply.infos().entrySet()) { int p = e.getKey(); - if (cctx.affinity().partitionLocalNode(p, topVer)) { + if (aff.get(p).contains(ctx.localNode())) { GridDhtLocalPartition part = top.localPartition(p, topVer, true); assert part != null; @@ -638,7 +653,7 @@ public class GridDhtPartitionDemander { boolean reserved = part.reserve(); assert reserved : "Failed to reserve partition [igniteInstanceName=" + - cctx.igniteInstanceName() + ", cacheName=" + cctx.name() + ", part=" + part + ']'; + ctx.igniteInstanceName() + ", grp=" + grp.cacheOrGroupName() + ", part=" + part + ']'; part.lock(); @@ -662,7 +677,10 @@ public class GridDhtPartitionDemander { break; } - if (statsEnabled) + if (grp.sharedGroup() && (cctx == null || cctx.cacheId() != entry.cacheId())) + cctx = ctx.cacheContext(entry.cacheId()); + + if(cctx != null && cctx.config().isStatisticsEnabled()) cctx.cache().metrics0().onRebalanceKeyReceived(); } @@ -700,7 +718,7 @@ public class GridDhtPartitionDemander { // Only request partitions based on latest topology version. for (Integer miss : supply.missed()) { - if (cctx.affinity().partitionLocalNode(miss, topVer)) + if (aff.get(miss).contains(ctx.localNode())) fut.partitionMissed(id, miss); } @@ -708,16 +726,18 @@ public class GridDhtPartitionDemander { fut.partitionDone(id, miss); GridDhtPartitionDemandMessage d = new GridDhtPartitionDemandMessage( - supply.updateSequence(), supply.topologyVersion(), cctx.cacheId()); + supply.updateSequence(), + supply.topologyVersion(), + grp.groupId()); - d.timeout(cctx.config().getRebalanceTimeout()); + d.timeout(grp.config().getRebalanceTimeout()); d.topic(rebalanceTopics.get(idx)); if (!topologyChanged(fut) && !fut.isDone()) { // Send demand message. - cctx.io().sendOrderedMessage(node, rebalanceTopics.get(idx), - d, cctx.ioPolicy(), cctx.config().getRebalanceTimeout()); + ctx.io().sendOrderedMessage(node, rebalanceTopics.get(idx), + d, grp.ioPolicy(), grp.config().getRebalanceTimeout()); } } catch (IgniteCheckedException e) { @@ -746,11 +766,15 @@ public class GridDhtPartitionDemander { GridCacheEntryInfo entry, AffinityTopologyVersion topVer ) throws IgniteCheckedException { + ctx.database().checkpointReadLock(); + try { GridCacheEntryEx cached = null; try { - cached = cctx.dht().entryEx(entry.key()); + GridCacheContext cctx = grp.sharedGroup() ? ctx.cacheContext(entry.cacheId()) : grp.singleCacheContext(); + + cached = cctx.dhtCache().entryEx(entry.key()); if (log.isDebugEnabled()) log.debug("Rebalancing key [key=" + entry.key() + ", part=" + p + ", node=" + pick.id() + ']'); @@ -808,7 +832,10 @@ public class GridDhtPartitionDemander { } catch (IgniteCheckedException e) { throw new IgniteCheckedException("Failed to cache rebalanced entry (will stop rebalancing) [local=" + - cctx.nodeId() + ", node=" + pick.id() + ", key=" + entry.key() + ", part=" + p + ']', e); + ctx.localNode() + ", node=" + pick.id() + ", key=" + entry.key() + ", part=" + p + ']', e); + } + finally { + ctx.database().checkpointReadUnlock(); } return true; @@ -824,16 +851,10 @@ public class GridDhtPartitionDemander { */ public static class RebalanceFuture extends GridFutureAdapter<Boolean> { /** */ - private static final long serialVersionUID = 1L; - - /** Should EVT_CACHE_REBALANCE_STARTED event be sent or not. */ - private final AtomicBoolean startedEvtSent; - - /** Should EVT_CACHE_REBALANCE_STOPPED event be sent or not. */ - private final AtomicBoolean stoppedEvtSent; + private final GridCacheSharedContext<?, ?> ctx; /** */ - private final GridCacheContext<?, ?> cctx; + private final CacheGroupContext grp; /** */ private final IgniteLogger log; @@ -855,42 +876,38 @@ public class GridDhtPartitionDemander { private final long updateSeq; /** + * @param grp Cache group. * @param assigns Assigns. - * @param cctx Context. * @param log Logger. - * @param startedEvtSent Start event sent flag. - * @param stoppedEvtSent Stop event sent flag. * @param updateSeq Update sequence. */ - RebalanceFuture(GridDhtPreloaderAssignments assigns, - GridCacheContext<?, ?> cctx, + RebalanceFuture( + CacheGroupContext grp, + GridDhtPreloaderAssignments assigns, IgniteLogger log, - AtomicBoolean startedEvtSent, - AtomicBoolean stoppedEvtSent, long updateSeq) { assert assigns != null; exchFut = assigns.exchangeFuture(); topVer = assigns.topologyVersion(); - this.cctx = cctx; + this.grp = grp; this.log = log; - this.startedEvtSent = startedEvtSent; - this.stoppedEvtSent = stoppedEvtSent; this.updateSeq = updateSeq; + + ctx= grp.shared(); } /** * Dummy future. Will be done by real one. */ - public RebalanceFuture() { - exchFut = null; - topVer = null; - cctx = null; - log = null; - startedEvtSent = null; - stoppedEvtSent = null; - updateSeq = -1; + RebalanceFuture() { + this.exchFut = null; + this.topVer = null; + this.ctx = null; + this.grp = null; + this.log = null; + this.updateSeq = -1; } /** @@ -927,7 +944,7 @@ public class GridDhtPartitionDemander { U.log(log, "Cancelled rebalancing from all nodes [topology=" + topologyVersion() + ']'); - if (!cctx.kernalContext().isStopping()) { + if (!ctx.kernalContext().isStopping()) { for (UUID nodeId : remaining.keySet()) cleanupRemoteContexts(nodeId); } @@ -948,7 +965,7 @@ public class GridDhtPartitionDemander { if (isDone()) return; - U.log(log, ("Cancelled rebalancing [cache=" + cctx.name() + + U.log(log, ("Cancelled rebalancing [cache=" + grp.cacheOrGroupName() + ", fromNode=" + nodeId + ", topology=" + topologyVersion() + ", time=" + (U.currentTimeMillis() - remaining.get(nodeId).get1()) + " ms]")); @@ -983,22 +1000,24 @@ public class GridDhtPartitionDemander { * @param nodeId Node id. */ private void cleanupRemoteContexts(UUID nodeId) { - ClusterNode node = cctx.discovery().node(nodeId); + ClusterNode node = ctx.discovery().node(nodeId); if (node == null) return; GridDhtPartitionDemandMessage d = new GridDhtPartitionDemandMessage( - -1/* remove supply context signal */, this.topologyVersion(), cctx.cacheId()); + -1/* remove supply context signal */, + this.topologyVersion(), + grp.groupId()); - d.timeout(cctx.config().getRebalanceTimeout()); + d.timeout(grp.config().getRebalanceTimeout()); try { - for (int idx = 0; idx < cctx.gridConfig().getRebalanceThreadPoolSize(); idx++) { + for (int idx = 0; idx < ctx.gridConfig().getRebalanceThreadPoolSize(); idx++) { d.topic(GridCachePartitionExchangeManager.rebalanceTopic(idx)); - cctx.io().sendOrderedMessage(node, GridCachePartitionExchangeManager.rebalanceTopic(idx), - d, cctx.ioPolicy(), cctx.config().getRebalanceTimeout()); + ctx.io().sendOrderedMessage(node, GridCachePartitionExchangeManager.rebalanceTopic(idx), + d, grp.ioPolicy(), grp.config().getRebalanceTimeout()); } } catch (IgniteCheckedException ignored) { @@ -1016,20 +1035,19 @@ public class GridDhtPartitionDemander { if (isDone()) return; - if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_PART_LOADED)) - preloadEvent(p, EVT_CACHE_REBALANCE_PART_LOADED, - exchFut.discoveryEvent()); + if (grp.eventRecordable(EVT_CACHE_REBALANCE_PART_LOADED)) + rebalanceEvent(p, EVT_CACHE_REBALANCE_PART_LOADED, exchFut.discoveryEvent()); T2<Long, Collection<Integer>> t = remaining.get(nodeId); - assert t != null : "Remaining not found [cache=" + cctx.name() + ", fromNode=" + nodeId + + assert t != null : "Remaining not found [grp=" + grp.name() + ", fromNode=" + nodeId + ", part=" + p + "]"; Collection<Integer> parts = t.get2(); boolean rmvd = parts.remove(p); - assert rmvd : "Partition already done [cache=" + cctx.name() + ", fromNode=" + nodeId + + assert rmvd : "Partition already done [grp=" + grp.name() + ", fromNode=" + nodeId + ", part=" + p + ", left=" + parts + "]"; if (parts.isEmpty()) { @@ -1049,18 +1067,18 @@ public class GridDhtPartitionDemander { * @param type Type. * @param discoEvt Discovery event. */ - private void preloadEvent(int part, int type, DiscoveryEvent discoEvt) { + private void rebalanceEvent(int part, int type, DiscoveryEvent discoEvt) { assert discoEvt != null; - cctx.events().addPreloadEvent(part, type, discoEvt.eventNode(), discoEvt.type(), discoEvt.timestamp()); + grp.addRebalanceEvent(part, type, discoEvt.eventNode(), discoEvt.type(), discoEvt.timestamp()); } /** * @param type Type. * @param discoEvt Discovery event. */ - private void preloadEvent(int type, DiscoveryEvent discoEvt) { - preloadEvent(-1, type, discoEvt); + private void rebalanceEvent(int type, DiscoveryEvent discoEvt) { + rebalanceEvent(-1, type, discoEvt); } /** @@ -1080,7 +1098,7 @@ public class GridDhtPartitionDemander { if (log.isDebugEnabled()) log.debug("Completed rebalance future: " + this); - cctx.shared().exchange().scheduleResendPartitions(); + ctx.exchange().scheduleResendPartitions(); Collection<Integer> m = new HashSet<>(); @@ -1094,13 +1112,13 @@ public class GridDhtPartitionDemander { onDone(false); //Finished but has missed partitions, will force dummy exchange - cctx.shared().exchange().forceDummyExchange(true, exchFut); + ctx.exchange().forceDummyExchange(true, exchFut); return; } - if (!cancelled && !cctx.preloader().syncFuture().isDone()) - ((GridFutureAdapter)cctx.preloader().syncFuture()).onDone(); + if (!cancelled && !grp.preloader().syncFuture().isDone()) + ((GridFutureAdapter)grp.preloader().syncFuture()).onDone(); onDone(!cancelled); } @@ -1110,24 +1128,16 @@ public class GridDhtPartitionDemander { * */ private void sendRebalanceStartedEvent() { - if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_STARTED) && - (!cctx.isReplicated() || !startedEvtSent.get())) { - preloadEvent(EVT_CACHE_REBALANCE_STARTED, exchFut.discoveryEvent()); - - startedEvtSent.set(true); - } + if (grp.eventRecordable(EVT_CACHE_REBALANCE_STARTED)) + rebalanceEvent(EVT_CACHE_REBALANCE_STARTED, exchFut.discoveryEvent()); } /** * */ private void sendRebalanceFinishedEvent() { - if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_STOPPED) && - (!cctx.isReplicated() || !stoppedEvtSent.get())) { - preloadEvent(EVT_CACHE_REBALANCE_STOPPED, exchFut.discoveryEvent()); - - stoppedEvtSent.set(true); - } + if (grp.eventRecordable(EVT_CACHE_REBALANCE_STOPPED)) + rebalanceEvent(EVT_CACHE_REBALANCE_STOPPED, exchFut.discoveryEvent()); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java index 0ff03f7..467b906 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.preloader; import java.util.HashMap; import java.util.Iterator; +import java.util.List; import java.util.Map; import java.util.UUID; import org.apache.ignite.IgniteCheckedException; @@ -26,7 +27,7 @@ import org.apache.ignite.IgniteLogger; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; -import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.CacheGroupContext; import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo; import org.apache.ignite.internal.processors.cache.IgniteRebalanceIterator; import org.apache.ignite.internal.processors.cache.database.CacheDataRow; @@ -47,7 +48,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh */ class GridDhtPartitionSupplier { /** */ - private final GridCacheContext<?, ?> cctx; + private final CacheGroupContext grp; /** */ private final IgniteLogger log; @@ -65,18 +66,18 @@ class GridDhtPartitionSupplier { private final Map<T3<UUID, Integer, AffinityTopologyVersion>, SupplyContext> scMap = new HashMap<>(); /** - * @param cctx Cache context. + * @param grp Cache group. */ - GridDhtPartitionSupplier(GridCacheContext<?, ?> cctx) { - assert cctx != null; + GridDhtPartitionSupplier(CacheGroupContext grp) { + assert grp != null; - this.cctx = cctx; + this.grp = grp; - log = cctx.logger(getClass()); + log = grp.shared().logger(getClass()); - top = cctx.dht().topology(); + top = grp.topology(); - depEnabled = cctx.gridDeploy().enabled(); + depEnabled = grp.shared().gridDeploy().enabled(); } /** @@ -171,7 +172,7 @@ class GridDhtPartitionSupplier { assert d != null; assert id != null; - AffinityTopologyVersion cutTop = cctx.affinity().affinityTopologyVersion(); + AffinityTopologyVersion cutTop = grp.affinity().lastVersion(); AffinityTopologyVersion demTop = d.topologyVersion(); T3<UUID, Integer, AffinityTopologyVersion> scId = new T3<>(id, idx, demTop); @@ -197,9 +198,12 @@ class GridDhtPartitionSupplier { ", from=" + id + ", idx=" + idx + "]"); GridDhtPartitionSupplyMessage s = new GridDhtPartitionSupplyMessage( - d.updateSequence(), cctx.cacheId(), d.topologyVersion(), cctx.deploymentEnabled()); + d.updateSequence(), + grp.groupId(), + d.topologyVersion(), + grp.deploymentEnabled()); - ClusterNode node = cctx.discovery().node(id); + ClusterNode node = grp.shared().discovery().node(id); if (node == null) return; // Context will be cleaned at topology change. @@ -225,7 +229,7 @@ class GridDhtPartitionSupplier { boolean newReq = true; - long maxBatchesCnt = cctx.config().getRebalanceBatchesPrefetchCount(); + long maxBatchesCnt = grp.config().getRebalanceBatchesPrefetchCount(); if (sctx != null) { phase = sctx.phase; @@ -234,7 +238,7 @@ class GridDhtPartitionSupplier { } else { if (log.isDebugEnabled()) - log.debug("Starting supplying rebalancing [cache=" + cctx.name() + + log.debug("Starting supplying rebalancing [cache=" + grp.cacheOrGroupName() + ", fromNode=" + node.id() + ", partitionsCount=" + d.partitions().size() + ", topology=" + d.topologyVersion() + ", updateSeq=" + d.updateSequence() + ", idx=" + idx + "]"); @@ -243,18 +247,19 @@ class GridDhtPartitionSupplier { Iterator<Integer> partIt = sctx != null ? sctx.partIt : d.partitions().iterator(); if (sctx == null) { - long keysCnt = 0; - for (Integer part : d.partitions()) { GridDhtLocalPartition loc = top.localPartition(part, d.topologyVersion(), false); if (loc == null || loc.state() != OWNING) continue; - keysCnt += cctx.offheap().entriesCount(part); + if (grp.sharedGroup()) { + for (int cacheId : grp.cacheIds()) + s.addKeysForCache(cacheId, grp.offheap().cacheEntriesCount(cacheId, part)); + } + else + s.addEstimatedKeysCount(grp.offheap().totalPartitionEntriesCount(part)); } - - s.estimatedKeysCount(keysCnt); } while ((sctx != null && newReq) || partIt.hasNext()) { @@ -295,22 +300,24 @@ class GridDhtPartitionSupplier { IgniteRebalanceIterator iter; if (sctx == null || sctx.entryIt == null) { - iter = cctx.offheap().rebalanceIterator(part, d.topologyVersion(), + iter = grp.offheap().rebalanceIterator(part, d.topologyVersion(), d.isHistorical(part) ? d.partitionCounter(part) : null); if (!iter.historical()) { - assert !cctx.shared().database().persistenceEnabled() || !d.isHistorical(part); + assert !grp.shared().database().persistenceEnabled() || !d.isHistorical(part); s.clean(part); } else - assert cctx.shared().database().persistenceEnabled() && d.isHistorical(part); + assert grp.shared().database().persistenceEnabled() && d.isHistorical(part); } else iter = (IgniteRebalanceIterator)sctx.entryIt; while (iter.hasNext()) { - if (!cctx.affinity().partitionBelongs(node, part, d.topologyVersion())) { + List<ClusterNode> nodes = grp.affinity().cachedAffinity(d.topologyVersion()).get(part); + + if (!nodes.contains(node)) { // Demander no longer needs this partition, // so we send '-1' partition and move on. s.missed(part); @@ -334,7 +341,7 @@ class GridDhtPartitionSupplier { break; } - if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) { + if (s.messageSize() >= grp.config().getRebalanceBatchSize()) { if (++bCnt >= maxBatchesCnt) { saveSupplyContext(scId, phase, @@ -356,9 +363,9 @@ class GridDhtPartitionSupplier { return; s = new GridDhtPartitionSupplyMessage(d.updateSequence(), - cctx.cacheId(), + grp.groupId(), d.topologyVersion(), - cctx.deploymentEnabled()); + grp.deploymentEnabled()); } } @@ -370,9 +377,10 @@ class GridDhtPartitionSupplier { info.expireTime(row.expireTime()); info.version(row.version()); info.value(row.value()); + info.cacheId(row.cacheId()); if (preloadPred == null || preloadPred.apply(info)) - s.addEntry0(part, info, cctx); + s.addEntry0(part, info, grp.shared(), grp.cacheObjectContext()); else { if (log.isDebugEnabled()) log.debug("Rebalance predicate evaluated to false (will not send " + @@ -421,7 +429,7 @@ class GridDhtPartitionSupplier { reply(node, d, s, scId); if (log.isDebugEnabled()) - log.debug("Finished supplying rebalancing [cache=" + cctx.name() + + log.debug("Finished supplying rebalancing [cache=" + grp.cacheOrGroupName() + ", fromNode=" + node.id() + ", topology=" + d.topologyVersion() + ", updateSeq=" + d.updateSequence() + ", idx=" + idx + "]"); @@ -448,16 +456,15 @@ class GridDhtPartitionSupplier { GridDhtPartitionSupplyMessage s, T3<UUID, Integer, AffinityTopologyVersion> scId) throws IgniteCheckedException { - try { if (log.isDebugEnabled()) log.debug("Replying to partition demand [node=" + n.id() + ", demand=" + d + ", supply=" + s + ']'); - cctx.io().sendOrderedMessage(n, d.topic(), s, cctx.ioPolicy(), d.timeout()); + grp.shared().io().sendOrderedMessage(n, d.topic(), s, grp.ioPolicy(), d.timeout()); // Throttle preloading. - if (cctx.config().getRebalanceThrottle() > 0) - U.sleep(cctx.config().getRebalanceThrottle()); + if (grp.config().getRebalanceThrottle() > 0) + U.sleep(grp.config().getRebalanceThrottle()); return true; } @@ -490,7 +497,7 @@ class GridDhtPartitionSupplier { AffinityTopologyVersion topVer, long updateSeq) { synchronized (scMap) { - if (cctx.affinity().affinityTopologyVersion().equals(topVer)) { + if (grp.affinity().lastVersion().equals(topVer)) { assert scMap.get(t) == null; scMap.put(t, http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java index 1cb32e3..ef14a90 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java @@ -28,13 +28,13 @@ import java.util.Map; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.GridDirectCollection; import org.apache.ignite.internal.GridDirectMap; -import org.apache.ignite.internal.GridDirectTransient; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheEntryInfoCollection; -import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.CacheGroupContext; +import org.apache.ignite.internal.processors.cache.CacheObjectContext; import org.apache.ignite.internal.processors.cache.GridCacheDeployable; import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo; -import org.apache.ignite.internal.processors.cache.GridCacheMessage; +import org.apache.ignite.internal.processors.cache.GridCacheGroupIdMessage; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.internal.S; @@ -45,7 +45,7 @@ import org.apache.ignite.plugin.extensions.communication.MessageWriter; /** * Partition supply message. */ -public class GridDhtPartitionSupplyMessage extends GridCacheMessage implements GridCacheDeployable { +public class GridDhtPartitionSupplyMessage extends GridCacheGroupIdMessage implements GridCacheDeployable { /** */ private static final long serialVersionUID = 0L; @@ -79,17 +79,21 @@ public class GridDhtPartitionSupplyMessage extends GridCacheMessage implements G /** Estimated keys count. */ private long estimatedKeysCnt = -1; + /** Estimated keys count per cache in case the message is for shared group. */ + @GridDirectMap(keyType = int.class, valueType = long.class) + private Map<Integer, Long> keysPerCache; + /** * @param updateSeq Update sequence for this node. - * @param cacheId Cache ID. + * @param grpId Cache group ID. * @param topVer Topology version. * @param addDepInfo Deployment info flag. */ GridDhtPartitionSupplyMessage(long updateSeq, - int cacheId, + int grpId, AffinityTopologyVersion topVer, boolean addDepInfo) { - this.cacheId = cacheId; + this.grpId = grpId; this.updateSeq = updateSeq; this.topVer = topVer; this.addDepInfo = addDepInfo; @@ -206,18 +210,19 @@ public class GridDhtPartitionSupplyMessage extends GridCacheMessage implements G /** * @param p Partition. * @param info Entry to add. - * @param ctx Cache context. + * @param ctx Cache shared context. + * @param cacheObjCtx Cache object context. * @throws IgniteCheckedException If failed. */ - void addEntry0(int p, GridCacheEntryInfo info, GridCacheContext ctx) throws IgniteCheckedException { + void addEntry0(int p, GridCacheEntryInfo info, GridCacheSharedContext ctx, CacheObjectContext cacheObjCtx) throws IgniteCheckedException { assert info != null; assert info.key() != null : info; assert info.value() != null : info; // Need to call this method to initialize info properly. - marshalInfo(info, ctx); + marshalInfo(info, ctx, cacheObjCtx); - msgSize += info.marshalledSize(ctx); + msgSize += info.marshalledSize(cacheObjCtx); CacheEntryInfoCollection infoCol = infos().get(p); @@ -237,13 +242,13 @@ public class GridDhtPartitionSupplyMessage extends GridCacheMessage implements G @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException { super.finishUnmarshal(ctx, ldr); - GridCacheContext cacheCtx = ctx.cacheContext(cacheId); + CacheGroupContext grp = ctx.cache().cacheGroup(grpId); for (CacheEntryInfoCollection col : infos().values()) { List<GridCacheEntryInfo> entries = col.infos(); for (int i = 0; i < entries.size(); i++) - entries.get(i).unmarshal(cacheCtx, ldr); + entries.get(i).unmarshal(grp.cacheObjectContext(), ldr); } } @@ -281,46 +286,53 @@ public class GridDhtPartitionSupplyMessage extends GridCacheMessage implements G writer.incrementState(); case 4: - if (!writer.writeMap("infos", infos, MessageCollectionItemType.INT, MessageCollectionItemType.MSG)) + if (!writer.writeLong("estimatedKeysCnt", estimatedKeysCnt)) return false; writer.incrementState(); case 5: - if (!writer.writeCollection("last", last, MessageCollectionItemType.INT)) + if (!writer.writeMap("infos", infos, MessageCollectionItemType.INT, MessageCollectionItemType.MSG)) return false; writer.incrementState(); case 6: - if (!writer.writeCollection("missed", missed, MessageCollectionItemType.INT)) + if (!writer.writeMap("keysPerCache", keysPerCache, MessageCollectionItemType.INT, MessageCollectionItemType.LONG)) return false; writer.incrementState(); case 7: - if (!writer.writeMessage("topVer", topVer)) + if (!writer.writeCollection("last", last, MessageCollectionItemType.INT)) return false; writer.incrementState(); case 8: - if (!writer.writeLong("updateSeq", updateSeq)) + if (!writer.writeCollection("missed", missed, MessageCollectionItemType.INT)) return false; writer.incrementState(); case 9: - if (!writer.writeLong("estimatedKeysCnt", estimatedKeysCnt)) + if (!writer.writeInt("msgSize", msgSize)) return false; writer.incrementState(); case 10: - if (!writer.writeInt("msgSize", msgSize)) + if (!writer.writeMessage("topVer", topVer)) return false; writer.incrementState(); + + case 11: + if (!writer.writeLong("updateSeq", updateSeq)) + return false; + + writer.incrementState(); + } return true; @@ -346,7 +358,7 @@ public class GridDhtPartitionSupplyMessage extends GridCacheMessage implements G reader.incrementState(); case 4: - infos = reader.readMap("infos", MessageCollectionItemType.INT, MessageCollectionItemType.MSG, false); + estimatedKeysCnt = reader.readLong("estimatedKeysCnt"); if (!reader.isLastRead()) return false; @@ -354,7 +366,7 @@ public class GridDhtPartitionSupplyMessage extends GridCacheMessage implements G reader.incrementState(); case 5: - last = reader.readCollection("last", MessageCollectionItemType.INT); + infos = reader.readMap("infos", MessageCollectionItemType.INT, MessageCollectionItemType.MSG, false); if (!reader.isLastRead()) return false; @@ -362,7 +374,7 @@ public class GridDhtPartitionSupplyMessage extends GridCacheMessage implements G reader.incrementState(); case 6: - missed = reader.readCollection("missed", MessageCollectionItemType.INT); + keysPerCache = reader.readMap("keysPerCache", MessageCollectionItemType.INT, MessageCollectionItemType.LONG, false); if (!reader.isLastRead()) return false; @@ -370,7 +382,7 @@ public class GridDhtPartitionSupplyMessage extends GridCacheMessage implements G reader.incrementState(); case 7: - topVer = reader.readMessage("topVer"); + last = reader.readCollection("last", MessageCollectionItemType.INT); if (!reader.isLastRead()) return false; @@ -378,7 +390,7 @@ public class GridDhtPartitionSupplyMessage extends GridCacheMessage implements G reader.incrementState(); case 8: - updateSeq = reader.readLong("updateSeq"); + missed = reader.readCollection("missed", MessageCollectionItemType.INT); if (!reader.isLastRead()) return false; @@ -386,7 +398,7 @@ public class GridDhtPartitionSupplyMessage extends GridCacheMessage implements G reader.incrementState(); case 9: - estimatedKeysCnt = reader.readLong("estimatedKeysCnt"); + msgSize = reader.readInt("msgSize"); if (!reader.isLastRead()) return false; @@ -394,12 +406,21 @@ public class GridDhtPartitionSupplyMessage extends GridCacheMessage implements G reader.incrementState(); case 10: - msgSize = reader.readInt("msgSize"); + topVer = reader.readMessage("topVer"); if (!reader.isLastRead()) return false; reader.incrementState(); + + case 11: + updateSeq = reader.readLong("updateSeq"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + } return reader.afterMessageRead(GridDhtPartitionSupplyMessage.class); @@ -412,7 +433,7 @@ public class GridDhtPartitionSupplyMessage extends GridCacheMessage implements G /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 11; + return 12; } /** @@ -423,10 +444,43 @@ public class GridDhtPartitionSupplyMessage extends GridCacheMessage implements G } /** - * @param estimatedKeysCnt New estimated keys count. + * @param cnt Keys count to add. */ - public void estimatedKeysCount(long estimatedKeysCnt) { - this.estimatedKeysCnt = estimatedKeysCnt; + public void addEstimatedKeysCount(long cnt) { + this.estimatedKeysCnt += cnt; + } + + /** + * @return Estimated keys count for a given cache ID. + */ + public long keysForCache(int cacheId) { + if (this.keysPerCache == null) + return -1; + + Long cnt = this.keysPerCache.get(cacheId); + + return cnt != null ? cnt : 0; + } + + /** + * @param cacheId Cache ID. + * @param cnt Keys count. + */ + public void addKeysForCache(int cacheId, long cnt) { + assert cacheId != 0 && cnt >= 0; + + if (keysPerCache == null) + keysPerCache = new HashMap<>(); + + Long cnt0 = keysPerCache.get(cacheId); + + if (cnt0 == null) { + keysPerCache.put(cacheId, cnt); + + msgSize += 12; + } + else + keysPerCache.put(cacheId, cnt0 + cnt); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java index 74bbcb0..441952d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java @@ -65,6 +65,11 @@ public abstract class GridDhtPartitionsAbstractMessage extends GridCacheMessage } /** {@inheritDoc} */ + @Override public boolean cacheGroupMessage() { + return false; + } + + /** {@inheritDoc} */ @Override public int partition() { return GridIoMessage.STRIPE_DISABLED_PART; } @@ -87,10 +92,10 @@ public abstract class GridDhtPartitionsAbstractMessage extends GridCacheMessage } /** - * @param cacheId Cache ID. + * @param grpId Cache group ID. * @return Parition update counters. */ - public abstract Map<Integer, T2<Long, Long>> partitionUpdateCounters(int cacheId); + public abstract Map<Integer, T2<Long, Long>> partitionUpdateCounters(int grpId); /** * @return Last used version among all nodes. @@ -114,6 +119,11 @@ public abstract class GridDhtPartitionsAbstractMessage extends GridCacheMessage } /** {@inheritDoc} */ + @Override public byte fieldsCount() { + return 5; + } + + /** {@inheritDoc} */ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { writer.setBuffer(buf); @@ -128,19 +138,19 @@ public abstract class GridDhtPartitionsAbstractMessage extends GridCacheMessage } switch (writer.state()) { - case 3: + case 2: if (!writer.writeMessage("exchId", exchId)) return false; writer.incrementState(); - case 4: + case 3: if (!writer.writeByte("flags", flags)) return false; writer.incrementState(); - case 5: + case 4: if (!writer.writeMessage("lastVer", lastVer)) return false; @@ -162,7 +172,7 @@ public abstract class GridDhtPartitionsAbstractMessage extends GridCacheMessage return false; switch (reader.state()) { - case 3: + case 2: exchId = reader.readMessage("exchId"); if (!reader.isLastRead()) @@ -170,7 +180,7 @@ public abstract class GridDhtPartitionsAbstractMessage extends GridCacheMessage reader.incrementState(); - case 4: + case 3: flags = reader.readByte("flags"); if (!reader.isLastRead()) @@ -178,7 +188,7 @@ public abstract class GridDhtPartitionsAbstractMessage extends GridCacheMessage reader.incrementState(); - case 5: + case 4: lastVer = reader.readMessage("lastVer"); if (!reader.isLastRead())
