http://git-wip-us.apache.org/repos/asf/ignite/blob/7dfaa3b0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java new file mode 100644 index 0000000..865bad8 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java @@ -0,0 +1,1034 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.distributed.dht.preloader; + +import java.util.Collection; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.UUID; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; +import org.apache.ignite.internal.managers.deployment.GridDeploymentInfo; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; +import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo; +import org.apache.ignite.internal.processors.cache.GridCacheEntryInfoCollectSwapListener; +import org.apache.ignite.internal.processors.cache.GridCacheSwapEntry; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology; +import org.apache.ignite.internal.util.lang.GridCloseableIterator; +import org.apache.ignite.internal.util.typedef.CI2; +import org.apache.ignite.internal.util.typedef.T3; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgnitePredicate; +import org.apache.ignite.spi.IgniteSpiException; + +import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.OWNING; + +/** + * Thread pool for supplying partitions to demanding nodes. 
+ */ +class GridDhtPartitionSupplier { + /** */ + private final GridCacheContext<?, ?> cctx; + + /** */ + private final IgniteLogger log; + + /** */ + private GridDhtPartitionTopology top; + + /** */ + private final boolean depEnabled; + + /** Preload predicate. */ + private IgnitePredicate<GridCacheEntryInfo> preloadPred; + + /** Supply context map. T2: nodeId, idx, topVer. */ + private final Map<T3<UUID, Integer, AffinityTopologyVersion>, SupplyContext> scMap = new HashMap<>(); + + /** + * @param cctx Cache context. + */ + GridDhtPartitionSupplier(GridCacheContext<?, ?> cctx) { + assert cctx != null; + + this.cctx = cctx; + + log = cctx.logger(getClass()); + + top = cctx.dht().topology(); + + depEnabled = cctx.gridDeploy().enabled(); + } + + /** + * + */ + void start() { + startOldListeners(); + } + + /** + * + */ + void stop() { + synchronized (scMap) { + Iterator<T3<UUID, Integer, AffinityTopologyVersion>> it = scMap.keySet().iterator(); + + while (it.hasNext()) { + T3<UUID, Integer, AffinityTopologyVersion> t = it.next(); + + clearContext(scMap.get(t), log); + + it.remove(); + } + } + + stopOldListeners(); + } + + /** + * Clear context. + * + * @param sc Supply context. + * @param log Logger. + * @return true in case context was removed. + */ + private static void clearContext( + final SupplyContext sc, + final IgniteLogger log) { + if (sc != null) { + final Iterator it = sc.entryIt; + + if (it != null && it instanceof GridCloseableIterator && !((GridCloseableIterator)it).isClosed()) { + try { + ((GridCloseableIterator)it).close(); + } + catch (IgniteCheckedException e) { + log.error("Iterator close failed.", e); + } + } + + final GridDhtLocalPartition loc = sc.loc; + + if (loc != null) { + assert loc.reservations() > 0; + + loc.release(); + } + } + } + + /** + * Handles new topology. + * + * @param topVer Topology version. 
+ */ + public void onTopologyChanged(AffinityTopologyVersion topVer) { + synchronized (scMap) { + Iterator<T3<UUID, Integer, AffinityTopologyVersion>> it = scMap.keySet().iterator(); + + while (it.hasNext()) { + T3<UUID, Integer, AffinityTopologyVersion> t = it.next(); + + if (topVer.compareTo(t.get3()) > 0) {// Clear all obsolete contexts. + clearContext(scMap.get(t), log); + + it.remove(); + + if (log.isDebugEnabled()) + log.debug("Supply context removed [node=" + t.get1() + "]"); + } + } + } + } + + /** + * Sets preload predicate for supply pool. + * + * @param preloadPred Preload predicate. + */ + void preloadPredicate(IgnitePredicate<GridCacheEntryInfo> preloadPred) { + this.preloadPred = preloadPred; + } + + /** + * @param d Demand message. + * @param idx Index. + * @param id Node uuid. + */ + public void handleDemandMessage(int idx, UUID id, GridDhtPartitionDemandMessage d) { + assert d != null; + assert id != null; + + AffinityTopologyVersion cutTop = cctx.affinity().affinityTopologyVersion(); + AffinityTopologyVersion demTop = d.topologyVersion(); + + T3<UUID, Integer, AffinityTopologyVersion> scId = new T3<>(id, idx, demTop); + + if (d.updateSequence() == -1) {//Demand node requested context cleanup. + synchronized (scMap) { + clearContext(scMap.remove(scId), log); + + return; + } + } + + if (cutTop.compareTo(demTop) > 0) { + if (log.isDebugEnabled()) + log.debug("Demand request cancelled [current=" + cutTop + ", demanded=" + demTop + + ", from=" + id + ", idx=" + idx + "]"); + + return; + } + + if (log.isDebugEnabled()) + log.debug("Demand request accepted [current=" + cutTop + ", demanded=" + demTop + + ", from=" + id + ", idx=" + idx + "]"); + + GridDhtPartitionSupplyMessageV2 s = new GridDhtPartitionSupplyMessageV2( + d.updateSequence(), cctx.cacheId(), d.topologyVersion(), cctx.deploymentEnabled()); + + ClusterNode node = cctx.discovery().node(id); + + if (node == null) + return; //Context will be cleaned at topology change. 
+ + try { + SupplyContext sctx; + + synchronized (scMap) { + sctx = scMap.remove(scId); + + assert sctx == null || d.updateSequence() == sctx.updateSeq; + } + + // Initial demand request should contain partitions list. + if (sctx == null && d.partitions() == null) + return; + + assert !(sctx != null && d.partitions() != null); + + long bCnt = 0; + + SupplyContextPhase phase = SupplyContextPhase.NEW; + + boolean newReq = true; + + long maxBatchesCnt = cctx.config().getRebalanceBatchesPrefetchCount(); + + if (sctx != null) { + phase = sctx.phase; + + maxBatchesCnt = 1; + } + else { + if (log.isDebugEnabled()) + log.debug("Starting supplying rebalancing [cache=" + cctx.name() + + ", fromNode=" + node.id() + ", partitionsCount=" + d.partitions().size() + + ", topology=" + d.topologyVersion() + ", updateSeq=" + d.updateSequence() + + ", idx=" + idx + "]"); + } + + Iterator<Integer> partIt = sctx != null ? sctx.partIt : d.partitions().iterator(); + + while ((sctx != null && newReq) || partIt.hasNext()) { + int part = sctx != null && newReq ? sctx.part : partIt.next(); + + newReq = false; + + GridDhtLocalPartition loc; + + if (sctx != null && sctx.loc != null) { + loc = sctx.loc; + + assert loc.reservations() > 0; + } + else { + loc = top.localPartition(part, d.topologyVersion(), false); + + if (loc == null || loc.state() != OWNING || !loc.reserve()) { + // Reply with partition of "-1" to let sender know that + // this node is no longer an owner. 
+ s.missed(part); + + if (log.isDebugEnabled()) + log.debug("Requested partition is not owned by local node [part=" + part + + ", demander=" + id + ']'); + + continue; + } + } + + GridCacheEntryInfoCollectSwapListener swapLsnr = null; + + try { + if (phase == SupplyContextPhase.NEW && cctx.isSwapOrOffheapEnabled()) { + swapLsnr = new GridCacheEntryInfoCollectSwapListener(log); + + cctx.swap().addOffHeapListener(part, swapLsnr); + cctx.swap().addSwapListener(part, swapLsnr); + } + + boolean partMissing = false; + + if (phase == SupplyContextPhase.NEW) + phase = SupplyContextPhase.ONHEAP; + + if (phase == SupplyContextPhase.ONHEAP) { + Iterator<GridDhtCacheEntry> entIt = sctx != null ? + (Iterator<GridDhtCacheEntry>)sctx.entryIt : loc.entries().iterator(); + + while (entIt.hasNext()) { + if (!cctx.affinity().belongs(node, part, d.topologyVersion())) { + // Demander no longer needs this partition, so we send '-1' partition and move on. + s.missed(part); + + if (log.isDebugEnabled()) + log.debug("Demanding node does not need requested partition [part=" + part + + ", nodeId=" + id + ']'); + + partMissing = true; + + break; + } + + if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) { + if (++bCnt >= maxBatchesCnt) { + saveSupplyContext(scId, + phase, + partIt, + part, + entIt, + swapLsnr, + loc, + d.topologyVersion(), + d.updateSequence()); + + swapLsnr = null; + loc = null; + + reply(node, d, s, scId); + + return; + } + else { + if (!reply(node, d, s, scId)) + return; + + s = new GridDhtPartitionSupplyMessageV2(d.updateSequence(), + cctx.cacheId(), d.topologyVersion(), cctx.deploymentEnabled()); + } + } + + GridCacheEntryEx e = entIt.next(); + + GridCacheEntryInfo info = e.info(); + + if (info != null && !info.isNew()) { + if (preloadPred == null || preloadPred.apply(info)) + s.addEntry(part, info, cctx); + else if (log.isDebugEnabled()) + log.debug("Rebalance predicate evaluated to false (will not sender cache entry): " + + info); + } + } + + if 
(partMissing) + continue; + + } + + if (phase == SupplyContextPhase.ONHEAP) { + phase = SupplyContextPhase.SWAP; + + if (sctx != null) { + sctx = new SupplyContext( + phase, + partIt, + null, + swapLsnr, + part, + loc, + d.updateSequence()); + } + } + + if (phase == SupplyContextPhase.SWAP && cctx.isSwapOrOffheapEnabled()) { + GridCloseableIterator<Map.Entry<byte[], GridCacheSwapEntry>> iter = + sctx != null && sctx.entryIt != null ? + (GridCloseableIterator<Map.Entry<byte[], GridCacheSwapEntry>>)sctx.entryIt : + cctx.swap().iterator(part); + + // Iterator may be null if space does not exist. + if (iter != null) { + boolean prepared = false; + + while (iter.hasNext()) { + if (!cctx.affinity().belongs(node, part, d.topologyVersion())) { + // Demander no longer needs this partition, + // so we send '-1' partition and move on. + s.missed(part); + + if (log.isDebugEnabled()) + log.debug("Demanding node does not need requested partition " + + "[part=" + part + ", nodeId=" + id + ']'); + + partMissing = true; + + break; // For. 
+ } + + if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) { + if (++bCnt >= maxBatchesCnt) { + saveSupplyContext(scId, + phase, + partIt, + part, + iter, + swapLsnr, + loc, + d.topologyVersion(), + d.updateSequence()); + + swapLsnr = null; + loc = null; + + reply(node, d, s, scId); + + return; + } + else { + if (!reply(node, d, s, scId)) + return; + + s = new GridDhtPartitionSupplyMessageV2(d.updateSequence(), + cctx.cacheId(), d.topologyVersion(), cctx.deploymentEnabled()); + } + } + + Map.Entry<byte[], GridCacheSwapEntry> e = iter.next(); + + GridCacheSwapEntry swapEntry = e.getValue(); + + GridCacheEntryInfo info = new GridCacheEntryInfo(); + + info.keyBytes(e.getKey()); + info.ttl(swapEntry.ttl()); + info.expireTime(swapEntry.expireTime()); + info.version(swapEntry.version()); + info.value(swapEntry.value()); + + if (preloadPred == null || preloadPred.apply(info)) + s.addEntry0(part, info, cctx); + else { + if (log.isDebugEnabled()) + log.debug("Rebalance predicate evaluated to false (will not send " + + "cache entry): " + info); + + continue; + } + + // Need to manually prepare cache message. + if (depEnabled && !prepared) { + ClassLoader ldr = swapEntry.keyClassLoaderId() != null ? + cctx.deploy().getClassLoader(swapEntry.keyClassLoaderId()) : + swapEntry.valueClassLoaderId() != null ? + cctx.deploy().getClassLoader(swapEntry.valueClassLoaderId()) : + null; + + if (ldr == null) + continue; + + if (ldr instanceof GridDeploymentInfo) { + s.prepare((GridDeploymentInfo)ldr); + + prepared = true; + } + } + } + + iter.close(); + + if (partMissing) + continue; + } + } + + if (swapLsnr == null && sctx != null) + swapLsnr = sctx.swapLsnr; + + // Stop receiving promote notifications. 
+ if (swapLsnr != null) { + cctx.swap().removeOffHeapListener(part, swapLsnr); + cctx.swap().removeSwapListener(part, swapLsnr); + } + + if (phase == SupplyContextPhase.SWAP) { + phase = SupplyContextPhase.EVICTED; + + if (sctx != null) { + sctx = new SupplyContext( + phase, + partIt, + null, + null, + part, + loc, + d.updateSequence()); + } + } + + if (phase == SupplyContextPhase.EVICTED && swapLsnr != null) { + Collection<GridCacheEntryInfo> entries = swapLsnr.entries(); + + swapLsnr = null; + + Iterator<GridCacheEntryInfo> lsnrIt = sctx != null && sctx.entryIt != null ? + (Iterator<GridCacheEntryInfo>)sctx.entryIt : entries.iterator(); + + while (lsnrIt.hasNext()) { + if (!cctx.affinity().belongs(node, part, d.topologyVersion())) { + // Demander no longer needs this partition, + // so we send '-1' partition and move on. + s.missed(part); + + if (log.isDebugEnabled()) + log.debug("Demanding node does not need requested partition " + + "[part=" + part + ", nodeId=" + id + ']'); + + // No need to continue iteration over swap entries. + break; + } + + if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) { + if (++bCnt >= maxBatchesCnt) { + saveSupplyContext(scId, + phase, + partIt, + part, + lsnrIt, + swapLsnr, + loc, + d.topologyVersion(), + d.updateSequence()); + + loc = null; + + reply(node, d, s, scId); + + return; + } + else { + if (!reply(node, d, s, scId)) + return; + + s = new GridDhtPartitionSupplyMessageV2(d.updateSequence(), + cctx.cacheId(), d.topologyVersion(), cctx.deploymentEnabled()); + } + } + + GridCacheEntryInfo info = lsnrIt.next(); + + if (preloadPred == null || preloadPred.apply(info)) + s.addEntry(part, info, cctx); + else if (log.isDebugEnabled()) + log.debug("Rebalance predicate evaluated to false (will not sender cache entry): " + + info); + } + } + + // Mark as last supply message. 
+ s.last(part); + + phase = SupplyContextPhase.NEW; + + sctx = null; + } + finally { + if (loc != null) + loc.release(); + + if (swapLsnr != null) { + cctx.swap().removeOffHeapListener(part, swapLsnr); + cctx.swap().removeSwapListener(part, swapLsnr); + } + } + } + + reply(node, d, s, scId); + + if (log.isDebugEnabled()) + log.debug("Finished supplying rebalancing [cache=" + cctx.name() + + ", fromNode=" + node.id() + + ", topology=" + d.topologyVersion() + ", updateSeq=" + d.updateSequence() + + ", idx=" + idx + "]"); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to send partition supply message to node: " + id, e); + } + catch (IgniteSpiException e) { + if (log.isDebugEnabled()) + log.debug("Failed to send message to node (current node is stopping?) [node=" + node.id() + + ", msg=" + e.getMessage() + ']'); + } + } + + /** + * @param n Node. + * @param d DemandMessage + * @param s Supply message. + * @return {@code True} if message was sent, {@code false} if recipient left grid. + * @throws IgniteCheckedException If failed. + */ + private boolean reply(ClusterNode n, + GridDhtPartitionDemandMessage d, + GridDhtPartitionSupplyMessageV2 s, + T3<UUID, Integer, AffinityTopologyVersion> scId) + throws IgniteCheckedException { + + try { + if (log.isDebugEnabled()) + log.debug("Replying to partition demand [node=" + n.id() + ", demand=" + d + ", supply=" + s + ']'); + + cctx.io().sendOrderedMessage(n, d.topic(), s, cctx.ioPolicy(), d.timeout()); + + // Throttle preloading. + if (cctx.config().getRebalanceThrottle() > 0) + U.sleep(cctx.config().getRebalanceThrottle()); + + return true; + } + catch (ClusterTopologyCheckedException ignore) { + if (log.isDebugEnabled()) + log.debug("Failed to send partition supply message because node left grid: " + n.id()); + + synchronized (scMap) { + clearContext(scMap.remove(scId), log); + } + + return false; + } + } + + /** + * @param t Tuple. + * @param phase Phase. + * @param partIt Partition it. 
+ * @param part Partition. + * @param entryIt Entry it. + * @param swapLsnr Swap listener. + */ + private void saveSupplyContext( + T3<UUID, Integer, AffinityTopologyVersion> t, + SupplyContextPhase phase, + Iterator<Integer> partIt, + int part, + Iterator<?> entryIt, GridCacheEntryInfoCollectSwapListener swapLsnr, + GridDhtLocalPartition loc, + AffinityTopologyVersion topVer, + long updateSeq) { + synchronized (scMap) { + if (cctx.affinity().affinityTopologyVersion().equals(topVer)) { + assert scMap.get(t) == null; + + scMap.put(t, + new SupplyContext(phase, + partIt, + entryIt, + swapLsnr, + part, + loc, + updateSeq)); + } + else if (loc != null) { + assert loc.reservations() > 0; + + loc.release(); + } + } + } + + /** + * Supply context phase. + */ + private enum SupplyContextPhase { + NEW, + ONHEAP, + SWAP, + EVICTED + } + + /** + * Supply context. + */ + private static class SupplyContext { + /** Phase. */ + private final SupplyContextPhase phase; + + /** Partition iterator. */ + private final Iterator<Integer> partIt; + + /** Entry iterator. */ + private final Iterator<?> entryIt; + + /** Swap listener. */ + private final GridCacheEntryInfoCollectSwapListener swapLsnr; + + /** Partition. */ + private final int part; + + /** Local partition. */ + private final GridDhtLocalPartition loc; + + /** Update seq. */ + private final long updateSeq; + + /** + * @param phase Phase. + * @param partIt Partition iterator. + * @param entryIt Entry iterator. + * @param swapLsnr Swap listener. + * @param part Partition. + */ + public SupplyContext(SupplyContextPhase phase, + Iterator<Integer> partIt, + Iterator<?> entryIt, + GridCacheEntryInfoCollectSwapListener swapLsnr, + int part, + GridDhtLocalPartition loc, + long updateSeq) { + this.phase = phase; + this.partIt = partIt; + this.entryIt = entryIt; + this.swapLsnr = swapLsnr; + this.part = part; + this.loc = loc; + this.updateSeq = updateSeq; + } + } + + @Deprecated//Backward compatibility. To be removed in future. 
+ public void startOldListeners() { + if (!cctx.kernalContext().clientNode() && cctx.rebalanceEnabled()) { + cctx.io().addHandler(cctx.cacheId(), GridDhtPartitionDemandMessage.class, new CI2<UUID, GridDhtPartitionDemandMessage>() { + @Override public void apply(UUID id, GridDhtPartitionDemandMessage m) { + processOldDemandMessage(m, id); + } + }); + } + } + + @Deprecated//Backward compatibility. To be removed in future. + public void stopOldListeners() { + if (!cctx.kernalContext().clientNode() && cctx.rebalanceEnabled()) { + + cctx.io().removeHandler(cctx.cacheId(), GridDhtPartitionDemandMessage.class); + } + } + + /** + * @param d D. + * @param id Id. + */ + @Deprecated//Backward compatibility. To be removed in future. + private void processOldDemandMessage(GridDhtPartitionDemandMessage d, UUID id) { + GridDhtPartitionSupplyMessage s = new GridDhtPartitionSupplyMessage(d.workerId(), + d.updateSequence(), cctx.cacheId(), cctx.deploymentEnabled()); + + ClusterNode node = cctx.node(id); + + long preloadThrottle = cctx.config().getRebalanceThrottle(); + + boolean ack = false; + + try { + for (int part : d.partitions()) { + GridDhtLocalPartition loc = top.localPartition(part, d.topologyVersion(), false); + + if (loc == null || loc.state() != OWNING || !loc.reserve()) { + // Reply with partition of "-1" to let sender know that + // this node is no longer an owner. 
+ s.missed(part); + + if (log.isDebugEnabled()) + log.debug("Requested partition is not owned by local node [part=" + part + + ", demander=" + id + ']'); + + continue; + } + + GridCacheEntryInfoCollectSwapListener swapLsnr = null; + + try { + if (cctx.isSwapOrOffheapEnabled()) { + swapLsnr = new GridCacheEntryInfoCollectSwapListener(log); + + cctx.swap().addOffHeapListener(part, swapLsnr); + cctx.swap().addSwapListener(part, swapLsnr); + } + + boolean partMissing = false; + + for (GridCacheEntryEx e : loc.entries()) { + if (!cctx.affinity().belongs(node, part, d.topologyVersion())) { + // Demander no longer needs this partition, so we send '-1' partition and move on. + s.missed(part); + + if (log.isDebugEnabled()) + log.debug("Demanding node does not need requested partition [part=" + part + + ", nodeId=" + id + ']'); + + partMissing = true; + + break; + } + + if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) { + ack = true; + + if (!replyOld(node, d, s)) + return; + + // Throttle preloading. + if (preloadThrottle > 0) + U.sleep(preloadThrottle); + + s = new GridDhtPartitionSupplyMessage(d.workerId(), d.updateSequence(), + cctx.cacheId(), cctx.deploymentEnabled()); + } + + GridCacheEntryInfo info = e.info(); + + if (info != null && !info.isNew()) { + if (preloadPred == null || preloadPred.apply(info)) + s.addEntry(part, info, cctx); + else if (log.isDebugEnabled()) + log.debug("Rebalance predicate evaluated to false (will not sender cache entry): " + + info); + } + } + + if (partMissing) + continue; + + if (cctx.isSwapOrOffheapEnabled()) { + GridCloseableIterator<Map.Entry<byte[], GridCacheSwapEntry>> iter = + cctx.swap().iterator(part); + + // Iterator may be null if space does not exist. 
+ if (iter != null) { + try { + boolean prepared = false; + + for (Map.Entry<byte[], GridCacheSwapEntry> e : iter) { + if (!cctx.affinity().belongs(node, part, d.topologyVersion())) { + // Demander no longer needs this partition, + // so we send '-1' partition and move on. + s.missed(part); + + if (log.isDebugEnabled()) + log.debug("Demanding node does not need requested partition " + + "[part=" + part + ", nodeId=" + id + ']'); + + partMissing = true; + + break; // For. + } + + if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) { + ack = true; + + if (!replyOld(node, d, s)) + return; + + // Throttle preloading. + if (preloadThrottle > 0) + U.sleep(preloadThrottle); + + s = new GridDhtPartitionSupplyMessage(d.workerId(), + d.updateSequence(), cctx.cacheId(), cctx.deploymentEnabled()); + } + + GridCacheSwapEntry swapEntry = e.getValue(); + + GridCacheEntryInfo info = new GridCacheEntryInfo(); + + info.keyBytes(e.getKey()); + info.ttl(swapEntry.ttl()); + info.expireTime(swapEntry.expireTime()); + info.version(swapEntry.version()); + info.value(swapEntry.value()); + + if (preloadPred == null || preloadPred.apply(info)) + s.addEntry0(part, info, cctx); + else { + if (log.isDebugEnabled()) + log.debug("Rebalance predicate evaluated to false (will not send " + + "cache entry): " + info); + + continue; + } + + // Need to manually prepare cache message. + if (depEnabled && !prepared) { + ClassLoader ldr = swapEntry.keyClassLoaderId() != null ? + cctx.deploy().getClassLoader(swapEntry.keyClassLoaderId()) : + swapEntry.valueClassLoaderId() != null ? + cctx.deploy().getClassLoader(swapEntry.valueClassLoaderId()) : + null; + + if (ldr == null) + continue; + + if (ldr instanceof GridDeploymentInfo) { + s.prepare((GridDeploymentInfo)ldr); + + prepared = true; + } + } + } + + if (partMissing) + continue; + } + finally { + iter.close(); + } + } + } + + // Stop receiving promote notifications. 
+ if (swapLsnr != null) { + cctx.swap().removeOffHeapListener(part, swapLsnr); + cctx.swap().removeSwapListener(part, swapLsnr); + } + + if (swapLsnr != null) { + Collection<GridCacheEntryInfo> entries = swapLsnr.entries(); + + swapLsnr = null; + + for (GridCacheEntryInfo info : entries) { + if (!cctx.affinity().belongs(node, part, d.topologyVersion())) { + // Demander no longer needs this partition, + // so we send '-1' partition and move on. + s.missed(part); + + if (log.isDebugEnabled()) + log.debug("Demanding node does not need requested partition " + + "[part=" + part + ", nodeId=" + id + ']'); + + // No need to continue iteration over swap entries. + break; + } + + if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) { + ack = true; + + if (!replyOld(node, d, s)) + return; + + s = new GridDhtPartitionSupplyMessage(d.workerId(), + d.updateSequence(), + cctx.cacheId(), + cctx.deploymentEnabled()); + } + + if (preloadPred == null || preloadPred.apply(info)) + s.addEntry(part, info, cctx); + else if (log.isDebugEnabled()) + log.debug("Rebalance predicate evaluated to false (will not sender cache entry): " + + info); + } + } + + // Mark as last supply message. + s.last(part); + + if (ack) { + s.markAck(); + + break; // Partition for loop. + } + } + finally { + loc.release(); + + if (swapLsnr != null) { + cctx.swap().removeOffHeapListener(part, swapLsnr); + cctx.swap().removeSwapListener(part, swapLsnr); + } + } + } + + replyOld(node, d, s); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to send partition supply message to node: " + node.id(), e); + } + } + + /** + * @param n Node. + * @param d Demand message. + * @param s Supply message. + * @return {@code True} if message was sent, {@code false} if recipient left grid. + * @throws IgniteCheckedException If failed. + */ + @Deprecated//Backward compatibility. To be removed in future. 
+ private boolean replyOld(ClusterNode n, GridDhtPartitionDemandMessage d, GridDhtPartitionSupplyMessage s) + throws IgniteCheckedException { + try { + if (log.isDebugEnabled()) + log.debug("Replying to partition demand [node=" + n.id() + ", demand=" + d + ", supply=" + s + ']'); + + cctx.io().sendOrderedMessage(n, d.topic(), s, cctx.ioPolicy(), d.timeout()); + + return true; + } + catch (ClusterTopologyCheckedException ignore) { + if (log.isDebugEnabled()) + log.debug("Failed to send partition supply message because node left grid: " + n.id()); + + return false; + } + } + + /** + * Dumps debug information. + */ + public void dumpDebugInfo() { + synchronized (scMap) { + if (!scMap.isEmpty()) { + U.warn(log, "Rebalancing supplier reserved following partitions:"); + + for (SupplyContext sc : scMap.values()) { + if (sc.loc != null) + U.warn(log, ">>> " + sc.loc); + } + } + } + } +}
http://git-wip-us.apache.org/repos/asf/ignite/blob/7dfaa3b0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessageV2.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessageV2.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessageV2.java new file mode 100644 index 0000000..41454f9 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessageV2.java @@ -0,0 +1,380 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
+
+import java.io.Externalizable;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.GridDirectCollection;
+import org.apache.ignite.internal.GridDirectMap;
+import org.apache.ignite.internal.GridDirectTransient;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CacheEntryInfoCollection;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheDeployable;
+import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
+import org.apache.ignite.internal.processors.cache.GridCacheMessage;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
+ * Partition supply message.
+ * <p>
+ * Carries cache entry infos grouped by partition ID, together with the set of partitions
+ * that have been fully sent and the set of partitions that were not found. A running
+ * message size estimate is maintained locally and is not sent over the wire.
+ */
+public class GridDhtPartitionSupplyMessageV2 extends GridCacheMessage implements GridCacheDeployable {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Update sequence. */
+    private long updateSeq;
+
+    /** Topology version. */
+    private AffinityTopologyVersion topVer;
+
+    /** Partitions that have been fully sent. */
+    @GridDirectCollection(int.class)
+    private Collection<Integer> last;
+
+    /** Partitions which were not found. */
+    @GridToStringInclude
+    @GridDirectCollection(int.class)
+    private Collection<Integer> missed;
+
+    /** Entries, keyed by partition ID. */
+    @GridDirectMap(keyType = int.class, valueType = CacheEntryInfoCollection.class)
+    private Map<Integer, CacheEntryInfoCollection> infos;
+
+    /** Message size estimate (transient: maintained locally, never written or read). */
+    @GridDirectTransient
+    private int msgSize;
+
+    /**
+     * @param updateSeq Update sequence for this node.
+     * @param cacheId Cache ID.
+     * @param topVer Topology version.
+     * @param addDepInfo Deployment info flag.
+     */
+    GridDhtPartitionSupplyMessageV2(long updateSeq, int cacheId, AffinityTopologyVersion topVer, boolean addDepInfo) {
+        this.cacheId = cacheId;
+        this.updateSeq = updateSeq;
+        this.topVer = topVer;
+        this.addDepInfo = addDepInfo;
+    }
+
+    /**
+     * Empty constructor required for {@link Externalizable}.
+     */
+    public GridDhtPartitionSupplyMessageV2() {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean ignoreClassErrors() {
+        return true;
+    }
+
+    /**
+     * @return Update sequence.
+     */
+    long updateSequence() {
+        return updateSeq;
+    }
+
+    /**
+     * @return Topology version this message was created with.
+     */
+    @Override public AffinityTopologyVersion topologyVersion() {
+        return topVer;
+    }
+
+    /**
+     * @return Partitions that have been fully sent (empty set if none).
+     */
+    Collection<Integer> last() {
+        return last == null ? Collections.<Integer>emptySet() : last;
+    }
+
+    /**
+     * Marks a partition as fully sent. The first time a partition is marked, the message size
+     * estimate grows by 4 and, if the partition has no entries yet, an empty entry collection
+     * is registered for it.
+     *
+     * @param p Partition which was fully sent.
+     */
+    void last(int p) {
+        if (last == null)
+            last = new HashSet<>();
+
+        if (last.add(p)) {
+            msgSize += 4;
+
+            // If partition is empty, we need to add it.
+            if (!infos().containsKey(p)) {
+                CacheEntryInfoCollection infoCol = new CacheEntryInfoCollection();
+
+                infoCol.init();
+
+                infos().put(p, infoCol);
+            }
+        }
+    }
+
+    /**
+     * Marks a partition as missed. The message size estimate grows by 4 the first time
+     * a given partition is marked.
+     *
+     * @param p Missed partition.
+     */
+    void missed(int p) {
+        if (missed == null)
+            missed = new HashSet<>();
+
+        if (missed.add(p))
+            msgSize += 4;
+    }
+
+    /**
+     * @return Missed partitions (empty set if none).
+     */
+    Collection<Integer> missed() {
+        return missed == null ? Collections.<Integer>emptySet() : missed;
+    }
+
+    /**
+     * @return Entries keyed by partition ID; the map is created lazily on first access, never {@code null}.
+     */
+    Map<Integer, CacheEntryInfoCollection> infos() {
+        if (infos == null)
+            infos = new HashMap<>();
+
+        return infos;
+    }
+
+    /**
+     * @return Message size estimate accumulated while partitions and entries were added.
+     */
+    int messageSize() {
+        return msgSize;
+    }
+
+    /**
+     * Marshals the entry, adds its marshalled size to the message size estimate and appends it
+     * to the entry collection of the given partition. If the partition has no collection yet,
+     * one is created and the estimate grows by 4 more (presumably the int partition ID).
+     *
+     * @param p Partition.
+     * @param info Entry to add.
+     * @param ctx Cache context.
+     * @throws IgniteCheckedException If failed.
+     */
+    void addEntry(int p, GridCacheEntryInfo info, GridCacheContext ctx) throws IgniteCheckedException {
+        assert info != null;
+
+        marshalInfo(info, ctx);
+
+        msgSize += info.marshalledSize(ctx);
+
+        CacheEntryInfoCollection infoCol = infos().get(p);
+
+        if (infoCol == null) {
+            msgSize += 4;
+
+            infos().put(p, infoCol = new CacheEntryInfoCollection());
+
+            infoCol.init();
+        }
+
+        infoCol.add(info);
+    }
+
+    /**
+     * Same as {@link #addEntry(int, GridCacheEntryInfo, GridCacheContext)}, but additionally asserts
+     * that the entry carries a key (object or bytes) and a value.
+     *
+     * @param p Partition.
+     * @param info Entry to add.
+     * @param ctx Cache context.
+     * @throws IgniteCheckedException If failed.
+     */
+    void addEntry0(int p, GridCacheEntryInfo info, GridCacheContext ctx) throws IgniteCheckedException {
+        assert info != null;
+        assert (info.key() != null || info.keyBytes() != null);
+        assert info.value() != null;
+
+        // Marshalling (performed inside addEntry) is needed to initialize info properly.
+        addEntry(p, info, ctx);
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("ForLoopReplaceableByForEach")
+    @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException {
+        super.finishUnmarshal(ctx, ldr);
+
+        GridCacheContext cacheCtx = ctx.cacheContext(cacheId);
+
+        for (CacheEntryInfoCollection col : infos().values()) {
+            List<GridCacheEntryInfo> entries = col.infos();
+
+            for (int i = 0; i < entries.size(); i++)
+                entries.get(i).unmarshal(cacheCtx, ldr);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean addDeploymentInfo() {
+        return addDepInfo;
+    }
+
+    /**
+     * @return Number of partitions that have an entry collection in this message.
+     */
+    public int size() {
+        return infos().size();
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        writer.setBuffer(buf);
+
+        if (!super.writeTo(buf, writer))
+            return false;
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+
+        // Cases intentionally fall through: writing resumes from the saved state.
+        switch (writer.state()) {
+            case 3:
+                if (!writer.writeMap("infos", infos, MessageCollectionItemType.INT, MessageCollectionItemType.MSG))
+                    return false;
+
+                writer.incrementState();
+
+            case 4:
+                if (!writer.writeCollection("last", last, MessageCollectionItemType.INT))
+                    return false;
+
+                writer.incrementState();
+
+            case 5:
+                if (!writer.writeCollection("missed", missed, MessageCollectionItemType.INT))
+                    return false;
+
+                writer.incrementState();
+
+            case 6:
+                if (!writer.writeMessage("topVer", topVer))
+                    return false;
+
+                writer.incrementState();
+
+            case 7:
+                if (!writer.writeLong("updateSeq", updateSeq))
+                    return false;
+
+                writer.incrementState();
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+
+        if (!super.readFrom(buf, reader))
+            return false;
+
+        // Cases intentionally fall through: reading resumes from the saved state.
+        switch (reader.state()) {
+            case 3:
+                infos = reader.readMap("infos", MessageCollectionItemType.INT, MessageCollectionItemType.MSG, false);
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 4:
+                last = reader.readCollection("last", MessageCollectionItemType.INT);
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 5:
+                missed = reader.readCollection("missed", MessageCollectionItemType.INT);
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 6:
+                topVer = reader.readMessage("topVer");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 7:
+                updateSeq = reader.readLong("updateSeq");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+        }
+
+        return reader.afterMessageRead(GridDhtPartitionSupplyMessageV2.class);
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte directType() {
+        return 114;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte fieldsCount() {
+        return 8;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(GridDhtPartitionSupplyMessageV2.class, this,
+            "size", size(),
+            "parts", infos().keySet(),
+            "super", super.toString());
+    }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/7dfaa3b0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java deleted file mode 100644 index 28a73b1..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java +++ /dev/null @@ -1,555 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ignite.internal.processors.cache.distributed.dht.preloader; - -import java.io.Externalizable; -import java.util.Collection; -import java.util.LinkedList; -import java.util.Map; -import java.util.UUID; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingDeque; -import java.util.concurrent.locks.ReadWriteLock; -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.IgniteLogger; -import org.apache.ignite.cluster.ClusterNode; -import org.apache.ignite.internal.IgniteInterruptedCheckedException; -import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; -import org.apache.ignite.internal.managers.deployment.GridDeploymentInfo; -import org.apache.ignite.internal.processors.cache.GridCacheContext; -import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; -import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo; -import org.apache.ignite.internal.processors.cache.GridCacheEntryInfoCollectSwapListener; -import org.apache.ignite.internal.processors.cache.GridCacheSwapEntry; -import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition; -import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology; -import org.apache.ignite.internal.util.lang.GridCloseableIterator; -import org.apache.ignite.internal.util.typedef.CI2; -import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.internal.util.worker.GridWorker; -import org.apache.ignite.lang.IgniteBiTuple; -import org.apache.ignite.lang.IgnitePredicate; -import org.apache.ignite.thread.IgniteThread; -import org.jetbrains.annotations.Nullable; - -import static java.util.concurrent.TimeUnit.MILLISECONDS; -import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.OWNING; - -/** - * Thread pool for supplying partitions to demanding nodes. 
- */ -class GridDhtPartitionSupplyPool { - /** */ - private final GridCacheContext<?, ?> cctx; - - /** */ - private final IgniteLogger log; - - /** */ - private final ReadWriteLock busyLock; - - /** */ - private GridDhtPartitionTopology top; - - /** */ - private final Collection<SupplyWorker> workers = new LinkedList<>(); - - /** */ - private final BlockingQueue<DemandMessage> queue = new LinkedBlockingDeque<>(); - - /** */ - private final boolean depEnabled; - - /** Preload predicate. */ - private IgnitePredicate<GridCacheEntryInfo> preloadPred; - - /** - * @param cctx Cache context. - * @param busyLock Shutdown lock. - */ - GridDhtPartitionSupplyPool(GridCacheContext<?, ?> cctx, ReadWriteLock busyLock) { - assert cctx != null; - assert busyLock != null; - - this.cctx = cctx; - this.busyLock = busyLock; - - log = cctx.logger(getClass()); - - top = cctx.dht().topology(); - - if (!cctx.kernalContext().clientNode()) { - int poolSize = cctx.rebalanceEnabled() ? cctx.config().getRebalanceThreadPoolSize() : 0; - - for (int i = 0; i < poolSize; i++) - workers.add(new SupplyWorker()); - - cctx.io().addHandler(cctx.cacheId(), GridDhtPartitionDemandMessage.class, new CI2<UUID, GridDhtPartitionDemandMessage>() { - @Override public void apply(UUID id, GridDhtPartitionDemandMessage m) { - processDemandMessage(id, m); - } - }); - } - - depEnabled = cctx.gridDeploy().enabled(); - } - - /** - * - */ - void start() { - for (SupplyWorker w : workers) - new IgniteThread(cctx.gridName(), "preloader-supply-worker", w).start(); - } - - /** - * - */ - void stop() { - U.cancel(workers); - U.join(workers, log); - - top = null; - } - - /** - * Sets preload predicate for supply pool. - * - * @param preloadPred Preload predicate. - */ - void preloadPredicate(IgnitePredicate<GridCacheEntryInfo> preloadPred) { - this.preloadPred = preloadPred; - } - - /** - * @return Size of this thread pool. 
- */ - int poolSize() { - return cctx.config().getRebalanceThreadPoolSize(); - } - - /** - * @return {@code true} if entered to busy state. - */ - private boolean enterBusy() { - if (busyLock.readLock().tryLock()) - return true; - - if (log.isDebugEnabled()) - log.debug("Failed to enter to busy state (supplier is stopping): " + cctx.nodeId()); - - return false; - } - - /** - * @param nodeId Sender node ID. - * @param d Message. - */ - private void processDemandMessage(UUID nodeId, GridDhtPartitionDemandMessage d) { - if (!enterBusy()) - return; - - try { - if (cctx.rebalanceEnabled()) { - if (log.isDebugEnabled()) - log.debug("Received partition demand [node=" + nodeId + ", demand=" + d + ']'); - - queue.offer(new DemandMessage(nodeId, d)); - } - else - U.warn(log, "Received partition demand message when rebalancing is disabled (will ignore): " + d); - } - finally { - leaveBusy(); - } - } - - /** - * - */ - private void leaveBusy() { - busyLock.readLock().unlock(); - } - - /** - * @param deque Deque to poll from. - * @param w Worker. - * @return Polled item. - * @throws InterruptedException If interrupted. - */ - @Nullable private <T> T poll(BlockingQueue<T> deque, GridWorker w) throws InterruptedException { - assert w != null; - - // There is currently a case where {@code interrupted} - // flag on a thread gets flipped during stop which causes the pool to hang. This check - // will always make sure that interrupted flag gets reset before going into wait conditions. - // The true fix should actually make sure that interrupted flag does not get reset or that - // interrupted exception gets propagated. Until we find a real fix, this method should - // always work to make sure that there is no hanging during stop. - if (w.isCancelled()) - Thread.currentThread().interrupt(); - - return deque.poll(2000, MILLISECONDS); - } - - /** - * Supply work. - */ - private class SupplyWorker extends GridWorker { - /** Hide worker logger and use cache logger. 
*/ - private IgniteLogger log = GridDhtPartitionSupplyPool.this.log; - - /** - * Default constructor. - */ - private SupplyWorker() { - super(cctx.gridName(), "preloader-supply-worker", GridDhtPartitionSupplyPool.this.log); - } - - /** {@inheritDoc} */ - @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException { - while (!isCancelled()) { - DemandMessage msg = poll(queue, this); - - if (msg == null) - continue; - - ClusterNode node = cctx.discovery().node(msg.senderId()); - - if (node == null) { - if (log.isDebugEnabled()) - log.debug("Received message from non-existing node (will ignore): " + msg); - - continue; - } - - processMessage(msg, node); - } - } - - /** - * @param msg Message. - * @param node Demander. - */ - private void processMessage(DemandMessage msg, ClusterNode node) { - assert msg != null; - assert node != null; - - GridDhtPartitionDemandMessage d = msg.message(); - - GridDhtPartitionSupplyMessage s = new GridDhtPartitionSupplyMessage(d.workerId(), - d.updateSequence(), cctx.cacheId(), cctx.deploymentEnabled()); - - long preloadThrottle = cctx.config().getRebalanceThrottle(); - - boolean ack = false; - - try { - for (int part : d.partitions()) { - GridDhtLocalPartition loc = top.localPartition(part, d.topologyVersion(), false); - - if (loc == null || loc.state() != OWNING || !loc.reserve()) { - // Reply with partition of "-1" to let sender know that - // this node is no longer an owner. 
- s.missed(part); - - if (log.isDebugEnabled()) - log.debug("Requested partition is not owned by local node [part=" + part + - ", demander=" + msg.senderId() + ']'); - - continue; - } - - GridCacheEntryInfoCollectSwapListener swapLsnr = null; - - try { - if (cctx.isSwapOrOffheapEnabled()) { - swapLsnr = new GridCacheEntryInfoCollectSwapListener(log); - - cctx.swap().addOffHeapListener(part, swapLsnr); - cctx.swap().addSwapListener(part, swapLsnr); - } - - boolean partMissing = false; - - for (GridCacheEntryEx e : loc.entries()) { - if (!cctx.affinity().belongs(node, part, d.topologyVersion())) { - // Demander no longer needs this partition, so we send '-1' partition and move on. - s.missed(part); - - if (log.isDebugEnabled()) - log.debug("Demanding node does not need requested partition [part=" + part + - ", nodeId=" + msg.senderId() + ']'); - - partMissing = true; - - break; - } - - if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) { - ack = true; - - if (!reply(node, d, s)) - return; - - // Throttle preloading. - if (preloadThrottle > 0) - U.sleep(preloadThrottle); - - s = new GridDhtPartitionSupplyMessage(d.workerId(), d.updateSequence(), - cctx.cacheId(), cctx.deploymentEnabled()); - } - - GridCacheEntryInfo info = e.info(); - - if (info != null && !info.isNew()) { - if (preloadPred == null || preloadPred.apply(info)) - s.addEntry(part, info, cctx); - else if (log.isDebugEnabled()) - log.debug("Rebalance predicate evaluated to false (will not sender cache entry): " + - info); - } - } - - if (partMissing) - continue; - - if (cctx.isSwapOrOffheapEnabled()) { - GridCloseableIterator<Map.Entry<byte[], GridCacheSwapEntry>> iter = - cctx.swap().iterator(part); - - // Iterator may be null if space does not exist. 
- if (iter != null) { - try { - boolean prepared = false; - - for (Map.Entry<byte[], GridCacheSwapEntry> e : iter) { - if (!cctx.affinity().belongs(node, part, d.topologyVersion())) { - // Demander no longer needs this partition, - // so we send '-1' partition and move on. - s.missed(part); - - if (log.isDebugEnabled()) - log.debug("Demanding node does not need requested partition " + - "[part=" + part + ", nodeId=" + msg.senderId() + ']'); - - partMissing = true; - - break; // For. - } - - if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) { - ack = true; - - if (!reply(node, d, s)) - return; - - // Throttle preloading. - if (preloadThrottle > 0) - U.sleep(preloadThrottle); - - s = new GridDhtPartitionSupplyMessage(d.workerId(), - d.updateSequence(), cctx.cacheId(), cctx.deploymentEnabled()); - } - - GridCacheSwapEntry swapEntry = e.getValue(); - - GridCacheEntryInfo info = new GridCacheEntryInfo(); - - info.keyBytes(e.getKey()); - info.ttl(swapEntry.ttl()); - info.expireTime(swapEntry.expireTime()); - info.version(swapEntry.version()); - info.value(swapEntry.value()); - - if (preloadPred == null || preloadPred.apply(info)) - s.addEntry0(part, info, cctx); - else { - if (log.isDebugEnabled()) - log.debug("Rebalance predicate evaluated to false (will not send " + - "cache entry): " + info); - - continue; - } - - // Need to manually prepare cache message. - if (depEnabled && !prepared) { - ClassLoader ldr = swapEntry.keyClassLoaderId() != null ? - cctx.deploy().getClassLoader(swapEntry.keyClassLoaderId()) : - swapEntry.valueClassLoaderId() != null ? - cctx.deploy().getClassLoader(swapEntry.valueClassLoaderId()) : - null; - - if (ldr == null) - continue; - - if (ldr instanceof GridDeploymentInfo) { - s.prepare((GridDeploymentInfo)ldr); - - prepared = true; - } - } - } - - if (partMissing) - continue; - } - finally { - iter.close(); - } - } - } - - // Stop receiving promote notifications. 
- if (swapLsnr != null) { - cctx.swap().removeOffHeapListener(part, swapLsnr); - cctx.swap().removeSwapListener(part, swapLsnr); - } - - if (swapLsnr != null) { - Collection<GridCacheEntryInfo> entries = swapLsnr.entries(); - - swapLsnr = null; - - for (GridCacheEntryInfo info : entries) { - if (!cctx.affinity().belongs(node, part, d.topologyVersion())) { - // Demander no longer needs this partition, - // so we send '-1' partition and move on. - s.missed(part); - - if (log.isDebugEnabled()) - log.debug("Demanding node does not need requested partition " + - "[part=" + part + ", nodeId=" + msg.senderId() + ']'); - - // No need to continue iteration over swap entries. - break; - } - - if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) { - ack = true; - - if (!reply(node, d, s)) - return; - - s = new GridDhtPartitionSupplyMessage(d.workerId(), - d.updateSequence(), - cctx.cacheId(), cctx.deploymentEnabled()); - } - - if (preloadPred == null || preloadPred.apply(info)) - s.addEntry(part, info, cctx); - else if (log.isDebugEnabled()) - log.debug("Rebalance predicate evaluated to false (will not sender cache entry): " + - info); - } - } - - // Mark as last supply message. - s.last(part); - - if (ack) { - s.markAck(); - - break; // Partition for loop. - } - } - finally { - loc.release(); - - if (swapLsnr != null) { - cctx.swap().removeOffHeapListener(part, swapLsnr); - cctx.swap().removeSwapListener(part, swapLsnr); - } - } - } - - reply(node, d, s); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to send partition supply message to node: " + node.id(), e); - } - } - - /** - * @param n Node. - * @param d Demand message. - * @param s Supply message. - * @return {@code True} if message was sent, {@code false} if recipient left grid. - * @throws IgniteCheckedException If failed. 
- */ - private boolean reply(ClusterNode n, GridDhtPartitionDemandMessage d, GridDhtPartitionSupplyMessage s) - throws IgniteCheckedException { - try { - if (log.isDebugEnabled()) - log.debug("Replying to partition demand [node=" + n.id() + ", demand=" + d + ", supply=" + s + ']'); - - cctx.io().sendOrderedMessage(n, d.topic(), s, cctx.ioPolicy(), d.timeout()); - - return true; - } - catch (ClusterTopologyCheckedException ignore) { - if (log.isDebugEnabled()) - log.debug("Failed to send partition supply message because node left grid: " + n.id()); - - return false; - } - } - } - - /** - * Demand message wrapper. - */ - private static class DemandMessage extends IgniteBiTuple<UUID, GridDhtPartitionDemandMessage> { - /** */ - private static final long serialVersionUID = 0L; - - /** - * @param sndId Sender ID. - * @param msg Message. - */ - DemandMessage(UUID sndId, GridDhtPartitionDemandMessage msg) { - super(sndId, msg); - } - - /** - * Empty constructor required for {@link Externalizable}. - */ - public DemandMessage() { - // No-op. - } - - /** - * @return Sender ID. - */ - UUID senderId() { - return get1(); - } - - /** - * @return Message. 
- */ - public GridDhtPartitionDemandMessage message() { - return get2(); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return "DemandMessage [senderId=" + senderId() + ", msg=" + message() + ']'; - } - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/7dfaa3b0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index cef38e8..2f2944d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -742,6 +742,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT // Must initialize topology after we get discovery event. initTopology(cacheCtx); + cacheCtx.preloader().onTopologyChanged(exchId.topologyVersion()); + cacheCtx.preloader().updateLastExchangeFuture(this); }