http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java index ec9dbbf..791bac0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java @@ -40,9 +40,9 @@ import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.managers.discovery.DiscoCache; import org.apache.ignite.internal.processors.affinity.AffinityAssignment; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.CacheGroupContext; import org.apache.ignite.internal.processors.cache.ClusterState; -import org.apache.ignite.internal.processors.cache.GridCacheContext; -import org.apache.ignite.internal.processors.cache.GridCacheMapEntryFactory; +import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap; @@ -68,7 +68,8 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh /** * Partition topology. */ -@GridToStringExclude class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { +@GridToStringExclude +public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { /** If true, then check consistency. */ private static final boolean CONSISTENCY_CHECK = false; @@ -78,8 +79,11 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh /** */ private static final Long ZERO = 0L; - /** Context. */ - private final GridCacheContext<?, ?> cctx; + /** */ + private final GridCacheSharedContext ctx; + + /** */ + private final CacheGroupContext grp; /** Logger. */ private final IgniteLogger log; @@ -114,9 +118,6 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh /** Lock. */ private final StripedCompositeReadWriteLock lock = new StripedCompositeReadWriteLock(16); - /** */ - private final GridCacheMapEntryFactory entryFactory; - /** Partition update counter. */ private Map<Integer, T2<Long, Long>> cntrMap = new HashMap<>(); @@ -127,25 +128,27 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh private volatile boolean treatAllPartAsLoc; /** - * @param cctx Context. - * @param entryFactory Entry factory. + * @param ctx Cache shared context. + * @param grp Cache group. */ - GridDhtPartitionTopologyImpl(GridCacheContext<?, ?> cctx, GridCacheMapEntryFactory entryFactory) { - assert cctx != null; + public GridDhtPartitionTopologyImpl(GridCacheSharedContext ctx, + CacheGroupContext grp) { + assert ctx != null; + assert grp != null; - this.cctx = cctx; - this.entryFactory = entryFactory; + this.ctx = ctx; + this.grp = grp; - log = cctx.logger(getClass()); + log = ctx.logger(getClass()); - locParts = new AtomicReferenceArray<>(cctx.config().getAffinity().partitions()); + locParts = new AtomicReferenceArray<>(grp.affinityFunction().partitions()); - part2node = new HashMap<>(cctx.config().getAffinity().partitions(), 1.0f); + part2node = new HashMap<>(grp.affinityFunction().partitions(), 1.0f); } /** {@inheritDoc} */ - @Override public int cacheId() { - return cctx.cacheId(); + @Override public int groupId() { + return grp.groupId(); } /** @@ -169,7 +172,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh topVer = AffinityTopologyVersion.NONE; - discoCache = cctx.discovery().discoCache(); + discoCache = ctx.discovery().discoCache(); } finally { lock.writeLock().unlock(); @@ -240,7 +243,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh AffinityTopologyVersion topVer = this.topVer; assert topVer.topologyVersion() > 0 : "Invalid topology version [topVer=" + topVer + - ", cacheName=" + cctx.name() + ']'; + ", group=" + grp.cacheOrGroupName() + ']'; return topVer; } @@ -282,7 +285,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh * @param updateSeq Update sequence. */ private void initPartitions0(GridDhtPartitionsExchangeFuture exchFut, long updateSeq) { - ClusterNode loc = cctx.localNode(); + ClusterNode loc = ctx.localNode(); ClusterNode oldest = discoCache.oldestAliveServerNodeWithCache(); @@ -290,23 +293,23 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh assert topVer.equals(exchFut.topologyVersion()) : "Invalid topology [topVer=" + topVer + - ", cache=" + cctx.name() + + ", grp=" + grp.cacheOrGroupName() + ", futVer=" + exchFut.topologyVersion() + ", fut=" + exchFut + ']'; - assert cctx.affinity().affinityTopologyVersion().equals(exchFut.topologyVersion()) : - "Invalid affinity [topVer=" + cctx.affinity().affinityTopologyVersion() + - ", cache=" + cctx.name() + + assert grp.affinity().lastVersion().equals(exchFut.topologyVersion()) : + "Invalid affinity [topVer=" + grp.affinity().lastVersion() + + ", grp=" + grp.cacheOrGroupName() + ", futVer=" + exchFut.topologyVersion() + ", fut=" + exchFut + ']'; - List<List<ClusterNode>> aff = cctx.affinity().assignments(exchFut.topologyVersion()); + List<List<ClusterNode>> aff = grp.affinity().assignments(exchFut.topologyVersion()); - int num = cctx.affinity().partitions(); + int num = grp.affinity().partitions(); - if (cctx.rebalanceEnabled()) { - boolean added = exchFut.cacheAddedOnExchange(cctx.cacheId(), cctx.receivedFrom()); + if (grp.rebalanceEnabled()) { + boolean added = exchFut.cacheGroupAddedOnExchange(grp.groupId(), grp.receivedFrom()); - boolean first = (loc.equals(oldest) && loc.id().equals(exchId.nodeId()) && exchId.isJoined()) || added; + boolean first = added || (loc.equals(oldest) && loc.id().equals(exchId.nodeId()) && exchId.isJoined()); if (first) { assert exchId.isJoined() || added; @@ -317,7 +320,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh boolean owned = locPart.own(); - assert owned : "Failed to own partition for oldest node [cacheName" + cctx.name() + + assert owned : "Failed to own partition for oldest node [grp=" + grp.cacheOrGroupName() + ", part=" + locPart + ']'; if (log.isDebugEnabled()) @@ -376,7 +379,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh * @param updateSeq Update sequence. */ private void createPartitions(List<List<ClusterNode>> aff, long updateSeq) { - int num = cctx.affinity().partitions(); + int num = grp.affinity().partitions(); for (int p = 0; p < num; p++) { if (node2part != null && node2part.valid()) { @@ -398,24 +401,23 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh /** {@inheritDoc} */ @Override public void beforeExchange(GridDhtPartitionsExchangeFuture exchFut, boolean affReady) throws IgniteCheckedException { - DiscoveryEvent discoEvt = exchFut.discoveryEvent(); ClusterState newState = exchFut.newClusterState(); treatAllPartAsLoc = (newState != null && newState == ClusterState.ACTIVE) - || (cctx.kernalContext().state().active() + || (ctx.kernalContext().state().active() && discoEvt.type() == EventType.EVT_NODE_JOINED && discoEvt.eventNode().isLocal() - && !cctx.kernalContext().clientNode() + && !ctx.kernalContext().clientNode() ); - ClusterNode loc = cctx.localNode(); + ClusterNode loc = ctx.localNode(); - cctx.shared().database().checkpointReadLock(); + ctx.database().checkpointReadLock(); try { - synchronized (cctx.shared().exchange().interruptLock()) { + synchronized (ctx.exchange().interruptLock()) { if (Thread.currentThread().isInterrupted()) throw new IgniteInterruptedCheckedException("Thread is interrupted: " + Thread.currentThread()); @@ -443,7 +445,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh cntrMap.clear(); // If this is the oldest node. - if (oldest != null && (loc.equals(oldest) || exchFut.cacheAddedOnExchange(cctx.cacheId(), cctx.receivedFrom()))) { + if (oldest != null && (loc.equals(oldest) || exchFut.cacheGroupAddedOnExchange(grp.groupId(), grp.receivedFrom()))) { if (node2part == null) { node2part = new GridDhtPartitionFullMap(oldest.id(), oldest.order(), updateSeq); @@ -470,7 +472,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh if (affReady) initPartitions0(exchFut, updateSeq); else { - List<List<ClusterNode>> aff = cctx.affinity().idealAssignment(); + List<List<ClusterNode>> aff = grp.affinity().idealAssignment(); createPartitions(aff, updateSeq); } @@ -487,23 +489,32 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh } } finally { - cctx.shared().database().checkpointReadUnlock(); + ctx.database().checkpointReadUnlock(); } } + /** + * @param p Partition number. + * @param topVer Topology version. + * @return {@code True} if given partition belongs to local node. + */ + private boolean partitionLocalNode(int p, AffinityTopologyVersion topVer) { + return grp.affinity().nodes(p, topVer).contains(ctx.localNode()); + } + /** {@inheritDoc} */ @Override public boolean afterExchange(GridDhtPartitionsExchangeFuture exchFut) throws IgniteCheckedException { treatAllPartAsLoc = false; boolean changed = false; - int num = cctx.affinity().partitions(); + int num = grp.affinity().partitions(); AffinityTopologyVersion topVer = exchFut.topologyVersion(); - assert cctx.affinity().affinityTopologyVersion().equals(topVer) : "Affinity is not initialized " + + assert grp.affinity().lastVersion().equals(topVer) : "Affinity is not initialized " + "[topVer=" + topVer + - ", affVer=" + cctx.affinity().affinityTopologyVersion() + + ", affVer=" + grp.affinity().lastVersion() + ", fut=" + exchFut + ']'; lock.writeLock().lock(); @@ -524,7 +535,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh for (int p = 0; p < num; p++) { GridDhtLocalPartition locPart = localPartition0(p, topVer, false, false, false); - if (cctx.affinity().partitionLocalNode(p, topVer)) { + if (partitionLocalNode(p, topVer)) { // This partition will be created during next topology event, // which obviously has not happened at this point. if (locPart == null) { @@ -537,26 +548,28 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh GridDhtPartitionState state = locPart.state(); if (state == MOVING) { - if (cctx.rebalanceEnabled()) { + if (grp.rebalanceEnabled()) { Collection<ClusterNode> owners = owners(p); // If there are no other owners, then become an owner. if (F.isEmpty(owners)) { boolean owned = locPart.own(); - assert owned : "Failed to own partition [cacheName" + cctx.name() + ", locPart=" + + assert owned : "Failed to own partition [grp=" + grp.cacheOrGroupName() + ", locPart=" + locPart + ']'; updateSeq = updateLocal(p, locPart.state(), updateSeq); changed = true; - if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_PART_DATA_LOST)) { + if (grp.eventRecordable(EVT_CACHE_REBALANCE_PART_DATA_LOST)) { DiscoveryEvent discoEvt = exchFut.discoveryEvent(); - cctx.events().addPreloadEvent(p, - EVT_CACHE_REBALANCE_PART_DATA_LOST, discoEvt.eventNode(), - discoEvt.type(), discoEvt.timestamp()); + grp.addRebalanceEvent(p, + EVT_CACHE_REBALANCE_PART_DATA_LOST, + discoEvt.eventNode(), + discoEvt.type(), + discoEvt.timestamp()); } if (log.isDebugEnabled()) @@ -574,7 +587,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh if (locPart != null) { GridDhtPartitionState state = locPart.state(); - if (state == MOVING && cctx.kernalContext().state().active()) { + if (state == MOVING && ctx.kernalContext().state().active()) { locPart.rent(false); updateSeq = updateLocal(p, locPart.state(), updateSeq); @@ -588,7 +601,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh } } - updateRebalanceVersion(cctx.affinity().assignments(topVer)); + updateRebalanceVersion(grp.affinity().assignments(topVer)); consistencyCheck(); } @@ -622,11 +635,11 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh GridDhtLocalPartition loc = locParts.get(p); if (loc == null || loc.state() == EVICTED) { - locParts.set(p, loc = new GridDhtLocalPartition(cctx, p, entryFactory)); + locParts.set(p, loc = new GridDhtLocalPartition(ctx, grp, p)); - if (cctx.shared().pageStore() != null) { + if (ctx.pageStore() != null) { try { - cctx.shared().pageStore().onPartitionCreated(cctx.cacheId(), p); + ctx.pageStore().onPartitionCreated(grp.groupId(), p); } catch (IgniteCheckedException e) { // TODO ignite-db @@ -672,7 +685,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh state = loc != null ? loc.state() : null; - boolean belongs = cctx.affinity().partitionLocalNode(p, topVer); + boolean belongs = partitionLocalNode(p, topVer); if (loc != null && state == EVICTED) { locParts.set(p, loc = null); @@ -692,7 +705,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh "local node (often may be caused by inconsistent 'key.hashCode()' implementation) " + "[part=" + p + ", topVer=" + topVer + ", this.topVer=" + this.topVer + ']'); - locParts.set(p, loc = new GridDhtLocalPartition(cctx, p, entryFactory)); + locParts.set(p, loc = new GridDhtLocalPartition(ctx, grp, p)); if (updateSeq) this.updateSeq.incrementAndGet(); @@ -707,9 +720,10 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh lock.writeLock().unlock(); } - if (created && cctx.shared().pageStore() != null) { + if (created && ctx.pageStore() != null) { try { - cctx.shared().pageStore().onPartitionCreated(cctx.cacheId(), p); + // TODO IGNITE-5075. + ctx.pageStore().onPartitionCreated(grp.groupId(), p); } catch (IgniteCheckedException e) { // TODO ignite-db @@ -734,8 +748,8 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh } /** {@inheritDoc} */ - @Override public GridDhtLocalPartition localPartition(Object key, boolean create) { - return localPartition(cctx.affinity().partition(key), AffinityTopologyVersion.NONE, create); + @Override public GridDhtLocalPartition localPartition(int part) { + return locParts.get(part); } /** {@inheritDoc} */ @@ -791,7 +805,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh map.put(i, part.state()); } - return new GridDhtPartitionMap(cctx.nodeId(), + return new GridDhtPartitionMap(ctx.localNodeId(), updateSeq.get(), topVer, Collections.unmodifiableMap(map), @@ -831,7 +845,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh /** {@inheritDoc} */ @Override public List<ClusterNode> nodes(int p, AffinityTopologyVersion topVer) { - AffinityAssignment affAssignment = cctx.affinity().assignment(topVer); + AffinityAssignment affAssignment = grp.affinity().cachedAffinity(topVer); List<ClusterNode> affNodes = affAssignment.get(p); @@ -854,8 +868,8 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh try { assert node2part != null && node2part.valid() : "Invalid node-to-partitions map [topVer1=" + topVer + ", topVer2=" + this.topVer + - ", node=" + cctx.igniteInstanceName() + - ", cache=" + cctx.name() + + ", node=" + ctx.igniteInstanceName() + + ", grp=" + grp.cacheOrGroupName() + ", node2part=" + node2part + ']'; List<ClusterNode> nodes = null; @@ -867,7 +881,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh HashSet<UUID> affIds = affAssignment.getIds(p); if (!affIds.contains(nodeId) && hasState(p, nodeId, OWNING, MOVING)) { - ClusterNode n = cctx.discovery().node(nodeId); + ClusterNode n = ctx.discovery().node(nodeId); if (n != null && (topVer.topologyVersion() < 0 || n.order() <= topVer.topologyVersion())) { if (nodes == null) { @@ -900,7 +914,9 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh AffinityTopologyVersion topVer, GridDhtPartitionState state, GridDhtPartitionState... states) { - Collection<UUID> allIds = topVer.topologyVersion() > 0 ? F.nodeIds(discoCache.cacheAffinityNodes(cctx.cacheId())) : null; + Collection<UUID> allIds = topVer.topologyVersion() > 0 ? + F.nodeIds(discoCache.cacheGroupAffinityNodes(grp.groupId())) : + null; lock.readLock().lock(); @@ -908,7 +924,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh assert node2part != null && node2part.valid() : "Invalid node-to-partitions map [topVer=" + topVer + ", allIds=" + allIds + ", node2part=" + node2part + - ", cache=" + cctx.name() + ']'; + ", grp=" + grp.cacheOrGroupName() + ']'; Collection<UUID> nodeIds = part2node.get(p); @@ -925,7 +941,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh continue; if (hasState(p, id, state, states)) { - ClusterNode n = cctx.discovery().node(id); + ClusterNode n = ctx.discovery().node(id); if (n != null && (topVer.topologyVersion() < 0 || n.order() <= topVer.topologyVersion())) nodes.add(n); @@ -941,7 +957,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh /** {@inheritDoc} */ @Override public List<ClusterNode> owners(int p, AffinityTopologyVersion topVer) { - if (!cctx.rebalanceEnabled()) + if (!grp.rebalanceEnabled()) return ownersAndMoving(p, topVer); return nodes(p, topVer, OWNING); @@ -954,7 +970,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh /** {@inheritDoc} */ @Override public List<ClusterNode> moving(int p) { - if (!cctx.rebalanceEnabled()) + if (!grp.rebalanceEnabled()) return ownersAndMoving(p, AffinityTopologyVersion.NONE); return nodes(p, AffinityTopologyVersion.NONE, MOVING); @@ -979,12 +995,14 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh lock.readLock().lock(); try { - assert node2part != null && node2part.valid() : "Invalid node2part [node2part: " + node2part + - ", cache=" + cctx.name() + - ", started=" + cctx.started() + + if (node2part == null || stopping) + return null; + + assert node2part.valid() : "Invalid node2part [node2part=" + node2part + + ", grp=" + grp.cacheOrGroupName() + ", stopping=" + stopping + - ", locNodeId=" + cctx.localNode().id() + - ", locName=" + cctx.igniteInstanceName() + ']'; + ", locNodeId=" + ctx.localNode().id() + + ", locName=" + ctx.igniteInstanceName() + ']'; GridDhtPartitionFullMap m = node2part; @@ -1067,7 +1085,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh // then we keep the newer value. if (newPart != null && (newPart.updateSequence() < part.updateSequence() || - (cctx.startTopologyVersion().compareTo(newPart.topologyVersion()) > 0)) + (grp.localStartVersion().compareTo(newPart.topologyVersion()) > 0)) ) { if (log.isDebugEnabled()) log.debug("Overriding partition map in full update map [exchId=" + exchId + ", curPart=" + @@ -1081,7 +1099,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh for (Iterator<UUID> it = partMap.keySet().iterator(); it.hasNext(); ) { UUID nodeId = it.next(); - if (!cctx.discovery().alive(nodeId)) { + if (!ctx.discovery().alive(nodeId)) { if (log.isDebugEnabled()) log.debug("Removing left node from full map update [nodeId=" + nodeId + ", partMap=" + partMap + ']'); @@ -1115,11 +1133,11 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh boolean changed = false; - AffinityTopologyVersion affVer = cctx.affinity().affinityTopologyVersion(); + AffinityTopologyVersion affVer = grp.affinity().lastVersion(); - GridDhtPartitionMap nodeMap = partMap.get(cctx.localNodeId()); + GridDhtPartitionMap nodeMap = partMap.get(ctx.localNodeId()); - if (nodeMap != null && cctx.shared().database().persistenceEnabled()) { + if (nodeMap != null && ctx.database().persistenceEnabled()) { for (Map.Entry<Integer, GridDhtPartitionState> e : nodeMap.entrySet()) { int p = e.getKey(); GridDhtPartitionState state = e.getValue(); @@ -1188,7 +1206,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh long updateSeq = this.updateSeq.incrementAndGet(); if (!affVer.equals(AffinityTopologyVersion.NONE) && affVer.compareTo(topVer) >= 0) { - List<List<ClusterNode>> aff = cctx.affinity().assignments(topVer); + List<List<ClusterNode>> aff = grp.affinity().assignments(topVer); changed |= checkEvictions(updateSeq, aff); @@ -1201,7 +1219,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh log.debug("Partition map after full update: " + fullMapString()); if (changed) - cctx.shared().exchange().scheduleResendPartitions(); + ctx.exchange().scheduleResendPartitions(); return changed ? localPartitionMap() : null; } @@ -1254,7 +1272,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh if (log.isDebugEnabled()) log.debug("Updating single partition map [exchId=" + exchId + ", parts=" + mapString(parts) + ']'); - if (!cctx.discovery().alive(parts.nodeId())) { + if (!ctx.discovery().alive(parts.nodeId())) { if (log.isDebugEnabled()) log.debug("Received partition update for non-existing node (will ignore) [exchId=" + exchId + ", parts=" + parts + ']'); @@ -1334,10 +1352,10 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh } } - AffinityTopologyVersion affVer = cctx.affinity().affinityTopologyVersion(); + AffinityTopologyVersion affVer = grp.affinity().lastVersion(); if (!affVer.equals(AffinityTopologyVersion.NONE) && affVer.compareTo(topVer) >= 0) { - List<List<ClusterNode>> aff = cctx.affinity().assignments(topVer); + List<List<ClusterNode>> aff = grp.affinity().assignments(topVer); changed |= checkEvictions(updateSeq, aff); @@ -1350,7 +1368,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh log.debug("Partition map after single update: " + fullMapString()); if (changed) - cctx.shared().exchange().scheduleResendPartitions(); + ctx.exchange().scheduleResendPartitions(); return changed ? localPartitionMap() : null; } @@ -1364,7 +1382,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh lock.writeLock().lock(); try { - int parts = cctx.affinity().partitions(); + int parts = grp.affinity().partitions(); Collection<Integer> lost = null; @@ -1398,7 +1416,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh boolean changed = false; if (lost != null) { - PartitionLossPolicy plc = cctx.config().getPartitionLossPolicy(); + PartitionLossPolicy plc = grp.config().getPartitionLossPolicy(); assert plc != null; @@ -1430,13 +1448,17 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh } } - if (cctx.events().isRecordable(EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST)) - cctx.events().addPreloadEvent(part, EVT_CACHE_REBALANCE_PART_DATA_LOST, - discoEvt.eventNode(), discoEvt.type(), discoEvt.timestamp()); + if (grp.eventRecordable(EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST)) { + grp.addRebalanceEvent(part, + EVT_CACHE_REBALANCE_PART_DATA_LOST, + discoEvt.eventNode(), + discoEvt.type(), + discoEvt.timestamp()); + } } if (plc != PartitionLossPolicy.IGNORE) - cctx.needsRecovery(true); + grp.needsRecovery(true); } return changed; @@ -1451,7 +1473,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh lock.writeLock().lock(); try { - int parts = cctx.affinity().partitions(); + int parts = grp.affinity().partitions(); long updSeq = updateSeq.incrementAndGet(); for (int part = 0; part < parts; part++) { @@ -1490,9 +1512,9 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh } } - checkEvictions(updSeq, cctx.affinity().assignments(topVer)); + checkEvictions(updSeq, grp.affinity().assignments(topVer)); - cctx.needsRecovery(false); + grp.needsRecovery(false); } finally { lock.writeLock().unlock(); @@ -1506,7 +1528,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh try { Collection<Integer> res = null; - int parts = cctx.affinity().partitions(); + int parts = grp.affinity().partitions(); for (int part = 0; part < parts; part++) { Set<UUID> nodeIds = part2node.get(part); @@ -1544,7 +1566,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh GridDhtLocalPartition locPart = locParts.get(p); if (locPart != null) { - if (locPart.state() == OWNING && !owners.contains(cctx.localNodeId())) { + if (locPart.state() == OWNING && !owners.contains(ctx.localNodeId())) { if (haveHistory) locPart.moving(); else { @@ -1552,7 +1574,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh locPart.reload(true); - result.add(cctx.localNodeId()); + result.add(ctx.localNodeId()); } } @@ -1588,12 +1610,12 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh * @return {@code True} if state changed. */ private boolean checkEvictions(long updateSeq) { - AffinityTopologyVersion affVer = cctx.affinity().affinityTopologyVersion(); + AffinityTopologyVersion affVer = grp.affinity().lastVersion(); boolean changed = false; if (!affVer.equals(AffinityTopologyVersion.NONE) && affVer.compareTo(topVer) >= 0) { - List<List<ClusterNode>> aff = cctx.affinity().assignments(topVer); + List<List<ClusterNode>> aff = grp.affinity().assignments(topVer); changed = checkEvictions(updateSeq, aff); @@ -1625,12 +1647,12 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh * @return Checks if any of the local partitions need to be evicted. */ private boolean checkEvictions(long updateSeq, List<List<ClusterNode>> aff) { - if (!cctx.kernalContext().state().active()) + if (!ctx.kernalContext().state().active()) return false; boolean changed = false; - UUID locId = cctx.nodeId(); + UUID locId = ctx.localNodeId(); for (int p = 0; p < locParts.length(); p++) { GridDhtLocalPartition part = locParts.get(p); @@ -1643,7 +1665,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh if (state.active()) { List<ClusterNode> affNodes = aff.get(p); - if (!affNodes.contains(cctx.localNode())) { + if (!affNodes.contains(ctx.localNode())) { List<ClusterNode> nodes = nodes(p, topVer, OWNING); Collection<UUID> nodeIds = F.nodeIds(nodes); @@ -1710,10 +1732,10 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh private long updateLocal(int p, GridDhtPartitionState state, long updateSeq) { ClusterNode oldest = discoCache.oldestAliveServerNodeWithCache(); - assert oldest != null || cctx.kernalContext().clientNode(); + assert oldest != null || ctx.kernalContext().clientNode(); // If this node became the oldest node. - if (cctx.localNode().equals(oldest)) { + if (ctx.localNode().equals(oldest)) { long seq = node2part.updateSequence(); if (seq != updateSeq) { @@ -1740,7 +1762,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh } if (node2part != null) { - UUID locNodeId = cctx.localNodeId(); + UUID locNodeId = ctx.localNodeId(); GridDhtPartitionMap map = node2part.get(locNodeId); @@ -1777,9 +1799,9 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh ClusterNode oldest = discoCache.oldestAliveServerNode(); - assert oldest != null || cctx.kernalContext().clientNode(); + assert oldest != null || ctx.kernalContext().clientNode(); - ClusterNode loc = cctx.localNode(); + ClusterNode loc = ctx.localNode(); if (node2part != null) { if (loc.equals(oldest) && !node2part.nodeId().equals(loc.id())) { @@ -1927,11 +1949,10 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh try { assert node2part != null && node2part.valid() : "Invalid node2part [node2part: " + node2part + - ", cache=" + cctx.name() + - ", started=" + cctx.started() + + ", grp=" + grp.cacheOrGroupName() + ", stopping=" + stopping + - ", locNodeId=" + cctx.localNode().id() + - ", locName=" + cctx.igniteInstanceName() + ']'; + ", locNodeId=" + ctx.localNodeId() + + ", locName=" + ctx.igniteInstanceName() + ']'; for (GridDhtPartitionMap map : node2part.values()) { if (map.hasMovingPartitions()) @@ -1945,10 +1966,25 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh } } + /** + * @param cacheId Cache ID. + */ + public void onCacheStopped(int cacheId) { + if (!grp.sharedGroup()) + return; + + for (int i = 0; i < locParts.length(); i++) { + GridDhtLocalPartition part = locParts.get(i); + + if (part != null) + part.onCacheStopped(cacheId); + } + } + /** {@inheritDoc} */ @Override public void printMemoryStats(int threshold) { - X.println(">>> Cache partition topology stats [igniteInstanceName=" + cctx.igniteInstanceName() + - ", cache=" + cctx.name() + ']'); + X.println(">>> Cache partition topology stats [igniteInstanceName=" + ctx.igniteInstanceName() + + ", grp=" + grp.cacheOrGroupName() + ']'); lock.readLock().lock(); @@ -1959,7 +1995,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh if (part == null) continue; - int size = part.dataStore().size(); + int size = part.dataStore().fullSize(); if (size >= threshold) X.println(">>> Local partition [part=" + part.id() + ", size=" + size + ']'); @@ -1976,7 +2012,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh * @return {@code True} if given partition belongs to local node. */ private boolean localNode(int part, List<List<ClusterNode>> aff) { - return aff.get(part).contains(cctx.localNode()); + return aff.get(part).contains(ctx.localNode()); } /** @@ -1987,7 +2023,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh if (node2part == null || !node2part.valid()) return; - for (int i = 0; i < cctx.affinity().partitions(); i++) { + for (int i = 0; i < grp.affinity().partitions(); i++) { List<ClusterNode> affNodes = aff.get(i); // Topology doesn't contain server nodes (just clients). @@ -2003,7 +2039,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh rebalancedTopVer = topVer; if (log.isDebugEnabled()) - log.debug("Updated rebalanced version [cache=" + cctx.name() + ", ver=" + rebalancedTopVer + ']'); + log.debug("Updated rebalanced version [cache=" + grp.cacheOrGroupName() + ", ver=" + rebalancedTopVer + ']'); } }
http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java index c91eb7a..d607ff1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java @@ -47,7 +47,8 @@ import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.distributed.GridDistributedCacheEntry; import org.apache.ignite.internal.processors.cache.distributed.GridDistributedLockCancelledException; import org.apache.ignite.internal.processors.cache.distributed.GridDistributedUnlockRequest; -import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysRequest; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysResponse; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetRequest; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockRequest; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockResponse; @@ -118,51 +119,61 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach @Override public void start() throws IgniteCheckedException { super.start(); - preldr = new GridDhtPreloader(ctx); - - preldr.start(); - - ctx.io().addHandler(ctx.cacheId(), GridNearGetRequest.class, new CI2<UUID, GridNearGetRequest>() { + ctx.io().addCacheHandler(ctx.cacheId(), GridNearGetRequest.class, new CI2<UUID, GridNearGetRequest>() { @Override public void apply(UUID nodeId, GridNearGetRequest req) { processNearGetRequest(nodeId, req); } }); - ctx.io().addHandler(ctx.cacheId(), GridNearSingleGetRequest.class, new CI2<UUID, GridNearSingleGetRequest>() { + ctx.io().addCacheHandler(ctx.cacheId(), GridNearSingleGetRequest.class, new CI2<UUID, GridNearSingleGetRequest>() { @Override public void apply(UUID nodeId, GridNearSingleGetRequest req) { processNearSingleGetRequest(nodeId, req); } }); - ctx.io().addHandler(ctx.cacheId(), GridNearLockRequest.class, new CI2<UUID, GridNearLockRequest>() { + ctx.io().addCacheHandler(ctx.cacheId(), GridNearLockRequest.class, new CI2<UUID, GridNearLockRequest>() { @Override public void apply(UUID nodeId, GridNearLockRequest req) { processNearLockRequest(nodeId, req); } }); - ctx.io().addHandler(ctx.cacheId(), GridDhtLockRequest.class, new CI2<UUID, GridDhtLockRequest>() { + ctx.io().addCacheHandler(ctx.cacheId(), GridDhtLockRequest.class, new CI2<UUID, GridDhtLockRequest>() { @Override public void apply(UUID nodeId, GridDhtLockRequest req) { processDhtLockRequest(nodeId, req); } }); - ctx.io().addHandler(ctx.cacheId(), GridDhtLockResponse.class, new CI2<UUID, GridDhtLockResponse>() { + ctx.io().addCacheHandler(ctx.cacheId(), GridDhtLockResponse.class, new CI2<UUID, GridDhtLockResponse>() { @Override public void apply(UUID nodeId, GridDhtLockResponse req) { processDhtLockResponse(nodeId, req); } }); - ctx.io().addHandler(ctx.cacheId(), GridNearUnlockRequest.class, new CI2<UUID, GridNearUnlockRequest>() { + ctx.io().addCacheHandler(ctx.cacheId(), GridNearUnlockRequest.class, new CI2<UUID, GridNearUnlockRequest>() { @Override public void apply(UUID nodeId, GridNearUnlockRequest req) { processNearUnlockRequest(nodeId, req); } }); - ctx.io().addHandler(ctx.cacheId(), GridDhtUnlockRequest.class, new CI2<UUID, GridDhtUnlockRequest>() { + ctx.io().addCacheHandler(ctx.cacheId(), GridDhtUnlockRequest.class, new CI2<UUID, GridDhtUnlockRequest>() { @Override public void apply(UUID nodeId, GridDhtUnlockRequest req) { processDhtUnlockRequest(nodeId, req); } }); + + ctx.io().addCacheHandler(ctx.cacheId(), GridDhtForceKeysRequest.class, + new MessageHandler<GridDhtForceKeysRequest>() { + @Override public void onMessage(ClusterNode node, GridDhtForceKeysRequest msg) { + processForceKeysRequest(node, msg); + } + }); + + ctx.io().addCacheHandler(ctx.cacheId(), GridDhtForceKeysResponse.class, + new MessageHandler<GridDhtForceKeysResponse>() { + @Override public void onMessage(ClusterNode node, GridDhtForceKeysResponse msg) { + processForceKeyResponse(node, msg); + } + }); } /** {@inheritDoc} */ @@ -382,7 +393,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach } IgniteInternalFuture<Object> keyFut = F.isEmpty(req.keys()) ? null : - ctx.dht().dhtPreloader().request(req.keys(), req.topologyVersion()); + ctx.group().preloader().request(ctx, req.keys(), req.topologyVersion()); if (keyFut == null || keyFut.isDone()) { if (keyFut != null) { http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishResponse.java index d777a22..6d717eb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishResponse.java @@ -173,19 +173,19 @@ public class GridDhtTxFinishResponse extends GridDistributedTxFinishResponse { } switch (writer.state()) { - case 7: + case 6: if (!writer.writeByteArray("checkCommittedErrBytes", checkCommittedErrBytes)) return false; writer.incrementState(); - case 8: + case 7: if (!writer.writeInt("miniId", miniId)) return false; writer.incrementState(); - case 9: + case 8: if (!writer.writeMessage("retVal", retVal)) return false; @@ -207,7 +207,7 @@ public class GridDhtTxFinishResponse extends GridDistributedTxFinishResponse { return false; switch (reader.state()) { - case 7: + case 6: checkCommittedErrBytes = reader.readByteArray("checkCommittedErrBytes"); if (!reader.isLastRead()) @@ -215,7 +215,7 @@ public class GridDhtTxFinishResponse extends GridDistributedTxFinishResponse { reader.incrementState(); - case 8: + case 7: miniId = reader.readInt("miniId"); if (!reader.isLastRead()) @@ -223,7 +223,7 @@ public class GridDhtTxFinishResponse extends GridDistributedTxFinishResponse { reader.incrementState(); - case 9: + case 8: retVal = reader.readMessage("retVal"); if (!reader.isLastRead()) @@ -243,7 +243,7 @@ public class GridDhtTxFinishResponse extends GridDistributedTxFinishResponse { /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 10; + return 9; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxOnePhaseCommitAckRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxOnePhaseCommitAckRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxOnePhaseCommitAckRequest.java index c483408..67eacd3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxOnePhaseCommitAckRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxOnePhaseCommitAckRequest.java @@ -47,6 +47,16 @@ public class GridDhtTxOnePhaseCommitAckRequest extends GridCacheMessage { // No-op. } + /** {@inheritDoc} */ + @Override public int handlerId() { + return 0; + } + + /** {@inheritDoc} */ + @Override public boolean cacheGroupMessage() { + return false; + } + /** * * @param vers Near Tx xid Versions. @@ -87,7 +97,7 @@ public class GridDhtTxOnePhaseCommitAckRequest extends GridCacheMessage { } switch (writer.state()) { - case 3: + case 2: if (!writer.writeCollection("vers", vers, MessageCollectionItemType.MSG)) return false; @@ -109,7 +119,7 @@ public class GridDhtTxOnePhaseCommitAckRequest extends GridCacheMessage { return false; switch (reader.state()) { - case 3: + case 2: vers = reader.readCollection("vers", MessageCollectionItemType.MSG); if (!reader.isLastRead()) @@ -129,6 +139,6 @@ public class GridDhtTxOnePhaseCommitAckRequest extends GridCacheMessage { /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 4; + return 3; } } http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java index e2b7803..bd238d9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java @@ -1086,7 +1086,9 @@ public final class GridDhtTxPrepareFuture extends GridCacheCompoundFuture<Ignite Collection<KeyCacheObject> keys = entry.getValue(); - lastForceFut = cctx.cacheContext(cacheId).preloader().request(keys, tx.topologyVersion()); + GridCacheContext ctx = cctx.cacheContext(cacheId); + + lastForceFut = ctx.group().preloader().request(ctx, keys, tx.topologyVersion()); if (compFut != null && lastForceFut != null) compFut.add(lastForceFut); http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java index 6452abc..f29c507 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java @@ -458,7 +458,7 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda GridCacheVersion ver = null; if (readNoEntry) { - CacheDataRow row = cctx.offheap().read(key); + CacheDataRow row = cctx.offheap().read(cctx, key); if (row != null) { long expireTime = row.expireTime(); http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java index e5df64e..9811f8d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java @@ -359,7 +359,7 @@ public class GridPartitionedSingleGetFuture extends GridCacheFutureAdapter<Objec boolean skipEntry = readNoEntry; if (readNoEntry) { - CacheDataRow row = cctx.offheap().read(key); + CacheDataRow row = cctx.offheap().read(cctx, key); if (row != null) { long expireTime = row.expireTime(); http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java index 579796d..d2dc817 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java @@ -27,7 +27,7 @@ import org.apache.ignite.internal.GridDirectTransient; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.processors.cache.GridCacheDeployable; -import org.apache.ignite.internal.processors.cache.GridCacheMessage; +import org.apache.ignite.internal.processors.cache.GridCacheIdMessage; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; @@ -40,7 +40,7 @@ import org.jetbrains.annotations.Nullable; /** * */ -public abstract class GridDhtAtomicAbstractUpdateRequest extends GridCacheMessage implements GridCacheDeployable { +public abstract class GridDhtAtomicAbstractUpdateRequest extends GridCacheIdMessage implements GridCacheDeployable { /** Skip store flag bit mask. */ private static final int DHT_ATOMIC_SKIP_STORE_FLAG_MASK = 0x01; http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index 0dafa2b..2f99033 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -70,7 +70,8 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvali import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology; import org.apache.ignite.internal.processors.cache.distributed.dht.GridPartitionedGetFuture; import org.apache.ignite.internal.processors.cache.distributed.dht.GridPartitionedSingleGetFuture; -import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysRequest; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysResponse; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearAtomicCache; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheAdapter; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetRequest; @@ -193,19 +194,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } /** {@inheritDoc} */ - @Override protected GridCacheMapEntryFactory entryFactory() { - return new GridCacheMapEntryFactory() { - @Override public GridCacheMapEntry create( - GridCacheContext ctx, - AffinityTopologyVersion topVer, - KeyCacheObject key - ) { - return new GridDhtAtomicCacheEntry(ctx, topVer, key); - } - }; - } - - /** {@inheritDoc} */ @Override protected void init() { super.init(); @@ -239,11 +227,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { metrics = m; - preldr = new GridDhtPreloader(ctx); - - preldr.start(); - - ctx.io().addHandler( + ctx.io().addCacheHandler( ctx.cacheId(), GridNearGetRequest.class, new CI2<UUID, GridNearGetRequest>() { @@ -257,7 +241,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } }); - ctx.io().addHandler( + ctx.io().addCacheHandler( ctx.cacheId(), GridNearSingleGetRequest.class, new CI2<UUID, GridNearSingleGetRequest>() { @@ -271,7 +255,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } }); - ctx.io().addHandler( + ctx.io().addCacheHandler( ctx.cacheId(), GridNearAtomicAbstractUpdateRequest.class, new CI2<UUID, GridNearAtomicAbstractUpdateRequest>() { @@ -290,7 +274,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } }); - ctx.io().addHandler(ctx.cacheId(), + ctx.io().addCacheHandler( + ctx.cacheId(), GridNearAtomicUpdateResponse.class, new CI2<UUID, GridNearAtomicUpdateResponse>() { @Override public void apply( @@ -308,7 +293,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } }); - ctx.io().addHandler( + ctx.io().addCacheHandler( ctx.cacheId(), GridDhtAtomicAbstractUpdateRequest.class, new CI2<UUID, GridDhtAtomicAbstractUpdateRequest>() { @@ -327,7 +312,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } }); - ctx.io().addHandler( + ctx.io().addCacheHandler( ctx.cacheId(), GridDhtAtomicUpdateResponse.class, new CI2<UUID, GridDhtAtomicUpdateResponse>() { @@ -346,7 +331,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } }); - ctx.io().addHandler(ctx.cacheId(), + ctx.io().addCacheHandler( + ctx.cacheId(), GridDhtAtomicDeferredUpdateResponse.class, new CI2<UUID, GridDhtAtomicDeferredUpdateResponse>() { @Override public void apply( @@ -364,7 +350,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } }); - ctx.io().addHandler(ctx.cacheId(), + ctx.io().addCacheHandler( + ctx.cacheId(), GridDhtAtomicNearResponse.class, new CI2<UUID, GridDhtAtomicNearResponse>() { @Override public void apply(UUID uuid, GridDhtAtomicNearResponse msg) { @@ -377,7 +364,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } }); - ctx.io().addHandler(ctx.cacheId(), + ctx.io().addCacheHandler( + ctx.cacheId(), GridNearAtomicCheckUpdateRequest.class, new CI2<UUID, GridNearAtomicCheckUpdateRequest>() { @Override public void apply(UUID uuid, GridNearAtomicCheckUpdateRequest msg) { @@ -390,8 +378,26 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } }); + ctx.io().addCacheHandler( + ctx.cacheId(), + GridDhtForceKeysRequest.class, + new MessageHandler<GridDhtForceKeysRequest>() { + @Override public void onMessage(ClusterNode node, GridDhtForceKeysRequest msg) { + processForceKeysRequest(node, msg); + } + }); + + ctx.io().addCacheHandler( + ctx.cacheId(), + GridDhtForceKeysResponse.class, + new MessageHandler<GridDhtForceKeysResponse>() { + @Override public void onMessage(ClusterNode node, GridDhtForceKeysResponse msg) { + processForceKeyResponse(node, msg); + } + }); + if (near == null) { - ctx.io().addHandler( + ctx.io().addCacheHandler( ctx.cacheId(), GridNearGetResponse.class, new CI2<UUID, GridNearGetResponse>() { @@ -405,7 +411,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } }); - ctx.io().addHandler( + ctx.io().addCacheHandler( ctx.cacheId(), GridNearSingleGetResponse.class, new CI2<UUID, GridNearSingleGetResponse>() { @@ -1486,7 +1492,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { // Optimistically expect that all keys are available locally (avoid creation of get future). for (KeyCacheObject key : keys) { if (readNoEntry) { - CacheDataRow row = ctx.offheap().read(key); + CacheDataRow row = ctx.offheap().read(ctx, key); if (row != null) { long expireTime = row.expireTime(); @@ -1662,7 +1668,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { final GridNearAtomicAbstractUpdateRequest req, final UpdateReplyClosure completionCb ) { - IgniteInternalFuture<Object> forceFut = preldr.request(req, req.topologyVersion()); + IgniteInternalFuture<Object> forceFut = ctx.group().preloader().request(ctx, req, req.topologyVersion()); if (forceFut == null || forceFut.isDone()) { try { http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCacheEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCacheEntry.java deleted file mode 100644 index b0c9a64..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCacheEntry.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.cache.distributed.dht.atomic; - -import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; -import org.apache.ignite.internal.processors.cache.GridCacheContext; -import org.apache.ignite.internal.processors.cache.KeyCacheObject; -import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry; -import org.apache.ignite.internal.util.typedef.internal.CU; -import org.apache.ignite.internal.util.typedef.internal.S; - -/** - * DHT atomic cache entry. - */ -public class GridDhtAtomicCacheEntry extends GridDhtCacheEntry { - /** - * @param ctx Cache context. - * @param topVer Topology version at the time of creation (if negative, then latest topology is assumed). - * @param key Cache key. - */ - GridDhtAtomicCacheEntry( - GridCacheContext ctx, - AffinityTopologyVersion topVer, - KeyCacheObject key - ) { - super(ctx, topVer, key); - } - - /** {@inheritDoc} */ - @Override protected String cacheName() { - return CU.isNearEnabled(cctx) ? super.cacheName() : cctx.dht().name(); - } - - /** {@inheritDoc} */ - @Override public synchronized String toString() { - return S.toString(GridDhtAtomicCacheEntry.class, this, super.toString()); - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicDeferredUpdateResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicDeferredUpdateResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicDeferredUpdateResponse.java index 92ef149..0c069da 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicDeferredUpdateResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicDeferredUpdateResponse.java @@ -22,7 +22,7 @@ import java.nio.ByteBuffer; import org.apache.ignite.IgniteLogger; import org.apache.ignite.internal.GridDirectTransient; import org.apache.ignite.internal.processors.cache.GridCacheDeployable; -import org.apache.ignite.internal.processors.cache.GridCacheMessage; +import org.apache.ignite.internal.processors.cache.GridCacheIdMessage; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.timeout.GridTimeoutObject; import org.apache.ignite.internal.util.GridLongList; @@ -35,7 +35,7 @@ import org.jetbrains.annotations.Nullable; /** * Deferred dht atomic update response. */ -public class GridDhtAtomicDeferredUpdateResponse extends GridCacheMessage implements GridCacheDeployable { +public class GridDhtAtomicDeferredUpdateResponse extends GridCacheIdMessage implements GridCacheDeployable { /** */ private static final long serialVersionUID = 0L; http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicNearResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicNearResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicNearResponse.java index d6e2db0..71d2321 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicNearResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicNearResponse.java @@ -20,7 +20,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.atomic; import java.nio.ByteBuffer; import java.util.UUID; import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.internal.processors.cache.GridCacheMessage; +import org.apache.ignite.internal.processors.cache.GridCacheIdMessage; import org.apache.ignite.internal.processors.cache.GridCacheReturn; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.KeyCacheObject; @@ -36,7 +36,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.atomic /** * Message sent from DHT nodes to near node in FULL_SYNC mode. */ -public class GridDhtAtomicNearResponse extends GridCacheMessage { +public class GridDhtAtomicNearResponse extends GridCacheIdMessage { /** */ private static final long serialVersionUID = 0L; http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java index 693d658..7b2547a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java @@ -19,7 +19,6 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.atomic; import java.io.Externalizable; import java.nio.ByteBuffer; -import java.util.ArrayList; import java.util.Collection; import java.util.List; import org.apache.ignite.IgniteCheckedException; @@ -27,7 +26,7 @@ import org.apache.ignite.IgniteLogger; import org.apache.ignite.internal.GridDirectCollection; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheDeployable; -import org.apache.ignite.internal.processors.cache.GridCacheMessage; +import org.apache.ignite.internal.processors.cache.GridCacheIdMessage; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.util.tostring.GridToStringInclude; @@ -39,7 +38,7 @@ import org.apache.ignite.plugin.extensions.communication.MessageWriter; /** * DHT atomic cache backup update response. */ -public class GridDhtAtomicUpdateResponse extends GridCacheMessage implements GridCacheDeployable { +public class GridDhtAtomicUpdateResponse extends GridCacheIdMessage implements GridCacheDeployable { /** */ private static final long serialVersionUID = 0L; http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java index 4b3ea5bc..bb47af4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java @@ -29,7 +29,7 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheEntryPredicate; import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.processors.cache.GridCacheDeployable; -import org.apache.ignite.internal.processors.cache.GridCacheMessage; +import org.apache.ignite.internal.processors.cache.GridCacheIdMessage; import org.apache.ignite.internal.processors.cache.GridCacheOperation; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.KeyCacheObject; @@ -44,7 +44,7 @@ import org.jetbrains.annotations.Nullable; /** * */ -public abstract class GridNearAtomicAbstractUpdateRequest extends GridCacheMessage implements GridCacheDeployable { +public abstract class GridNearAtomicAbstractUpdateRequest extends GridCacheIdMessage implements GridCacheDeployable { /** Message index. */ public static final int CACHE_MSG_IDX = nextIndexId(); http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicCheckUpdateRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicCheckUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicCheckUpdateRequest.java index 4b9109e..96be023 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicCheckUpdateRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicCheckUpdateRequest.java @@ -19,7 +19,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.atomic; import java.nio.ByteBuffer; import org.apache.ignite.internal.GridDirectTransient; -import org.apache.ignite.internal.processors.cache.GridCacheMessage; +import org.apache.ignite.internal.processors.cache.GridCacheIdMessage; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.plugin.extensions.communication.MessageReader; import org.apache.ignite.plugin.extensions.communication.MessageWriter; @@ -27,7 +27,7 @@ import org.apache.ignite.plugin.extensions.communication.MessageWriter; /** * */ -public class GridNearAtomicCheckUpdateRequest extends GridCacheMessage { +public class GridNearAtomicCheckUpdateRequest extends GridCacheIdMessage { /** */ private static final long serialVersionUID = 0L; http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java index 55953ea..5ba024f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java @@ -30,7 +30,7 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheDeployable; -import org.apache.ignite.internal.processors.cache.GridCacheMessage; +import org.apache.ignite.internal.processors.cache.GridCacheIdMessage; import org.apache.ignite.internal.processors.cache.GridCacheReturn; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.KeyCacheObject; @@ -45,7 +45,7 @@ import org.jetbrains.annotations.Nullable; /** * DHT atomic cache near update response. */ -public class GridNearAtomicUpdateResponse extends GridCacheMessage implements GridCacheDeployable { +public class GridNearAtomicUpdateResponse extends GridCacheIdMessage implements GridCacheDeployable { /** */ private static final long serialVersionUID = 0L; http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java index 12a3912..708df49 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java @@ -38,7 +38,6 @@ import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException; import org.apache.ignite.internal.processors.cache.GridCacheLockTimeoutException; -import org.apache.ignite.internal.processors.cache.GridCacheMapEntry; import org.apache.ignite.internal.processors.cache.GridCacheMapEntryFactory; import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate; import org.apache.ignite.internal.processors.cache.GridCacheReturn; @@ -116,35 +115,22 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte } /** {@inheritDoc} */ - @Override protected GridCacheMapEntryFactory entryFactory() { - return new GridCacheMapEntryFactory() { - @Override public GridCacheMapEntry create( - GridCacheContext ctx, - AffinityTopologyVersion topVer, - KeyCacheObject key - ) { - return new GridDhtColocatedCacheEntry(ctx, topVer, key); - } - }; - } - - /** {@inheritDoc} */ @Override public void start() throws IgniteCheckedException { super.start(); - ctx.io().addHandler(ctx.cacheId(), GridNearGetResponse.class, new CI2<UUID, GridNearGetResponse>() { + ctx.io().addCacheHandler(ctx.cacheId(), GridNearGetResponse.class, new CI2<UUID, GridNearGetResponse>() { @Override public void apply(UUID nodeId, GridNearGetResponse res) { processNearGetResponse(nodeId, res); } }); - ctx.io().addHandler(ctx.cacheId(), GridNearSingleGetResponse.class, new CI2<UUID, GridNearSingleGetResponse>() { + ctx.io().addCacheHandler(ctx.cacheId(), GridNearSingleGetResponse.class, new CI2<UUID, GridNearSingleGetResponse>() { @Override public void apply(UUID nodeId, GridNearSingleGetResponse res) { processNearSingleGetResponse(nodeId, res); } }); - ctx.io().addHandler(ctx.cacheId(), GridNearLockResponse.class, new CI2<UUID, GridNearLockResponse>() { + ctx.io().addCacheHandler(ctx.cacheId(), GridNearLockResponse.class, new CI2<UUID, GridNearLockResponse>() { @Override public void apply(UUID nodeId, GridNearLockResponse res) { processLockResponse(nodeId, res); } @@ -467,7 +453,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte for (KeyCacheObject key : keys) { if (readNoEntry) { - CacheDataRow row = ctx.offheap().read(key); + CacheDataRow row = ctx.offheap().read(ctx, key); if (row != null) { long expireTime = row.expireTime(); @@ -941,7 +927,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte ) { assert keys != null; - IgniteInternalFuture<Object> keyFut = ctx.dht().dhtPreloader().request(keys, topVer); + IgniteInternalFuture<Object> keyFut = ctx.group().preloader().request(cacheCtx, keys, topVer); // Prevent embedded future creation if possible. if (keyFut == null || keyFut.isDone()) { http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCacheEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCacheEntry.java deleted file mode 100644 index f7cc5a7..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCacheEntry.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.cache.distributed.dht.colocated; - -import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; -import org.apache.ignite.internal.processors.cache.GridCacheContext; -import org.apache.ignite.internal.processors.cache.KeyCacheObject; -import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry; -import org.apache.ignite.internal.util.typedef.internal.S; - -/** - * Cache entry for colocated cache. - */ -public class GridDhtColocatedCacheEntry extends GridDhtCacheEntry { - /** - * @param ctx Cache context. - * @param topVer Topology version at the time of creation (if negative, then latest topology is assumed). - * @param key Cache key. - */ - GridDhtColocatedCacheEntry( - GridCacheContext ctx, - AffinityTopologyVersion topVer, - KeyCacheObject key - ) { - super(ctx, topVer, key); - } - - /** {@inheritDoc} */ - @Override protected String cacheName() { - return cctx.colocated().name(); - } - - /** {@inheritDoc} */ - @Override public synchronized String toString() { - return S.toString(GridDhtColocatedCacheEntry.class, this, super.toString()); - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java index d72d8db..763b43b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java @@ -102,9 +102,6 @@ public final class GridDhtForceKeysFuture<K, V> extends GridCompoundFuture<Objec /** Future ID. */ private IgniteUuid futId = IgniteUuid.randomUuid(); - /** Preloader. */ - private GridDhtPreloader preloader; - /** Trackable flag. */ private boolean trackable; @@ -112,21 +109,19 @@ public final class GridDhtForceKeysFuture<K, V> extends GridCompoundFuture<Objec * @param cctx Cache context. * @param topVer Topology version. * @param keys Keys. - * @param preloader Preloader. */ public GridDhtForceKeysFuture( GridCacheContext<K, V> cctx, AffinityTopologyVersion topVer, - Collection<KeyCacheObject> keys, - GridDhtPreloader preloader + Collection<KeyCacheObject> keys ) { assert topVer.topologyVersion() != 0 : topVer; assert !F.isEmpty(keys) : keys; + assert !cctx.isNear(); this.cctx = cctx; this.keys = keys; this.topVer = topVer; - this.preloader = preloader; top = cctx.dht().topology(); @@ -158,7 +153,7 @@ public final class GridDhtForceKeysFuture<K, V> extends GridCompoundFuture<Objec @Override public boolean onDone(@Nullable Collection<K> res, @Nullable Throwable err) { if (super.onDone(res, err)) { if (trackable) - preloader.remoteFuture(this); + cctx.dht().removeFuture(this); return true; } @@ -170,7 +165,7 @@ public final class GridDhtForceKeysFuture<K, V> extends GridCompoundFuture<Objec * @param evt Discovery event. */ @SuppressWarnings( {"unchecked"}) - void onDiscoveryEvent(DiscoveryEvent evt) { + public void onDiscoveryEvent(DiscoveryEvent evt) { topCntr.incrementAndGet(); int type = evt.type(); @@ -244,7 +239,7 @@ public final class GridDhtForceKeysFuture<K, V> extends GridCompoundFuture<Objec int curTopVer = topCntr.get(); - if (!preloader.addFuture(this)) { + if (!cctx.dht().addFuture(this)) { assert isDone() : this; return false; http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysRequest.java index d129ae8..124ae44 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysRequest.java @@ -25,7 +25,7 @@ import org.apache.ignite.internal.GridDirectCollection; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheDeployable; -import org.apache.ignite.internal.processors.cache.GridCacheMessage; +import org.apache.ignite.internal.processors.cache.GridCacheIdMessage; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.util.tostring.GridToStringInclude; @@ -40,7 +40,7 @@ import org.apache.ignite.plugin.extensions.communication.MessageWriter; * Force keys request. This message is sent by node while preloading to force * another node to put given keys into the next batch of transmitting entries. */ -public class GridDhtForceKeysRequest extends GridCacheMessage implements GridCacheDeployable { +public class GridDhtForceKeysRequest extends GridCacheIdMessage implements GridCacheDeployable { /** */ private static final long serialVersionUID = 0L;
