http://git-wip-us.apache.org/repos/asf/ignite/blob/488b25e1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java index e585b56..d5f2246 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java @@ -27,12 +27,8 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.UUID; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.locks.ReadWriteLock; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.cache.CacheRebalanceMode; @@ -54,13 +50,11 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalP import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology; import org.apache.ignite.internal.processors.timeout.GridTimeoutObject; import org.apache.ignite.internal.processors.timeout.GridTimeoutObjectAdapter; -import org.apache.ignite.internal.util.GridLeanSet; import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.CI1; -import org.apache.ignite.internal.util.typedef.CI2; import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.internal.LT; import org.apache.ignite.internal.util.typedef.internal.S; @@ -71,12 +65,10 @@ import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.spi.IgniteSpiException; import org.jetbrains.annotations.Nullable; -import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_OBJECT_LOADED; import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_PART_LOADED; import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_STARTED; import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_STOPPED; -import static org.apache.ignite.internal.GridTopic.TOPIC_CACHE; import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.MOVING; import static org.apache.ignite.internal.processors.dr.GridDrType.DR_NONE; import static org.apache.ignite.internal.processors.dr.GridDrType.DR_PRELOAD; @@ -109,18 +101,6 @@ public class GridDhtPartitionDemander { /** Last exchange future. */ private volatile GridDhtPartitionsExchangeFuture lastExchangeFut; - /** Demand lock. */ - @Deprecated//Backward compatibility. To be removed in future. - private final ReadWriteLock demandLock; - - /** DemandWorker index. */ - @Deprecated//Backward compatibility. To be removed in future. - private final AtomicInteger dmIdx = new AtomicInteger(); - - /** DemandWorker. */ - @Deprecated//Backward compatibility. To be removed in future. - private volatile DemandWorker worker; - /** Cached rebalance topics. */ private final Map<Integer, Object> rebalanceTopics; @@ -138,13 +118,11 @@ public class GridDhtPartitionDemander { /** * @param cctx Cctx. - * @param demandLock Demand lock. */ - public GridDhtPartitionDemander(GridCacheContext<?, ?> cctx, ReadWriteLock demandLock) { + public GridDhtPartitionDemander(GridCacheContext<?, ?> cctx) { assert cctx != null; this.cctx = cctx; - this.demandLock = demandLock; log = cctx.logger(getClass()); @@ -184,11 +162,6 @@ public class GridDhtPartitionDemander { rebalanceFut.onDone(false); } - DemandWorker dw = worker; - - if (dw != null) - dw.cancel(); - lastExchangeFut = null; lastTimeoutObj.set(null); @@ -466,65 +439,47 @@ public class GridDhtPartitionDemander { GridDhtPartitionDemandMessage d = e.getValue(); - //Check remote node rebalancing API version. - if (node.version().compareTo(GridDhtPreloader.REBALANCING_VER_2_SINCE) >= 0) { - U.log(log, "Starting rebalancing [mode=" + cfg.getRebalanceMode() + - ", fromNode=" + node.id() + ", partitionsCount=" + parts.size() + - ", topology=" + fut.topologyVersion() + ", updateSeq=" + fut.updateSeq + "]"); - - int lsnrCnt = cctx.gridConfig().getRebalanceThreadPoolSize(); + U.log(log, "Starting rebalancing [mode=" + cfg.getRebalanceMode() + + ", fromNode=" + node.id() + ", partitionsCount=" + parts.size() + + ", topology=" + fut.topologyVersion() + ", updateSeq=" + fut.updateSeq + "]"); - List<Set<Integer>> sParts = new ArrayList<>(lsnrCnt); + int lsnrCnt = cctx.gridConfig().getRebalanceThreadPoolSize(); - for (int cnt = 0; cnt < lsnrCnt; cnt++) - sParts.add(new HashSet<Integer>()); + List<Set<Integer>> sParts = new ArrayList<>(lsnrCnt); - Iterator<Integer> it = parts.iterator(); + for (int cnt = 0; cnt < lsnrCnt; cnt++) + sParts.add(new HashSet<Integer>()); - int cnt = 0; + Iterator<Integer> it = parts.iterator(); - while (it.hasNext()) - sParts.get(cnt++ % lsnrCnt).add(it.next()); + int cnt = 0; - for (cnt = 0; cnt < lsnrCnt; cnt++) { - if (!sParts.get(cnt).isEmpty()) { - // Create copy. - GridDhtPartitionDemandMessage initD = new GridDhtPartitionDemandMessage(d, sParts.get(cnt)); + while (it.hasNext()) + sParts.get(cnt++ % lsnrCnt).add(it.next()); - initD.topic(rebalanceTopics.get(cnt)); - initD.updateSequence(fut.updateSeq); - initD.timeout(cctx.config().getRebalanceTimeout()); - - synchronized (fut) { - if (!fut.isDone()) { - // Future can be already cancelled at this moment and all failovers happened. - // New requests will not be covered by failovers. - cctx.io().sendOrderedMessage(node, - rebalanceTopics.get(cnt), initD, cctx.ioPolicy(), initD.timeout()); - } + for (cnt = 0; cnt < lsnrCnt; cnt++) { + if (!sParts.get(cnt).isEmpty()) { + // Create copy. + GridDhtPartitionDemandMessage initD = new GridDhtPartitionDemandMessage(d, sParts.get(cnt)); + + initD.topic(rebalanceTopics.get(cnt)); + initD.updateSequence(fut.updateSeq); + initD.timeout(cctx.config().getRebalanceTimeout()); + + synchronized (fut) { + if (!fut.isDone()) { + // Future can be already cancelled at this moment and all failovers happened. + // New requests will not be covered by failovers. + cctx.io().sendOrderedMessage(node, + rebalanceTopics.get(cnt), initD, cctx.ioPolicy(), initD.timeout()); } - - if (log.isDebugEnabled()) - log.debug("Requested rebalancing [from node=" + node.id() + ", listener index=" + - cnt + ", partitions count=" + sParts.get(cnt).size() + - " (" + partitionsList(sParts.get(cnt)) + ")]"); } - } - } - else { - U.log(log, "Starting rebalancing (old api) [cache=" + cctx.name() + - ", mode=" + cfg.getRebalanceMode() + - ", fromNode=" + node.id() + - ", partitionsCount=" + parts.size() + - ", topology=" + fut.topologyVersion() + - ", updateSeq=" + fut.updateSeq + "]"); - d.timeout(cctx.config().getRebalanceTimeout()); - d.workerId(0);//old api support. - - worker = new DemandWorker(dmIdx.incrementAndGet(), fut); - - worker.run(node, d); + if (log.isDebugEnabled()) + log.debug("Requested rebalancing [from node=" + node.id() + ", listener index=" + + cnt + ", partitions count=" + sParts.get(cnt).size() + + " (" + partitionsList(sParts.get(cnt)) + ")]"); + } } } } @@ -997,26 +952,23 @@ public class GridDhtPartitionDemander { if (node == null) return; - //Check remote node rebalancing API version. - if (node.version().compareTo(GridDhtPreloader.REBALANCING_VER_2_SINCE) >= 0) { - GridDhtPartitionDemandMessage d = new GridDhtPartitionDemandMessage( - -1/* remove supply context signal */, this.topologyVersion(), cctx.cacheId()); + GridDhtPartitionDemandMessage d = new GridDhtPartitionDemandMessage( + -1/* remove supply context signal */, this.topologyVersion(), cctx.cacheId()); - d.timeout(cctx.config().getRebalanceTimeout()); + d.timeout(cctx.config().getRebalanceTimeout()); - try { - for (int idx = 0; idx < cctx.gridConfig().getRebalanceThreadPoolSize(); idx++) { - d.topic(GridCachePartitionExchangeManager.rebalanceTopic(idx)); + try { + for (int idx = 0; idx < cctx.gridConfig().getRebalanceThreadPoolSize(); idx++) { + d.topic(GridCachePartitionExchangeManager.rebalanceTopic(idx)); - cctx.io().sendOrderedMessage(node, GridCachePartitionExchangeManager.rebalanceTopic(idx), - d, cctx.ioPolicy(), cctx.config().getRebalanceTimeout()); - } - } - catch (IgniteCheckedException ignored) { - if (log.isDebugEnabled()) - log.debug("Failed to send failover context cleanup request to node"); + cctx.io().sendOrderedMessage(node, GridCachePartitionExchangeManager.rebalanceTopic(idx), + d, cctx.ioPolicy(), cctx.config().getRebalanceTimeout()); } } + catch (IgniteCheckedException ignored) { + if (log.isDebugEnabled()) + log.debug("Failed to send failover context cleanup request to node"); + } } /** @@ -1147,373 +1099,4 @@ public class GridDhtPartitionDemander { return S.toString(RebalanceFuture.class, this); } } - - /** - * Supply message wrapper. - */ - @Deprecated//Backward compatibility. To be removed in future. - private static class SupplyMessage { - /** Sender ID. */ - private UUID sndId; - - /** Supply message. */ - private GridDhtPartitionSupplyMessage supply; - - /** - * Dummy constructor. - */ - private SupplyMessage() { - // No-op. - } - - /** - * @param sndId Sender ID. - * @param supply Supply message. - */ - SupplyMessage(UUID sndId, GridDhtPartitionSupplyMessage supply) { - this.sndId = sndId; - this.supply = supply; - } - - /** - * @return Sender ID. - */ - UUID senderId() { - return sndId; - } - - /** - * @return Message. - */ - GridDhtPartitionSupplyMessage supply() { - return supply; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(SupplyMessage.class, this); - } - } - - /** - * - */ - @Deprecated//Backward compatibility. To be removed in future. - private class DemandWorker { - /** Worker ID. */ - private int id; - - /** Partition-to-node assignments. */ - private final LinkedBlockingDeque<GridDhtPreloaderAssignments> assignQ = new LinkedBlockingDeque<>(); - - /** Message queue. */ - private final LinkedBlockingDeque<SupplyMessage> msgQ = - new LinkedBlockingDeque<>(); - - /** Counter. */ - private long cntr; - - /** Hide worker logger and use cache logger instead. */ - private IgniteLogger log = GridDhtPartitionDemander.this.log; - - /** */ - private volatile RebalanceFuture fut; - - /** - * @param id Worker ID. - * @param fut Rebalance future. - */ - private DemandWorker(int id, RebalanceFuture fut) { - assert id >= 0; - - this.id = id; - this.fut = fut; - } - - /** - * @param msg Message. - */ - private void addMessage(SupplyMessage msg) { - msgQ.offer(msg); - } - - /** - * @param deque Deque to poll from. - * @param time Time to wait. - * @return Polled item. - * @throws InterruptedException If interrupted. - */ - @Nullable private <T> T poll(BlockingQueue<T> deque, long time) throws InterruptedException { - return deque.poll(time, MILLISECONDS); - } - - /** - * @param idx Unique index for this topic. - * @return Topic for partition. - */ - public Object topic(long idx) { - return TOPIC_CACHE.topic(cctx.namexx(), cctx.nodeId(), id, idx); - } - - /** */ - public void cancel() { - msgQ.clear(); - - msgQ.offer(new SupplyMessage(null, null)); - } - - /** - * @param node Node to demand from. - * @param topVer Topology version. - * @param d Demand message. - * @param exchFut Exchange future. - * @throws InterruptedException If interrupted. - * @throws ClusterTopologyCheckedException If node left. - * @throws IgniteCheckedException If failed to send message. - */ - private void demandFromNode( - ClusterNode node, - final AffinityTopologyVersion topVer, - GridDhtPartitionDemandMessage d, - GridDhtPartitionsExchangeFuture exchFut - ) throws InterruptedException, IgniteCheckedException { - GridDhtPartitionTopology top = cctx.dht().topology(); - - cntr++; - - d.topic(topic(cntr)); - d.workerId(id); - - if (fut.isDone() || topologyChanged(fut)) - return; - - cctx.io().addOrderedHandler(d.topic(), new CI2<UUID, GridDhtPartitionSupplyMessage>() { - @Override public void apply(UUID nodeId, GridDhtPartitionSupplyMessage msg) { - addMessage(new SupplyMessage(nodeId, msg)); - } - }); - - try { - boolean retry; - - // DoWhile. - // ======= - do { - retry = false; - - // Create copy. - d = new GridDhtPartitionDemandMessage(d, fut.remaining.get(node.id()).get2()); - - long timeout = cctx.config().getRebalanceTimeout(); - - d.timeout(timeout); - - if (log.isDebugEnabled()) - log.debug("Sending demand message [node=" + node.id() + ", demand=" + d + ']'); - - // Send demand message. - cctx.io().send(node, d, cctx.ioPolicy()); - - // While. - // ===== - while (!fut.isDone() && !topologyChanged(fut)) { - SupplyMessage s = poll(msgQ, timeout); - - // If timed out. - if (s == null) { - if (msgQ.isEmpty()) { // Safety check. - U.warn(log, "Timed out waiting for partitions to load, will retry in " + timeout + - " ms (you may need to increase 'networkTimeout' or 'rebalanceBatchSize'" + - " configuration properties)."); - - // Ordered listener was removed if timeout expired. - cctx.io().removeOrderedHandler(d.topic()); - - // Must create copy to be able to work with IO manager thread local caches. - d = new GridDhtPartitionDemandMessage(d, fut.remaining.get(node.id()).get2()); - - // Create new topic. - d.topic(topic(++cntr)); - - // Create new ordered listener. - cctx.io().addOrderedHandler(d.topic(), - new CI2<UUID, GridDhtPartitionSupplyMessage>() { - @Override public void apply(UUID nodeId, - GridDhtPartitionSupplyMessage msg) { - addMessage(new SupplyMessage(nodeId, msg)); - } - }); - - // Resend message with larger timeout. - retry = true; - - break; // While. - } - else - continue; // While. - } - - if (s.senderId() == null) - return; // Stopping now. - - // Check that message was received from expected node. - if (!s.senderId().equals(node.id())) { - U.warn(log, "Received supply message from unexpected node [expectedId=" + node.id() + - ", rcvdId=" + s.senderId() + ", msg=" + s + ']'); - - continue; // While. - } - - if (log.isDebugEnabled()) - log.debug("Received supply message: " + s); - - GridDhtPartitionSupplyMessage supply = s.supply(); - - // Check whether there were class loading errors on unmarshal - if (supply.classError() != null) { - if (log.isDebugEnabled()) - log.debug("Class got undeployed during preloading: " + supply.classError()); - - retry = true; - - // Quit preloading. - break; - } - - // Preload. - for (Map.Entry<Integer, CacheEntryInfoCollection> e : supply.infos().entrySet()) { - int p = e.getKey(); - - if (cctx.affinity().partitionLocalNode(p, topVer)) { - GridDhtLocalPartition part = top.localPartition(p, topVer, true); - - assert part != null; - - if (part.state() == MOVING) { - boolean reserved = part.reserve(); - - assert reserved : "Failed to reserve partition [igniteInstanceName=" + - cctx.igniteInstanceName() + ", cacheName=" + cctx.namex() + - ", part=" + part + ']'; - - part.lock(); - - try { - Collection<Integer> invalidParts = new GridLeanSet<>(); - - // Loop through all received entries and try to preload them. - for (GridCacheEntryInfo entry : e.getValue().infos()) { - if (!invalidParts.contains(p)) { - if (!part.preloadingPermitted(entry.key(), entry.version())) { - if (log.isDebugEnabled()) - log.debug("Preloading is not permitted for entry due to " + - "evictions [key=" + entry.key() + - ", ver=" + entry.version() + ']'); - - continue; - } - - if (!preloadEntry(node, p, entry, topVer)) { - invalidParts.add(p); - - if (log.isDebugEnabled()) - log.debug("Got entries for invalid partition during " + - "preloading (will skip) [p=" + p + ", entry=" + entry + ']'); - } - } - } - - boolean last = supply.last().contains(p); - - // If message was last for this partition, - // then we take ownership. - if (last) { - fut.partitionDone(node.id(), p); - - top.own(part); - - if (log.isDebugEnabled()) - log.debug("Finished rebalancing partition: " + part); - - if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_PART_LOADED)) - preloadEvent(p, EVT_CACHE_REBALANCE_PART_LOADED, - exchFut.discoveryEvent()); - } - } - finally { - part.unlock(); - part.release(); - } - } - else { - fut.partitionDone(node.id(), p); - - if (log.isDebugEnabled()) - log.debug("Skipping rebalancing partition (state is not MOVING): " + part); - } - } - else { - fut.partitionDone(node.id(), p); - - if (log.isDebugEnabled()) - log.debug("Skipping rebalancing partition (it does not belong on current node): " + p); - } - } - - // Only request partitions based on latest topology version. - for (Integer miss : s.supply().missed()) { - if (cctx.affinity().partitionLocalNode(miss, topVer)) - fut.partitionMissed(node.id(), miss); - } - - for (Integer miss : s.supply().missed()) - fut.partitionDone(node.id(), miss); - - if (fut.remaining.get(node.id()) == null) - break; // While. - - if (s.supply().ack()) { - retry = true; - - break; - } - } - } - while (retry && !fut.isDone() && !topologyChanged(fut)); - } - finally { - cctx.io().removeOrderedHandler(d.topic()); - } - } - - /** - * @param node Node. - * @param d D. - * @throws IgniteCheckedException If failed. - */ - public void run(ClusterNode node, GridDhtPartitionDemandMessage d) throws IgniteCheckedException { - demandLock.readLock().lock(); - - try { - GridDhtPartitionsExchangeFuture exchFut = fut.exchFut; - - AffinityTopologyVersion topVer = fut.topVer; - - try { - demandFromNode(node, topVer, d, exchFut); - } - catch (InterruptedException e) { - throw new IgniteCheckedException(e); - } - } - finally { - demandLock.readLock().unlock(); - } - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(DemandWorker.class, this, "assignQ", assignQ, "msgQ", msgQ, "super", super.toString()); - } - } }
http://git-wip-us.apache.org/repos/asf/ignite/blob/488b25e1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionFullMap.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionFullMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionFullMap.java index e8860f2..27e6777 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionFullMap.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionFullMap.java @@ -31,7 +31,7 @@ import org.apache.ignite.internal.util.typedef.internal.U; /** * Full partition map. */ -public class GridDhtPartitionFullMap extends HashMap<UUID, GridDhtPartitionMap2> +public class GridDhtPartitionFullMap extends HashMap<UUID, GridDhtPartitionMap> implements Comparable<GridDhtPartitionFullMap>, Externalizable { /** */ private static final long serialVersionUID = 0L; @@ -65,32 +65,9 @@ public class GridDhtPartitionFullMap extends HashMap<UUID, GridDhtPartitionMap2> * @param nodeOrder Node order. * @param updateSeq Update sequence number. * @param m Map to copy. - */ - @Deprecated // Backward compatibility. - public GridDhtPartitionFullMap(UUID nodeId, long nodeOrder, long updateSeq, Map<UUID, GridDhtPartitionMap2> m) { - assert nodeId != null; - assert updateSeq > 0; - assert nodeOrder > 0; - - this.nodeId = nodeId; - this.nodeOrder = nodeOrder; - this.updateSeq = updateSeq; - - for (Map.Entry<UUID, GridDhtPartitionMap2> e : m.entrySet()) { - GridDhtPartitionMap2 part = e.getValue(); - - put(e.getKey(), new GridDhtPartitionMap(part.nodeId(), part.updateSequence(), part.map())); - } - } - - /** - * @param nodeId Node ID. - * @param nodeOrder Node order. - * @param updateSeq Update sequence number. - * @param m Map to copy. * @param onlyActive If {@code true}, then only active partitions will be included. */ - public GridDhtPartitionFullMap(UUID nodeId, long nodeOrder, long updateSeq, Map<UUID, GridDhtPartitionMap2> m, + public GridDhtPartitionFullMap(UUID nodeId, long nodeOrder, long updateSeq, Map<UUID, GridDhtPartitionMap> m, boolean onlyActive) { assert nodeId != null; assert updateSeq > 0; @@ -100,10 +77,10 @@ public class GridDhtPartitionFullMap extends HashMap<UUID, GridDhtPartitionMap2> this.nodeOrder = nodeOrder; this.updateSeq = updateSeq; - for (Map.Entry<UUID, GridDhtPartitionMap2> e : m.entrySet()) { - GridDhtPartitionMap2 part = e.getValue(); + for (Map.Entry<UUID, GridDhtPartitionMap> e : m.entrySet()) { + GridDhtPartitionMap part = e.getValue(); - GridDhtPartitionMap2 cpy = new GridDhtPartitionMap2(part.nodeId(), + GridDhtPartitionMap cpy = new GridDhtPartitionMap(part.nodeId(), part.updateSequence(), part.topologyVersion(), part.map(), @@ -168,8 +145,8 @@ public class GridDhtPartitionFullMap extends HashMap<UUID, GridDhtPartitionMap2> if (size() != fullMap.size()) return false; - for (Map.Entry<UUID, GridDhtPartitionMap2> e : entrySet()) { - GridDhtPartitionMap2 m = fullMap.get(e.getKey()); + for (Map.Entry<UUID, GridDhtPartitionMap> e : entrySet()) { + GridDhtPartitionMap m = fullMap.get(e.getKey()); if (m == null || !m.map().equals(e.getValue().map())) return false; @@ -238,7 +215,7 @@ public class GridDhtPartitionFullMap extends HashMap<UUID, GridDhtPartitionMap2> nodeOrder = in.readLong(); updateSeq = in.readLong(); - putAll(U.<UUID, GridDhtPartitionMap2>readMap(in)); + putAll(U.<UUID, GridDhtPartitionMap>readMap(in)); } /** {@inheritDoc} */ @@ -260,7 +237,7 @@ public class GridDhtPartitionFullMap extends HashMap<UUID, GridDhtPartitionMap2> * @return Map string representation. */ public String map2string() { - Iterator<Map.Entry<UUID, GridDhtPartitionMap2>> it = entrySet().iterator(); + Iterator<Map.Entry<UUID, GridDhtPartitionMap>> it = entrySet().iterator(); if (!it.hasNext()) return "{}"; @@ -270,11 +247,11 @@ public class GridDhtPartitionFullMap extends HashMap<UUID, GridDhtPartitionMap2> buf.append('{'); while(true) { - Map.Entry<UUID, GridDhtPartitionMap2> e = it.next(); + Map.Entry<UUID, GridDhtPartitionMap> e = it.next(); UUID nodeId = e.getKey(); - GridDhtPartitionMap2 partMap = e.getValue(); + GridDhtPartitionMap partMap = e.getValue(); buf.append(nodeId).append('=').append(partMap.toFullString()); http://git-wip-us.apache.org/repos/asf/ignite/blob/488b25e1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap.java index 3096d63..43087ff 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap.java @@ -22,46 +22,215 @@ import java.io.IOException; import java.io.ObjectInput; import java.io.ObjectOutput; import java.util.Map; +import java.util.Set; import java.util.UUID; + +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; +import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.MOVING; + /** * Partition map. */ -@Deprecated // Backward compatibility, use GridDhtPartitionMap2 instead. -public class GridDhtPartitionMap extends GridDhtPartitionMap2 { +public class GridDhtPartitionMap implements Comparable<GridDhtPartitionMap>, Externalizable { /** */ private static final long serialVersionUID = 0L; + /** Node ID. */ + protected UUID nodeId; + + /** Update sequence number. */ + protected long updateSeq; + + /** Topology version. */ + protected AffinityTopologyVersion top; + + /** */ + protected Map<Integer, GridDhtPartitionState> map; + + /** */ + private volatile int moving; + + /** + * Empty constructor required for {@link Externalizable}. + */ + public GridDhtPartitionMap() { + // No-op. + } + /** * @param nodeId Node ID. * @param updateSeq Update sequence number. + * @param top Topology version. * @param m Map to copy. + * @param onlyActive If {@code true}, then only active states will be included. */ - public GridDhtPartitionMap(UUID nodeId, long updateSeq, - Map<Integer, GridDhtPartitionState> m) { + public GridDhtPartitionMap(UUID nodeId, + long updateSeq, + AffinityTopologyVersion top, + Map<Integer, GridDhtPartitionState> m, + boolean onlyActive) { assert nodeId != null; assert updateSeq > 0; this.nodeId = nodeId; this.updateSeq = updateSeq; + this.top = top; map = U.newHashMap(m.size()); for (Map.Entry<Integer, GridDhtPartitionState> e : m.entrySet()) { GridDhtPartitionState state = e.getValue(); - put(e.getKey(), state); + if (!onlyActive || state.active()) + put(e.getKey(), state); } } /** - * Empty constructor required for {@link Externalizable}. + * @param nodeId Node ID. + * @param updateSeq Update sequence number. + * @param top Topology version. + * @param map Map. + * @param moving Number of moving partitions. */ - public GridDhtPartitionMap() { - // No-op. + private GridDhtPartitionMap(UUID nodeId, + long updateSeq, + AffinityTopologyVersion top, + Map<Integer, GridDhtPartitionState> map, + int moving) { + this.nodeId = nodeId; + this.updateSeq = updateSeq; + this.top = top; + this.map = map; + this.moving = moving; + } + + /** + * @return Copy with empty partition state map. + */ + public GridDhtPartitionMap emptyCopy() { + return new GridDhtPartitionMap(nodeId, + updateSeq, + top, + U.<Integer, GridDhtPartitionState>newHashMap(0), + 0); + } + + /** + * @param part Partition. + * @param state Partition state. + */ + public void put(Integer part, GridDhtPartitionState state) { + GridDhtPartitionState old = map.put(part, state); + + if (old == MOVING) + moving--; + + if (state == MOVING) + moving++; + } + + /** + * @return {@code true} If partition map contains moving partitions. + */ + public boolean hasMovingPartitions() { + assert moving >= 0 : moving; + + return moving != 0; + } + + /** + * @param part Partition. + * @return Partition state. + */ + public GridDhtPartitionState get(Integer part) { + return map.get(part); + } + + /** + * @param part Partition. + * @return {@code True} if contains given partition. + */ + public boolean containsKey(Integer part) { + return map.containsKey(part); + } + + /** + * @return Entries. + */ + public Set<Map.Entry<Integer, GridDhtPartitionState>> entrySet() { + return map.entrySet(); + } + + /** + * @return Map size. + */ + public int size() { + return map.size(); + } + + /** + * @return Partitions. + */ + public Set<Integer> keySet() { + return map.keySet(); + } + + /** + * @return Underlying map. + */ + public Map<Integer, GridDhtPartitionState> map() { + return map; + } + + /** + * @return Node ID. + */ + public UUID nodeId() { + return nodeId; + } + + /** + * @return Update sequence. + */ + public long updateSequence() { + return updateSeq; + } + + /** + * @param updateSeq New update sequence value. + * @param topVer Current topology version. + * @return Old update sequence value. + */ + public long updateSequence(long updateSeq, AffinityTopologyVersion topVer) { + long old = this.updateSeq; + + assert updateSeq >= old : "Invalid update sequence [cur=" + old + ", new=" + updateSeq + ']'; + + this.updateSeq = updateSeq; + + top = topVer; + + return old; + } + + /** + * @return Topology version. + */ + public AffinityTopologyVersion topologyVersion() { + return top; + } + + /** {@inheritDoc} */ + @Override public int compareTo(GridDhtPartitionMap o) { + assert nodeId.equals(o.nodeId); + + return Long.compare(updateSeq, o.updateSeq); } /** {@inheritDoc} */ @@ -80,7 +249,7 @@ public class GridDhtPartitionMap extends GridDhtPartitionMap2 { int ordinal = entry.getValue().ordinal(); assert ordinal == (ordinal & 0x3); - assert entry.getKey() == (entry.getKey() & 0x3FFF); + assert entry.getKey() < CacheConfiguration.MAX_PARTITIONS_COUNT : entry.getKey(); int coded = (ordinal << 14) | entry.getKey(); @@ -90,6 +259,15 @@ public class GridDhtPartitionMap extends GridDhtPartitionMap2 { } assert i == size; + + if (top != null) { + out.writeLong(topologyVersion().topologyVersion()); + out.writeInt(topologyVersion().minorTopologyVersion()); + } + else { + out.writeLong(0); + out.writeInt(0); + } } /** {@inheritDoc} */ @@ -110,6 +288,12 @@ public class GridDhtPartitionMap extends GridDhtPartitionMap2 { put(part, GridDhtPartitionState.fromOrdinal(ordinal)); } + + long ver = in.readLong(); + int minorVer = in.readInt(); + + if (ver != 0) + top = new AffinityTopologyVersion(ver, minorVer); } /** {@inheritDoc} */ @@ -117,7 +301,7 @@ public class GridDhtPartitionMap extends GridDhtPartitionMap2 { if (this == o) return true; - GridDhtPartitionMap2 other = (GridDhtPartitionMap2)o; + GridDhtPartitionMap other = (GridDhtPartitionMap)o; return other.nodeId.equals(nodeId) && other.updateSeq == updateSeq; } @@ -131,11 +315,11 @@ public class GridDhtPartitionMap extends GridDhtPartitionMap2 { * @return Full string representation. */ public String toFullString() { - return S.toString(GridDhtPartitionMap2.class, this, "size", size(), "map", map.toString()); + return S.toString(GridDhtPartitionMap.class, this, "size", size(), "map", map.toString(), "top", top); } /** {@inheritDoc} */ @Override public String toString() { - return S.toString(GridDhtPartitionMap2.class, this, "size", size()); + return S.toString(GridDhtPartitionMap.class, this, "size", size()); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/488b25e1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap2.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap2.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap2.java deleted file mode 100644 index 7d6f272..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap2.java +++ /dev/null @@ -1,329 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.cache.distributed.dht.preloader; - -import java.io.Externalizable; -import java.io.IOException; -import java.io.ObjectInput; -import java.io.ObjectOutput; -import java.util.Map; -import java.util.Set; -import java.util.UUID; - -import org.apache.ignite.configuration.CacheConfiguration; -import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; -import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState; -import org.apache.ignite.internal.util.typedef.internal.S; -import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.lang.IgniteProductVersion; - -import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.MOVING; - -/** - * Partition map. - */ -public class GridDhtPartitionMap2 implements Comparable<GridDhtPartitionMap2>, Externalizable { - /** */ - private static final long serialVersionUID = 0L; - - /** Used since. */ - public static final IgniteProductVersion SINCE = IgniteProductVersion.fromString("1.5.0"); - - /** Node ID. */ - protected UUID nodeId; - - /** Update sequence number. */ - protected long updateSeq; - - /** Topology version. */ - protected AffinityTopologyVersion top; - - /** */ - protected Map<Integer, GridDhtPartitionState> map; - - /** */ - private volatile int moving; - - /** - * Empty constructor required for {@link Externalizable}. - */ - public GridDhtPartitionMap2() { - // No-op. - } - - /** - * @param nodeId Node ID. - * @param updateSeq Update sequence number. - * @param top Topology version. - * @param m Map to copy. - * @param onlyActive If {@code true}, then only active states will be included. - */ - public GridDhtPartitionMap2(UUID nodeId, - long updateSeq, - AffinityTopologyVersion top, - Map<Integer, GridDhtPartitionState> m, - boolean onlyActive) { - assert nodeId != null; - assert updateSeq > 0; - - this.nodeId = nodeId; - this.updateSeq = updateSeq; - this.top = top; - - map = U.newHashMap(m.size()); - - for (Map.Entry<Integer, GridDhtPartitionState> e : m.entrySet()) { - GridDhtPartitionState state = e.getValue(); - - if (!onlyActive || state.active()) - put(e.getKey(), state); - } - } - - /** - * @param nodeId Node ID. - * @param updateSeq Update sequence number. - * @param top Topology version. - * @param map Map. - * @param moving Number of moving partitions. - */ - private GridDhtPartitionMap2(UUID nodeId, - long updateSeq, - AffinityTopologyVersion top, - Map<Integer, GridDhtPartitionState> map, - int moving) { - this.nodeId = nodeId; - this.updateSeq = updateSeq; - this.top = top; - this.map = map; - this.moving = moving; - } - - /** - * @return Copy with empty partition state map. - */ - public GridDhtPartitionMap2 emptyCopy() { - return new GridDhtPartitionMap2(nodeId, - updateSeq, - top, - U.<Integer, GridDhtPartitionState>newHashMap(0), - 0); - } - - /** - * @param part Partition. - * @param state Partition state. - */ - public void put(Integer part, GridDhtPartitionState state) { - GridDhtPartitionState old = map.put(part, state); - - if (old == MOVING) - moving--; - - if (state == MOVING) - moving++; - } - - /** - * @return {@code true} If partition map contains moving partitions. - */ - public boolean hasMovingPartitions() { - assert moving >= 0 : moving; - - return moving != 0; - } - - /** - * @param part Partition. - * @return Partition state. - */ - public GridDhtPartitionState get(Integer part) { - return map.get(part); - } - - /** - * @param part Partition. - * @return {@code True} if contains given partition. - */ - public boolean containsKey(Integer part) { - return map.containsKey(part); - } - - /** - * @return Entries. - */ - public Set<Map.Entry<Integer, GridDhtPartitionState>> entrySet() { - return map.entrySet(); - } - - /** - * @return Map size. - */ - public int size() { - return map.size(); - } - - /** - * @return Partitions. - */ - public Set<Integer> keySet() { - return map.keySet(); - } - - /** - * @return Underlying map. - */ - public Map<Integer, GridDhtPartitionState> map() { - return map; - } - - /** - * @return Node ID. - */ - public UUID nodeId() { - return nodeId; - } - - /** - * @return Update sequence. - */ - public long updateSequence() { - return updateSeq; - } - - /** - * @param updateSeq New update sequence value. - * @param topVer Current topology version. - * @return Old update sequence value. - */ - public long updateSequence(long updateSeq, AffinityTopologyVersion topVer) { - long old = this.updateSeq; - - assert updateSeq >= old : "Invalid update sequence [cur=" + old + ", new=" + updateSeq + ']'; - - this.updateSeq = updateSeq; - - top = topVer; - - return old; - } - - /** - * @return Topology version. - */ - public AffinityTopologyVersion topologyVersion() { - return top; - } - - /** {@inheritDoc} */ - @Override public int compareTo(GridDhtPartitionMap2 o) { - assert nodeId.equals(o.nodeId); - - return Long.compare(updateSeq, o.updateSeq); - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - U.writeUuid(out, nodeId); - - out.writeLong(updateSeq); - - int size = map.size(); - - out.writeInt(size); - - int i = 0; - - for (Map.Entry<Integer, GridDhtPartitionState> entry : map.entrySet()) { - int ordinal = entry.getValue().ordinal(); - - assert ordinal == (ordinal & 0x3); - assert entry.getKey() < CacheConfiguration.MAX_PARTITIONS_COUNT : entry.getKey(); - - int coded = (ordinal << 14) | entry.getKey(); - - out.writeShort((short)coded); - - i++; - } - - assert i == size; - - if (top != null) { - out.writeLong(topologyVersion().topologyVersion()); - out.writeInt(topologyVersion().minorTopologyVersion()); - } - else { - out.writeLong(0); - out.writeInt(0); - } - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - nodeId = U.readUuid(in); - - updateSeq = in.readLong(); - - int size = in.readInt(); - - map = U.newHashMap(size); - - for (int i = 0; i < size; i++) { - int entry = in.readShort() & 0xFFFF; - - int part = entry & 0x3FFF; - int ordinal = entry >> 14; - - put(part, GridDhtPartitionState.fromOrdinal(ordinal)); - } - - long ver = in.readLong(); - int minorVer = in.readInt(); - - if (ver != 0) - top = new AffinityTopologyVersion(ver, minorVer); - } - - /** {@inheritDoc} */ - @Override public boolean equals(Object o) { - if (this == o) - return true; - - GridDhtPartitionMap2 other = (GridDhtPartitionMap2)o; - - return other.nodeId.equals(nodeId) && other.updateSeq == updateSeq; - } - - /** {@inheritDoc} */ - @Override public int hashCode() { - return 31 * nodeId.hashCode() + (int)(updateSeq ^ (updateSeq >>> 32)); - } - - /** - * @return Full string representation. - */ - public String toFullString() { - return S.toString(GridDhtPartitionMap2.class, this, "size", size(), "map", map.toString(), "top", top); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(GridDhtPartitionMap2.class, this, "size", size()); - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/488b25e1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java index 6e69161..1f3dee7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java @@ -22,7 +22,6 @@ import java.nio.ByteBuffer; import org.apache.ignite.internal.processors.cache.GridCacheMessage; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.typedef.internal.S; -import org.apache.ignite.lang.IgniteProductVersion; import org.apache.ignite.plugin.extensions.communication.MessageReader; import org.apache.ignite.plugin.extensions.communication.MessageWriter; import org.jetbrains.annotations.Nullable; @@ -32,9 +31,6 @@ import org.jetbrains.annotations.Nullable; */ public abstract class GridDhtPartitionsAbstractMessage extends GridCacheMessage { /** */ - public static final IgniteProductVersion PART_MAP_COMPRESS_SINCE = IgniteProductVersion.fromString("1.6.11"); - - /** */ protected static final byte COMPRESSED_FLAG_MASK = 1; /** */ http://git-wip-us.apache.org/repos/asf/ignite/blob/488b25e1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index 5eacc36..f41da2b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -1476,7 +1476,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT * @param msg Partitions single message. */ private void updatePartitionSingleMap(GridDhtPartitionsSingleMessage msg) { - for (Map.Entry<Integer, GridDhtPartitionMap2> entry : msg.partitions().entrySet()) { + for (Map.Entry<Integer, GridDhtPartitionMap> entry : msg.partitions().entrySet()) { Integer cacheId = entry.getKey(); GridCacheContext cacheCtx = cctx.cacheContext(cacheId); http://git-wip-us.apache.org/repos/asf/ignite/blob/488b25e1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java index 8a7adfc..33c23e3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java @@ -241,13 +241,13 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa assert map2 != null : e.getValue(); assert map1.size() == map2.size(); - for (Map.Entry<UUID, GridDhtPartitionMap2> e0 : map2.entrySet()) { - GridDhtPartitionMap2 partMap1 = map1.get(e0.getKey()); + for (Map.Entry<UUID, GridDhtPartitionMap> e0 : map2.entrySet()) { + GridDhtPartitionMap partMap1 = map1.get(e0.getKey()); assert partMap1 != null && partMap1.map().isEmpty() : partMap1; assert !partMap1.hasMovingPartitions() : partMap1; - GridDhtPartitionMap2 partMap2 = e0.getValue(); + GridDhtPartitionMap partMap2 = e0.getValue(); assert partMap2 != null; http://git-wip-us.apache.org/repos/asf/ignite/blob/488b25e1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java index d65e405..da7403e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java @@ -47,7 +47,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes /** Local partitions. */ @GridToStringInclude @GridDirectTransient - private Map<Integer, GridDhtPartitionMap2> parts; + private Map<Integer, GridDhtPartitionMap> parts; /** */ @GridDirectMap(keyType = Integer.class, valueType = Integer.class) @@ -106,7 +106,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes * @param locMap Local partition map. * @param dupDataCache Optional ID of cache with the same partition state map. */ - public void addLocalPartitionMap(int cacheId, GridDhtPartitionMap2 locMap, @Nullable Integer dupDataCache) { + public void addLocalPartitionMap(int cacheId, GridDhtPartitionMap locMap, @Nullable Integer dupDataCache) { if (parts == null) parts = new HashMap<>(); @@ -152,7 +152,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes /** * @return Local partitions. */ - public Map<Integer, GridDhtPartitionMap2> partitions() { + public Map<Integer, GridDhtPartitionMap> partitions() { return parts; } @@ -217,13 +217,13 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes assert parts != null; for (Map.Entry<Integer, Integer> e : dupPartsData.entrySet()) { - GridDhtPartitionMap2 map1 = parts.get(e.getKey()); + GridDhtPartitionMap map1 = parts.get(e.getKey()); assert map1 != null : e.getKey(); assert F.isEmpty(map1.map()); assert !map1.hasMovingPartitions(); - GridDhtPartitionMap2 map2 = parts.get(e.getValue()); + GridDhtPartitionMap map2 = parts.get(e.getValue()); assert map2 != null : e.getValue(); assert map2.map() != null; http://git-wip-us.apache.org/repos/asf/ignite/blob/488b25e1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java index 1d88742..dc988bd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java @@ -36,7 +36,6 @@ import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; import org.apache.ignite.internal.processors.affinity.AffinityAssignment; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; -import org.apache.ignite.internal.processors.cache.CacheAffinitySharedManager; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo; @@ -62,7 +61,6 @@ import org.apache.ignite.internal.util.typedef.internal.LT; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiInClosure; import org.apache.ignite.lang.IgnitePredicate; -import org.apache.ignite.lang.IgniteProductVersion; import org.apache.ignite.lang.IgniteUuid; import org.jetbrains.annotations.Nullable; import org.jsr166.ConcurrentLinkedDeque8; @@ -80,13 +78,6 @@ import static org.apache.ignite.internal.util.GridConcurrentFactory.newMap; * DHT cache preloader. */ public class GridDhtPreloader extends GridCachePreloaderAdapter { - /** - * Rebalancing was refactored at version 1.5.0, but backward compatibility to previous implementation was saved. - * Node automatically chose communication protocol depends on remote node's version. - * Backward compatibility may be removed at Ignite 2.x. - */ - public static final IgniteProductVersion REBALANCING_VER_2_SINCE = IgniteProductVersion.fromString("1.5.0"); - /** Default preload resend timeout. */ public static final long DFLT_PRELOAD_RESEND_TIMEOUT = 1500; @@ -194,7 +185,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { cctx.shared().affinity().onCacheCreated(cctx); supplier = new GridDhtPartitionSupplier(cctx); - demander = new GridDhtPartitionDemander(cctx, demandLock); + demander = new GridDhtPartitionDemander(cctx); supplier.start(); demander.start(); @@ -619,14 +610,11 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { AffinityAssignment assignment = cctx.affinity().assignment(topVer); - boolean newAffMode = node.version().compareTo(CacheAffinitySharedManager.LATE_AFF_ASSIGN_SINCE) >= 0; - GridDhtAffinityAssignmentResponse res = new GridDhtAffinityAssignmentResponse(cctx.cacheId(), topVer, - assignment.assignment(), - newAffMode); + assignment.assignment()); - if (newAffMode && cctx.affinity().affinityCache().centralizedAffinityFunction()) { + if (cctx.affinity().affinityCache().centralizedAffinityFunction()) { assert assignment.idealAssignment() != null; res.idealAffinityAssignment(assignment.idealAssignment()); http://git-wip-us.apache.org/repos/asf/ignite/blob/488b25e1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryBatchAck.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryBatchAck.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryBatchAck.java index 4dd7978..76147ee 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryBatchAck.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryBatchAck.java @@ -24,7 +24,6 @@ import org.apache.ignite.internal.GridDirectMap; import org.apache.ignite.internal.processors.cache.GridCacheMessage; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.internal.S; -import org.apache.ignite.lang.IgniteProductVersion; import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType; import org.apache.ignite.plugin.extensions.communication.MessageReader; import org.apache.ignite.plugin.extensions.communication.MessageWriter; @@ -34,9 +33,6 @@ import org.apache.ignite.plugin.extensions.communication.MessageWriter; */ public class CacheContinuousQueryBatchAck extends GridCacheMessage { /** */ - public static final IgniteProductVersion SINCE_VER = IgniteProductVersion.fromString("1.5.0"); - - /** */ private static final long serialVersionUID = 0L; /** Routine ID. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/488b25e1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java index 485059f..6c8df14 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java @@ -1324,7 +1324,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler for (AffinityTopologyVersion topVer : t.get2()) { for (ClusterNode node : ctx.discovery().cacheAffinityNodes(cctx.name(), topVer)) { - if (!node.isLocal() && node.version().compareTo(CacheContinuousQueryBatchAck.SINCE_VER) >= 0) { + if (!node.isLocal()) { try { cctx.io().send(node, msg, GridIoPolicy.SYSTEM_POOL); } http://git-wip-us.apache.org/repos/asf/ignite/blob/488b25e1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java index 6887a50..745bbde 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java @@ -58,14 +58,13 @@ import org.apache.ignite.internal.processors.cache.GridCacheManagerAdapter; import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicAbstractUpdateFuture; import org.apache.ignite.internal.processors.continuous.GridContinuousHandler; -import org.apache.ignite.internal.processors.continuous.GridContinuousProcessor; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.CI2; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteAsyncCallback; -import org.apache.ignite.lang.IgniteClosure; +import org.apache.ignite.lang.IgniteOutClosure; import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.plugin.security.SecurityPermission; import org.apache.ignite.resources.LoggerResource; @@ -78,7 +77,6 @@ import static javax.cache.event.EventType.REMOVED; import static javax.cache.event.EventType.UPDATED; import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_OBJECT_READ; import static org.apache.ignite.internal.GridTopic.TOPIC_CACHE; -import static org.apache.ignite.internal.processors.continuous.GridContinuousProcessor.QUERY_MSG_VER_2_SINCE; /** * Continuous queries manager. @@ -424,49 +422,30 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { final boolean keepBinary, final boolean includeExpired) throws IgniteCheckedException { - IgniteClosure<Boolean, CacheContinuousQueryHandler> clsr; + IgniteOutClosure<CacheContinuousQueryHandler> clsr; if (rmtFilterFactory != null) - clsr = new IgniteClosure<Boolean, CacheContinuousQueryHandler>() { - @Override public CacheContinuousQueryHandler apply(Boolean v2) { + clsr = new IgniteOutClosure<CacheContinuousQueryHandler>() { + @Override public CacheContinuousQueryHandler apply() { CacheContinuousQueryHandler hnd; - if (v2) - hnd = new CacheContinuousQueryHandlerV2( - cctx.name(), - TOPIC_CACHE.topic(topicPrefix, cctx.localNodeId(), seq.getAndIncrement()), - locLsnr, - rmtFilterFactory, - true, - false, - !includeExpired, - false, - null); - else { - CacheEntryEventFilter fltr = rmtFilterFactory.create(); - - if (!(fltr instanceof CacheEntryEventSerializableFilter)) - throw new IgniteException("Topology has nodes of the old versions. In this case " + - "EntryEventFilter should implement " + - "org.apache.ignite.cache.CacheEntryEventSerializableFilter interface. Filter: " + fltr); - - hnd = new CacheContinuousQueryHandler( - cctx.name(), - TOPIC_CACHE.topic(topicPrefix, cctx.localNodeId(), seq.getAndIncrement()), - locLsnr, - (CacheEntryEventSerializableFilter)fltr, - true, - false, - !includeExpired, - false); - } + hnd = new CacheContinuousQueryHandlerV2( + cctx.name(), + TOPIC_CACHE.topic(topicPrefix, cctx.localNodeId(), seq.getAndIncrement()), + locLsnr, + rmtFilterFactory, + true, + false, + !includeExpired, + false, + null); return hnd; } }; else - clsr = new IgniteClosure<Boolean, CacheContinuousQueryHandler>() { - @Override public CacheContinuousQueryHandler apply(Boolean ignore) { + clsr = new IgniteOutClosure<CacheContinuousQueryHandler>() { + @Override public CacheContinuousQueryHandler apply() { return new CacheContinuousQueryHandler( cctx.name(), TOPIC_CACHE.topic(topicPrefix, cctx.localNodeId(), seq.getAndIncrement()), @@ -509,8 +488,8 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { { return executeQuery0( locLsnr, - new IgniteClosure<Boolean, CacheContinuousQueryHandler>() { - @Override public CacheContinuousQueryHandler apply(Boolean v2) { + new IgniteOutClosure<CacheContinuousQueryHandler>() { + @Override public CacheContinuousQueryHandler apply() { return new CacheContinuousQueryHandler( cctx.name(), TOPIC_CACHE.topic(topicPrefix, cctx.localNodeId(), seq.getAndIncrement()), @@ -603,18 +582,20 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { /** * @param locLsnr Local listener. + * @param clsr Closure to create CacheContinuousQueryHandler. * @param bufSize Buffer size. * @param timeInterval Time interval. * @param autoUnsubscribe Auto unsubscribe flag. * @param internal Internal flag. * @param notifyExisting Notify existing flag. * @param loc Local flag. + * @param keepBinary Keep binary flag. * @param onStart Waiting topology exchange. * @return Continuous routine ID. * @throws IgniteCheckedException In case of error. */ private UUID executeQuery0(CacheEntryUpdatedListener locLsnr, - IgniteClosure<Boolean, CacheContinuousQueryHandler> clsr, + IgniteOutClosure<CacheContinuousQueryHandler> clsr, int bufSize, long timeInterval, boolean autoUnsubscribe, @@ -631,9 +612,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { boolean skipPrimaryCheck = loc && cctx.config().getCacheMode() == CacheMode.REPLICATED && cctx.affinityNode(); - boolean v2 = useV2Protocol(cctx.discovery().allNodes()); - - final CacheContinuousQueryHandler hnd = clsr.apply(v2); + final CacheContinuousQueryHandler hnd = clsr.apply(); hnd.taskNameHash(taskNameHash); hnd.skipPrimaryCheck(skipPrimaryCheck); @@ -799,20 +778,6 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { } /** - * @param nodes Nodes. - * @return {@code True} if all nodes greater than {@link GridContinuousProcessor#QUERY_MSG_VER_2_SINCE}, - * otherwise {@code false}. - */ - private boolean useV2Protocol(Collection<ClusterNode> nodes) { - for (ClusterNode node : nodes) { - if (QUERY_MSG_VER_2_SINCE.compareTo(node.version()) > 0) - return false; - } - - return true; - } - - /** * @param lsnrId Listener ID. * @param lsnr Listener. * @param internal Internal flag. @@ -922,14 +887,12 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { routineId = executeQuery0( locLsnr, - new IgniteClosure<Boolean, CacheContinuousQueryHandler>() { - @Override public CacheContinuousQueryHandler apply(Boolean v2) { + new IgniteOutClosure<CacheContinuousQueryHandler>() { + @Override public CacheContinuousQueryHandler apply() { CacheContinuousQueryHandler hnd; Factory<CacheEntryEventFilter> rmtFilterFactory = cfg.getCacheEntryEventFilterFactory(); - v2 = rmtFilterFactory != null && v2; - - if (v2) + if (rmtFilterFactory != null) hnd = new CacheContinuousQueryHandlerV2( cctx.name(), TOPIC_CACHE.topic(topicPrefix, cctx.localNodeId(), seq.getAndIncrement()), http://git-wip-us.apache.org/repos/asf/ignite/blob/488b25e1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java index ca4edb6..3814731 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java @@ -37,7 +37,6 @@ import org.apache.ignite.IgniteSystemProperties; import org.apache.ignite.cache.store.CacheStore; import org.apache.ignite.cache.store.CacheStoreSession; import org.apache.ignite.cache.store.CacheStoreSessionListener; -import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.processors.cache.CacheEntryImpl; @@ -67,7 +66,6 @@ import org.apache.ignite.lang.IgniteAsyncSupport; import org.apache.ignite.lang.IgniteBiInClosure; import org.apache.ignite.lang.IgniteBiTuple; import org.apache.ignite.lang.IgniteFuture; -import org.apache.ignite.lang.IgniteProductVersion; import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.lifecycle.LifecycleAware; import org.apache.ignite.transactions.Transaction; @@ -87,13 +85,6 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt /** */ private static final int SES_ATTR = GridMetadataAwareAdapter.EntryKey.CACHE_STORE_MANAGER_KEY.key(); - /** - * Behavior can be changed by setting {@link IgniteSystemProperties#IGNITE_LOCAL_STORE_KEEPS_PRIMARY_ONLY} property - * to {@code True}. - */ - private static final IgniteProductVersion LOCAL_STORE_KEEPS_PRIMARY_AND_BACKUPS_SINCE = - IgniteProductVersion.fromString("1.5.22"); - /** */ protected CacheStore<Object, Object> store; @@ -237,22 +228,6 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt globalSesLsnrs = true; } - - if (isLocal()) { - for (ClusterNode node : cctx.kernalContext().cluster().get().forRemotes().nodes()) { - if (LOCAL_STORE_KEEPS_PRIMARY_AND_BACKUPS_SINCE.compareTo(node.version()) > 0 && - !IgniteSystemProperties.getBoolean(IGNITE_LOCAL_STORE_KEEPS_PRIMARY_ONLY)) { - IgniteProductVersion v = LOCAL_STORE_KEEPS_PRIMARY_AND_BACKUPS_SINCE; - - log.warning("Since Ignite " + v.major() + "." + v.minor() + "." + v.maintenance() + - " Local Store keeps primary and backup partitions. " + - "To keep primary partitions only please set system property " + - IGNITE_LOCAL_STORE_KEEPS_PRIMARY_ONLY + " to 'true'."); - - break; - } - } - } } /** {@inheritDoc} */
