http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index d3042e9..88dc863 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -57,6 +57,8 @@ import org.apache.ignite.internal.pagemem.snapshot.StartSnapshotOperationAckDisc import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache; import org.apache.ignite.internal.processors.cache.CacheAffinityChangeMessage; +import org.apache.ignite.internal.processors.cache.CacheGroupDescriptor; +import org.apache.ignite.internal.processors.cache.CacheGroupContext; import org.apache.ignite.internal.processors.cache.CacheInvalidStateException; import org.apache.ignite.internal.processors.cache.CachePartitionExchangeWorkerTask; import org.apache.ignite.internal.processors.cache.ClusterState; @@ -198,8 +200,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT /** */ private CacheAffinityChangeMessage affChangeMsg; - /** Cache validation results. */ - private volatile Map<Integer, CacheValidation> cacheValidRes; + /** Cache groups validation results. */ + private volatile Map<Integer, CacheValidation> grpValidRes; /** Skip preload flag. 
*/ private boolean skipPreload; @@ -237,6 +239,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT /** */ private volatile IgniteDhtPartitionsToReloadMap partsToReload = new IgniteDhtPartitionsToReloadMap(); + /** */ private final AtomicBoolean done = new AtomicBoolean(); /** @@ -404,8 +407,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT } /** - * @param cacheId Cache ID to check. - * @param rcvdFrom Topology version. + * @param cacheId Cache ID. + * @param rcvdFrom Node ID cache was received from. * @return {@code True} if cache was added during this exchange. */ public boolean cacheAddedOnExchange(int cacheId, UUID rcvdFrom) { @@ -413,14 +416,32 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT } /** + * @param grpId Cache group ID. + * @param rcvdFrom Node ID cache group was received from. + * @return {@code True} if cache group was added during this exchange. + */ + public boolean cacheGroupAddedOnExchange(int grpId, UUID rcvdFrom) { + return dynamicCacheGroupStarted(grpId) || + (exchId.isJoined() && exchId.nodeId().equals(rcvdFrom)); + } + + /** * @param cacheId Cache ID. * @return {@code True} if non-client cache was added during this exchange. */ - public boolean dynamicCacheStarted(int cacheId) { + private boolean dynamicCacheStarted(int cacheId) { return exchActions != null && exchActions.cacheStarted(cacheId); } /** + * @param grpId Cache group ID. + * @return {@code True} if non-client cache group was added during this exchange. 
+ */ + public boolean dynamicCacheGroupStarted(int grpId) { + return exchActions != null && exchActions.cacheGroupStarting(grpId); + } + + /** * @return {@code True} */ public boolean onAdded() { @@ -550,8 +571,24 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT exchActions = new ExchangeActions(); + Set<String> cacheNames = new HashSet<>(); + + Collection<CacheGroupDescriptor> grpDescs = new ArrayList<>(); + + for (Integer grpId : op.cacheGroupIds()) { + CacheGroupContext cacheGrp = cctx.cache().cacheGroup(grpId); + + if (cacheGrp == null) + continue; + + grpDescs.add(cctx.cache().cacheGroupDescriptors().get(grpId)); + + for (Integer cacheId : cacheGrp.cacheIds()) + cacheNames.add(cctx.cacheContext(cacheId).name()); + } + List<DynamicCacheChangeRequest> destroyRequests = getStopCacheRequests( - cctx.cache(), op.cacheNames(), cctx.localNodeId()); + cctx.cache(), cacheNames, cctx.localNodeId()); if (!F.isEmpty(destroyRequests)) { //Emulate destroy cache request for (DynamicCacheChangeRequest req : destroyRequests) { @@ -559,6 +596,9 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT .cacheDescriptor(CU.cacheId(req.cacheName()))); } + for (CacheGroupDescriptor grpDesc : grpDescs) + exchActions.addCacheGroupToStop(grpDesc); + if (op.type() == SnapshotOperationType.RESTORE) cctx.cache().onCustomEvent(new DynamicCacheChangeBatch(destroyRequests), topVer); @@ -688,11 +728,11 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT try { if (crd != null) { - for (GridCacheContext cacheCtx : cctx.cacheContexts()) { - if (cacheCtx.isLocal()) + for (CacheGroupContext grp : cctx.cache().cacheGroups()) { + if (grp.isLocal()) continue; - cacheCtx.topology().beforeExchange(this, !centralizedAff); + grp.topology().beforeExchange(this, !centralizedAff); } } } @@ -706,28 +746,28 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT * @throws 
IgniteCheckedException If failed. */ private void updateTopologies(boolean crd) throws IgniteCheckedException { - for (GridCacheContext cacheCtx : cctx.cacheContexts()) { - if (cacheCtx.isLocal()) + for (CacheGroupContext grp : cctx.cache().cacheGroups()) { + if (grp.isLocal()) continue; - GridClientPartitionTopology clientTop = cctx.exchange().clearClientTopology(cacheCtx.cacheId()); + GridClientPartitionTopology clientTop = cctx.exchange().clearClientTopology(grp.groupId()); long updSeq = clientTop == null ? -1 : clientTop.lastUpdateSequence(); - GridDhtPartitionTopology top = cacheCtx.topology(); + GridDhtPartitionTopology top = grp.topology(); if (crd) { - boolean updateTop = exchId.topologyVersion().equals(cacheCtx.startTopologyVersion()); + boolean updateTop = exchId.topologyVersion().equals(grp.localStartVersion()); if (updateTop && clientTop != null) top.update(this, clientTop.partitionMap(true), clientTop.updateCounters(false), Collections.<Integer>emptySet()); } - top.updateTopologyVersion(exchId, this, updSeq, stopping(cacheCtx.cacheId())); + top.updateTopologyVersion(exchId, this, updSeq, cacheGroupStopping(grp.groupId())); } for (GridClientPartitionTopology top : cctx.exchange().clientTopologies()) - top.updateTopologyVersion(exchId, this, -1, stopping(top.cacheId())); + top.updateTopologyVersion(exchId, this, -1, cacheGroupStopping(top.groupId())); } /** @@ -821,15 +861,19 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT if (crd != null) { if (crd.isLocal()) { - for (GridCacheContext cacheCtx : cctx.cacheContexts()) { - boolean updateTop = !cacheCtx.isLocal() && - exchId.topologyVersion().equals(cacheCtx.startTopologyVersion()); + for (CacheGroupContext grp : cctx.cache().cacheGroups()) { + boolean updateTop = !grp.isLocal() && + exchId.topologyVersion().equals(grp.localStartVersion()); if (updateTop) { for (GridClientPartitionTopology top : cctx.exchange().clientTopologies()) { - if (top.cacheId() == 
cacheCtx.cacheId()) { - cacheCtx.topology().update(this, - top.partitionMap(true), + if (top.groupId() == grp.groupId()) { + GridDhtPartitionFullMap fullMap = top.partitionMap(true); + + assert fullMap != null; + + grp.topology().update(this, + fullMap, top.updateCounters(false), Collections.<Integer>emptySet()); @@ -850,8 +894,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT } else { if (centralizedAff) { // Last server node failed. - for (GridCacheContext cacheCtx : cctx.cacheContexts()) { - GridAffinityAssignmentCache aff = cacheCtx.affinity().affinityCache(); + for (CacheGroupContext grp : cctx.cache().cacheGroups()) { + GridAffinityAssignmentCache aff = grp.affinity(); aff.initialize(topologyVersion(), aff.idealAssignment()); } @@ -869,11 +913,11 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT assert !cctx.kernalContext().clientNode(); - for (GridCacheContext cacheCtx : cctx.cacheContexts()) { - if (cacheCtx.isLocal()) + for (CacheGroupContext grp : cctx.cache().cacheGroups()) { + if (grp.isLocal()) continue; - cacheCtx.preloader().onTopologyChanged(this); + grp.preloader().onTopologyChanged(this); } cctx.database().releaseHistoryForPreloading(); @@ -886,15 +930,20 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT boolean topChanged = discoEvt.type() != EVT_DISCOVERY_CUSTOM_EVT || affChangeMsg != null; for (GridCacheContext cacheCtx : cctx.cacheContexts()) { - if (cacheCtx.isLocal() || stopping(cacheCtx.cacheId())) + if (cacheCtx.isLocal() || cacheStopping(cacheCtx.cacheId())) continue; if (topChanged) { // Partition release future is done so we can flush the write-behind store. 
cacheCtx.store().forceFlush(); } + } + + for (CacheGroupContext grp : cctx.cache().cacheGroups()) { + if (grp.isLocal() || cacheGroupStopping(grp.groupId())) + continue; - cacheCtx.topology().beforeExchange(this, !centralizedAff); + grp.topology().beforeExchange(this, !centralizedAff); } cctx.database().beforeExchange(this); @@ -1018,11 +1067,11 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT * */ private void onLeft() { - for (GridCacheContext cacheCtx : cctx.cacheContexts()) { - if (cacheCtx.isLocal()) + for (CacheGroupContext grp : cctx.cache().cacheGroups()) { + if (grp.isLocal()) continue; - cacheCtx.preloader().unwindUndeploys(); + grp.preloader().unwindUndeploys(); } cctx.mvcc().removeExplicitNodeLocks(exchId.nodeId(), exchId.topologyVersion()); @@ -1034,17 +1083,17 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT private void warnNoAffinityNodes() { List<String> cachesWithoutNodes = null; - for (String name : cctx.cache().cacheNames()) { - if (discoCache.cacheAffinityNodes(name).isEmpty()) { + for (DynamicCacheDescriptor cacheDesc : cctx.cache().cacheDescriptors().values()) { + if (discoCache.cacheGroupAffinityNodes(cacheDesc.groupId()).isEmpty()) { if (cachesWithoutNodes == null) cachesWithoutNodes = new ArrayList<>(); - cachesWithoutNodes.add(name); + cachesWithoutNodes.add(cacheDesc.cacheName()); // Fire event even if there is no client cache started. if (cctx.gridEvents().isRecordable(EventType.EVT_CACHE_NODES_LEFT)) { Event evt = new CacheEvent( - name, + cacheDesc.cacheName(), cctx.localNode(), cctx.localNode(), "All server nodes have left the cluster.", @@ -1106,10 +1155,18 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT } /** + * @param grpId Cache group ID to check. + * @return {@code True} if cache group is stopping by this exchange. 
+ */ + private boolean cacheGroupStopping(int grpId) { + return exchActions != null && exchActions.cacheGroupStopping(grpId); + } + + /** * @param cacheId Cache ID to check. * @return {@code True} if cache is stopping by this exchange. */ - public boolean stopping(int cacheId) { + private boolean cacheStopping(int cacheId) { return exchActions != null && exchActions.cacheStopped(cacheId); } @@ -1235,18 +1292,20 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT } if (err == null && realExchange) { - for (GridCacheContext cacheCtx : cctx.cacheContexts()) { - if (cacheCtx.isLocal()) + for (CacheGroupContext grp : cctx.cache().cacheGroups()) { + if (grp.isLocal()) continue; try { if (centralizedAff) - cacheCtx.topology().initPartitions(this); + grp.topology().initPartitions(this); } catch (IgniteInterruptedCheckedException e) { U.error(log, "Failed to initialize partitions.", e); } + } + for (GridCacheContext cacheCtx : cctx.cacheContexts()) { GridCacheContext drCacheCtx = cacheCtx.isNear() ? cacheCtx.near().dht().context() : cacheCtx; if (drCacheCtx.isDrEnabled()) { @@ -1264,21 +1323,21 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT discoEvt.type() == EVT_NODE_JOINED) detectLostPartitions(); - Map<Integer, CacheValidation> m = new HashMap<>(cctx.cacheContexts().size()); + Map<Integer, CacheValidation> m = new HashMap<>(cctx.cache().cacheGroups().size()); - for (GridCacheContext cacheCtx : cctx.cacheContexts()) { - Collection<Integer> lostParts = cacheCtx.isLocal() ? - Collections.<Integer>emptyList() : cacheCtx.topology().lostPartitions(); + for (CacheGroupContext grp : cctx.cache().cacheGroups()) { + Collection<Integer> lostParts = grp.isLocal() ? 
+ Collections.<Integer>emptyList() : grp.topology().lostPartitions(); boolean valid = true; - if (cacheCtx.config().getTopologyValidator() != null && !CU.isSystemCache(cacheCtx.name())) - valid = cacheCtx.config().getTopologyValidator().validate(discoEvt.topologyNodes()); + if (grp.topologyValidator() != null && !grp.systemCache()) + valid = grp.topologyValidator().validate(discoEvt.topologyNodes()); - m.put(cacheCtx.cacheId(), new CacheValidation(valid, lostParts)); + m.put(grp.groupId(), new CacheValidation(valid, lostParts)); } - cacheValidRes = m; + grpValidRes = m; } cctx.cache().onExchangeDone(exchId.topologyVersion(), exchActions, err, false); @@ -1329,8 +1388,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT initFut.onDone(err == null); if (exchId.isLeft()) { - for (GridCacheContext cacheCtx : cctx.cacheContexts()) - cacheCtx.config().getAffinity().removeNode(exchId.nodeId()); + for (CacheGroupContext grp : cctx.cache().cacheGroups()) + grp.affinityFunction().removeNode(exchId.nodeId()); } exchActions = null; @@ -1379,16 +1438,18 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT return new CacheInvalidStateException( "Failed to perform cache operation (cluster is not activated): " + cctx.name()); - PartitionLossPolicy partLossPlc = cctx.config().getPartitionLossPolicy(); + CacheGroupContext grp = cctx.group(); - if (cctx.needsRecovery() && !recovery) { + PartitionLossPolicy partLossPlc = grp.config().getPartitionLossPolicy(); + + if (grp.needsRecovery() && !recovery) { if (!read && (partLossPlc == READ_ONLY_SAFE || partLossPlc == READ_ONLY_ALL)) return new IgniteCheckedException("Failed to write to cache (cache is moved to a read-only state): " + cctx.name()); } - if (cctx.needsRecovery() || cctx.config().getTopologyValidator() != null) { - CacheValidation validation = cacheValidRes.get(cctx.cacheId()); + if (grp.needsRecovery() || grp.topologyValidator() != null) { + CacheValidation 
validation = grpValidRes.get(grp.groupId()); if (validation == null) return null; @@ -1397,7 +1458,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT return new IgniteCheckedException("Failed to perform cache operation " + "(cache topology is not valid): " + cctx.name()); - if (recovery || !cctx.needsRecovery()) + if (recovery || !grp.needsRecovery()) return null; if (key != null) { @@ -1629,9 +1690,9 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT Map<Integer, Long> minCntrs = new HashMap<>(); for (Map.Entry<UUID, GridDhtPartitionsSingleMessage> e : msgs.entrySet()) { - assert e.getValue().partitionUpdateCounters(top.cacheId()) != null; + assert e.getValue().partitionUpdateCounters(top.groupId()) != null; - for (Map.Entry<Integer, T2<Long, Long>> e0 : e.getValue().partitionUpdateCounters(top.cacheId()).entrySet()) { + for (Map.Entry<Integer, T2<Long, Long>> e0 : e.getValue().partitionUpdateCounters(top.groupId()).entrySet()) { int p = e0.getKey(); UUID uuid = e.getKey(); @@ -1702,7 +1763,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT Map<Integer, Map<Integer, Long>> partHistReserved0 = partHistReserved; - Map<Integer, Long> localReserved = partHistReserved0 != null ? partHistReserved0.get(top.cacheId()) : null; + Map<Integer, Long> localReserved = partHistReserved0 != null ? 
partHistReserved0.get(top.groupId()) : null; Set<Integer> haveHistory = new HashSet<>(); @@ -1723,7 +1784,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT if (localCntr != null && localCntr <= minCntr && maxCntrObj.nodes.contains(cctx.localNodeId())) { - partHistSuppliers.put(cctx.localNodeId(), top.cacheId(), p, minCntr); + partHistSuppliers.put(cctx.localNodeId(), top.groupId(), p, minCntr); haveHistory.add(p); @@ -1732,10 +1793,10 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT } for (Map.Entry<UUID, GridDhtPartitionsSingleMessage> e0 : msgs.entrySet()) { - Long histCntr = e0.getValue().partitionHistoryCounters(top.cacheId()).get(p); + Long histCntr = e0.getValue().partitionHistoryCounters(top.groupId()).get(p); if (histCntr != null && histCntr <= minCntr && maxCntrObj.nodes.contains(e0.getKey())) { - partHistSuppliers.put(e0.getKey(), top.cacheId(), p, minCntr); + partHistSuppliers.put(e0.getKey(), top.groupId(), p, minCntr); haveHistory.add(p); @@ -1756,7 +1817,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT Set<UUID> nodesToReload = top.setOwners(p, e.getValue().nodes, haveHistory.contains(p), entryLeft == 0); for (UUID nodeId : nodesToReload) - partsToReload.put(nodeId, top.cacheId(), p); + partsToReload.put(nodeId, top.groupId(), p); } } @@ -1768,24 +1829,32 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT if (Thread.currentThread().isInterrupted()) return; - for (GridCacheContext cacheCtx : cctx.cacheContexts()) { - if (!cacheCtx.isLocal()) - cacheCtx.topology().detectLostPartitions(discoEvt); + for (CacheGroupContext grp : cctx.cache().cacheGroups()) { + if (!grp.isLocal()) + grp.topology().detectLostPartitions(discoEvt); } } } /** - * + * @param cacheNames Cache names. 
*/ private void resetLostPartitions(Collection<String> cacheNames) { synchronized (cctx.exchange().interruptLock()) { if (Thread.currentThread().isInterrupted()) return; - for (GridCacheContext cacheCtx : cctx.cacheContexts()) { - if (!cacheCtx.isLocal() && cacheNames.contains(cacheCtx.name())) - cacheCtx.topology().resetLostPartitions(); + for (CacheGroupContext grp : cctx.cache().cacheGroups()) { + if (grp.isLocal()) + continue; + + for (String cacheName : cacheNames) { + if (grp.hasCache(cacheName)) { + grp.topology().resetLostPartitions(); + + break; + } + } } } } @@ -1800,9 +1869,9 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT assert partHistSuppliers.isEmpty(); if (!crd.equals(discoCache.serverNodes().get(0))) { - for (GridCacheContext cacheCtx : cctx.cacheContexts()) { - if (!cacheCtx.isLocal()) - cacheCtx.topology().beforeExchange(this, !centralizedAff); + for (CacheGroupContext grp : cctx.cache().cacheGroups()) { + if (!grp.isLocal()) + grp.topology().beforeExchange(this, !centralizedAff); } } @@ -1811,13 +1880,13 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT GridDhtPartitionsSingleMessage msg0 = (GridDhtPartitionsSingleMessage)msg; for (Map.Entry<Integer, GridDhtPartitionMap> entry : msg0.partitions().entrySet()) { - Integer cacheId = entry.getKey(); - GridCacheContext cacheCtx = cctx.cacheContext(cacheId); + Integer grpId = entry.getKey(); + CacheGroupContext grp = cctx.cache().cacheGroup(grpId); - GridDhtPartitionTopology top = cacheCtx != null ? cacheCtx.topology() : - cctx.exchange().clientTopology(cacheId, this); + GridDhtPartitionTopology top = grp != null ? 
grp.topology() : + cctx.exchange().clientTopology(grpId, this); - Map<Integer, T2<Long, Long>> cntrs = msg0.partitionUpdateCounters(cacheId); + Map<Integer, T2<Long, Long>> cntrs = msg0.partitionUpdateCounters(grpId); if (cntrs != null) top.applyUpdateCounters(cntrs); @@ -1895,11 +1964,11 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT */ private void assignPartitionsStates() { if (cctx.database().persistenceEnabled()) { - for (GridCacheContext cacheCtx : cctx.cacheContexts()) { - if (cacheCtx.isLocal()) + for (CacheGroupContext grp : cctx.cache().cacheGroups()) { + if (grp.isLocal()) continue; - assignPartitionStates(cacheCtx.topology()); + assignPartitionStates(grp.topology()); } } } @@ -2028,20 +2097,20 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT partHistSuppliers.putAll(msg.partitionHistorySuppliers()); for (Map.Entry<Integer, GridDhtPartitionFullMap> entry : msg.partitions().entrySet()) { - Integer cacheId = entry.getKey(); + Integer grpId = entry.getKey(); - Map<Integer, T2<Long, Long>> cntrMap = msg.partitionUpdateCounters(cacheId); + Map<Integer, T2<Long, Long>> cntrMap = msg.partitionUpdateCounters(grpId); - GridCacheContext cacheCtx = cctx.cacheContext(cacheId); + CacheGroupContext grp = cctx.cache().cacheGroup(grpId); - if (cacheCtx != null) - cacheCtx.topology().update(this, entry.getValue(), cntrMap, - msg.partsToReload(cctx.localNodeId(), cacheId)); + if (grp != null) + grp.topology().update(this, entry.getValue(), cntrMap, + msg.partsToReload(cctx.localNodeId(), grpId)); else { ClusterNode oldest = cctx.discovery().oldestAliveCacheServerNode(AffinityTopologyVersion.NONE); if (oldest != null && oldest.isLocal()) - cctx.exchange().clientTopology(cacheId, this).update(this, entry.getValue(), cntrMap, Collections.<Integer>emptySet()); + cctx.exchange().clientTopology(grpId, this).update(this, entry.getValue(), cntrMap, Collections.<Integer>emptySet()); } } } @@ -2055,11 +2124,11 
@@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT msgs.put(node.id(), msg); for (Map.Entry<Integer, GridDhtPartitionMap> entry : msg.partitions().entrySet()) { - Integer cacheId = entry.getKey(); - GridCacheContext cacheCtx = cctx.cacheContext(cacheId); + Integer grpId = entry.getKey(); + CacheGroupContext grp = cctx.cache().cacheGroup(grpId); - GridDhtPartitionTopology top = cacheCtx != null ? cacheCtx.topology() : - cctx.exchange().clientTopology(cacheId, this); + GridDhtPartitionTopology top = grp != null ? grp.topology() : + cctx.exchange().clientTopology(grpId, this); top.update(exchId, entry.getValue()); } @@ -2211,13 +2280,13 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT List<ClusterNode> empty = Collections.emptyList(); - for (GridCacheContext cacheCtx : cctx.cacheContexts()) { - List<List<ClusterNode>> affAssignment = new ArrayList<>(cacheCtx.affinity().partitions()); + for (CacheGroupContext grp : cctx.cache().cacheGroups()) { + List<List<ClusterNode>> affAssignment = new ArrayList<>(grp.affinity().partitions()); - for (int i = 0; i < cacheCtx.affinity().partitions(); i++) + for (int i = 0; i < grp.affinity().partitions(); i++) affAssignment.add(empty); - cacheCtx.affinity().affinityCache().initialize(topologyVersion(), affAssignment); + grp.affinity().initialize(topologyVersion(), affAssignment); } onDone(topologyVersion());
http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java index 94ad21e..2f8a531 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java @@ -125,6 +125,11 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa this.partsToReload = partsToReload; } + /** {@inheritDoc} */ + @Override public int handlerId() { + return 0; + } + /** * @param compress {@code True} if it is possible to use compression for message. */ @@ -143,24 +148,26 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa } /** - * @param cacheId Cache ID. + * @param grpId Cache group ID. * @return {@code True} if message contains full map for given cache. */ - public boolean containsCache(int cacheId) { - return parts != null && parts.containsKey(cacheId); + public boolean containsGroup(int grpId) { + return parts != null && parts.containsKey(grpId); } /** - * @param cacheId Cache ID. + * @param grpId Cache group ID. * @param fullMap Full partitions map. * @param dupDataCache Optional ID of cache with the same partition state map. 
*/ - public void addFullPartitionsMap(int cacheId, GridDhtPartitionFullMap fullMap, @Nullable Integer dupDataCache) { + public void addFullPartitionsMap(int grpId, GridDhtPartitionFullMap fullMap, @Nullable Integer dupDataCache) { + assert fullMap != null; + if (parts == null) parts = new HashMap<>(); - if (!parts.containsKey(cacheId)) { - parts.put(cacheId, fullMap); + if (!parts.containsKey(grpId)) { + parts.put(grpId, fullMap); if (dupDataCache != null) { assert compress; @@ -169,30 +176,29 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa if (dupPartsData == null) dupPartsData = new HashMap<>(); - dupPartsData.put(cacheId, dupDataCache); + dupPartsData.put(grpId, dupDataCache); } } } /** - * @param cacheId Cache ID. + * @param grpId Cache group ID. * @param cntrMap Partition update counters. */ - public void addPartitionUpdateCounters(int cacheId, Map<Integer, T2<Long, Long>> cntrMap) { + public void addPartitionUpdateCounters(int grpId, Map<Integer, T2<Long, Long>> cntrMap) { if (partCntrs == null) partCntrs = new IgniteDhtPartitionCountersMap(); - partCntrs.putIfAbsent(cacheId, cntrMap); + partCntrs.putIfAbsent(grpId, cntrMap); } /** - * @param cacheId Cache ID. + * @param grpId Cache group ID. * @return Partition update counters. 
*/ - @Override public Map<Integer, T2<Long, Long>> partitionUpdateCounters(int cacheId) { - if (partCntrs != null) { - return partCntrs.get(cacheId); - } + @Override public Map<Integer, T2<Long, Long>> partitionUpdateCounters(int grpId) { + if (partCntrs != null) + return partCntrs.get(grpId); return Collections.emptyMap(); } @@ -207,11 +213,11 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa return partHistSuppliers; } - public Set<Integer> partsToReload(UUID nodeId, int cacheId) { + public Set<Integer> partsToReload(UUID nodeId, int grpId) { if (partsToReload == null) return Collections.emptySet(); - return partsToReload.get(nodeId, cacheId); + return partsToReload.get(nodeId, grpId); } /** @@ -396,43 +402,43 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa } switch (writer.state()) { - case 6: + case 5: if (!writer.writeMap("dupPartsData", dupPartsData, MessageCollectionItemType.INT, MessageCollectionItemType.INT)) return false; writer.incrementState(); - case 7: + case 6: if (!writer.writeByteArray("exsBytes", exsBytes)) return false; writer.incrementState(); - case 8: + case 7: if (!writer.writeByteArray("partCntrsBytes", partCntrsBytes)) return false; writer.incrementState(); - case 9: + case 8: if (!writer.writeByteArray("partHistSuppliersBytes", partHistSuppliersBytes)) return false; writer.incrementState(); - case 10: + case 9: if (!writer.writeByteArray("partsBytes", partsBytes)) return false; writer.incrementState(); - case 11: + case 10: if (!writer.writeByteArray("partsToReloadBytes", partsToReloadBytes)) return false; writer.incrementState(); - case 12: + case 11: if (!writer.writeMessage("topVer", topVer)) return false; @@ -454,7 +460,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa return false; switch (reader.state()) { - case 6: + case 5: dupPartsData = reader.readMap("dupPartsData", MessageCollectionItemType.INT, 
MessageCollectionItemType.INT, false); if (!reader.isLastRead()) @@ -462,7 +468,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa reader.incrementState(); - case 7: + case 6: exsBytes = reader.readByteArray("exsBytes"); if (!reader.isLastRead()) @@ -470,7 +476,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa reader.incrementState(); - case 8: + case 7: partCntrsBytes = reader.readByteArray("partCntrsBytes"); if (!reader.isLastRead()) @@ -478,7 +484,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa reader.incrementState(); - case 9: + case 8: partHistSuppliersBytes = reader.readByteArray("partHistSuppliersBytes"); if (!reader.isLastRead()) @@ -486,7 +492,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa reader.incrementState(); - case 10: + case 9: partsBytes = reader.readByteArray("partsBytes"); if (!reader.isLastRead()) @@ -494,7 +500,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa reader.incrementState(); - case 11: + case 10: partsToReloadBytes = reader.readByteArray("partsToReloadBytes"); if (!reader.isLastRead()) @@ -502,7 +508,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa reader.incrementState(); - case 12: + case 11: topVer = reader.readMessage("topVer"); if (!reader.isLastRead()) @@ -520,11 +526,9 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa return 46; } - //todo - /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 13; + return 12; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java ---------------------------------------------------------------------- diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java index 9222251..1e5ea14 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java @@ -111,6 +111,11 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes this.compress = compress; } + /** {@inheritDoc} */ + @Override public int handlerId() { + return 0; + } + /** * @return {@code True} if sent from client node. */ @@ -142,23 +147,23 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes } /** - * @param cacheId Cache ID. + * @param grpId Cache group ID. * @param cntrMap Partition update counters. */ - public void partitionUpdateCounters(int cacheId, Map<Integer, T2<Long, Long>> cntrMap) { + public void partitionUpdateCounters(int grpId, Map<Integer, T2<Long, Long>> cntrMap) { if (partCntrs == null) partCntrs = new HashMap<>(); - partCntrs.put(cacheId, cntrMap); + partCntrs.put(grpId, cntrMap); } /** - * @param cacheId Cache ID. + * @param grpId Cache group ID. * @return Partition update counters. */ - @Override public Map<Integer, T2<Long, Long>> partitionUpdateCounters(int cacheId) { + @Override public Map<Integer, T2<Long, Long>> partitionUpdateCounters(int grpId) { if (partCntrs != null) { - Map<Integer, T2<Long, Long>> res = partCntrs.get(cacheId); + Map<Integer, T2<Long, Long>> res = partCntrs.get(grpId); return res != null ? 
res : Collections.<Integer, T2<Long, Long>>emptyMap(); } @@ -167,35 +172,34 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes } /** - * @param cacheId Cache ID. + * @param grpId Cache group ID. * @param cntrMap Partition history counters. */ - public void partitionHistoryCounters(int cacheId, Map<Integer, Long> cntrMap) { + public void partitionHistoryCounters(int grpId, Map<Integer, Long> cntrMap) { if (cntrMap.isEmpty()) return; if (partHistCntrs == null) partHistCntrs = new HashMap<>(); - partHistCntrs.put(cacheId, cntrMap); + partHistCntrs.put(grpId, cntrMap); } /** * @param cntrMap Partition history counters. */ - public void partitionHistoryCounters(Map<Integer, Map<Integer, Long>> cntrMap) { - for (Map.Entry<Integer, Map<Integer, Long>> e : cntrMap.entrySet()) { + void partitionHistoryCounters(Map<Integer, Map<Integer, Long>> cntrMap) { + for (Map.Entry<Integer, Map<Integer, Long>> e : cntrMap.entrySet()) partitionHistoryCounters(e.getKey(), e.getValue()); - } } /** - * @param cacheId Cache ID. + * @param grpId Cache group ID. * @return Partition history counters. */ - public Map<Integer, Long> partitionHistoryCounters(int cacheId) { + Map<Integer, Long> partitionHistoryCounters(int grpId) { if (partHistCntrs != null) { - Map<Integer, Long> res = partHistCntrs.get(cacheId); + Map<Integer, Long> res = partHistCntrs.get(grpId); return res != null ? 
res : Collections.<Integer, Long>emptyMap(); } @@ -351,37 +355,37 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes } switch (writer.state()) { - case 6: + case 5: if (!writer.writeBoolean("client", client)) return false; writer.incrementState(); - case 7: + case 6: if (!writer.writeMap("dupPartsData", dupPartsData, MessageCollectionItemType.INT, MessageCollectionItemType.INT)) return false; writer.incrementState(); - case 8: + case 7: if (!writer.writeByteArray("exBytes", exBytes)) return false; writer.incrementState(); - case 9: + case 8: if (!writer.writeByteArray("partCntrsBytes", partCntrsBytes)) return false; writer.incrementState(); - case 10: + case 9: if (!writer.writeByteArray("partHistCntrsBytes", partHistCntrsBytes)) return false; writer.incrementState(); - case 11: + case 10: if (!writer.writeByteArray("partsBytes", partsBytes)) return false; @@ -403,7 +407,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes return false; switch (reader.state()) { - case 6: + case 5: client = reader.readBoolean("client"); if (!reader.isLastRead()) @@ -411,7 +415,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes reader.incrementState(); - case 7: + case 6: dupPartsData = reader.readMap("dupPartsData", MessageCollectionItemType.INT, MessageCollectionItemType.INT, false); if (!reader.isLastRead()) @@ -419,7 +423,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes reader.incrementState(); - case 8: + case 7: exBytes = reader.readByteArray("exBytes"); if (!reader.isLastRead()) @@ -427,7 +431,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes reader.incrementState(); - case 9: + case 8: partCntrsBytes = reader.readByteArray("partCntrsBytes"); if (!reader.isLastRead()) @@ -435,7 +439,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes reader.incrementState(); - case 10: + 
case 9: partHistCntrsBytes = reader.readByteArray("partHistCntrsBytes"); if (!reader.isLastRead()) @@ -443,7 +447,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes reader.incrementState(); - case 11: + case 10: partsBytes = reader.readByteArray("partsBytes"); if (!reader.isLastRead()) @@ -461,11 +465,9 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes return 47; } - //todo add ex - /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 12; + return 11; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleRequest.java index b4ce939..4b80ee0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleRequest.java @@ -48,6 +48,11 @@ public class GridDhtPartitionsSingleRequest extends GridDhtPartitionsAbstractMes } /** {@inheritDoc} */ + @Override public int handlerId() { + return 0; + } + + /** {@inheritDoc} */ @Override public Map<Integer, T2<Long, Long>> partitionUpdateCounters(int cacheId) { return Collections.emptyMap(); } @@ -89,7 +94,7 @@ public class GridDhtPartitionsSingleRequest extends GridDhtPartitionsAbstractMes /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 6; + return 5; } /** {@inheritDoc} */ 
http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java index 957f93b0..82eafc1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java @@ -23,30 +23,22 @@ import java.util.Collections; import java.util.List; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.events.DiscoveryEvent; -import org.apache.ignite.events.Event; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.NodeStoppingException; -import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; -import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; import org.apache.ignite.internal.processors.affinity.AffinityAssignment; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.CacheGroupContext; import org.apache.ignite.internal.processors.cache.GridCacheContext; -import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; import 
org.apache.ignite.internal.processors.cache.GridCacheEntryInfo; -import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException; import org.apache.ignite.internal.processors.cache.GridCachePreloaderAdapter; import org.apache.ignite.internal.processors.cache.KeyCacheObject; -import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtAffinityAssignmentRequest; -import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtAffinityAssignmentResponse; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtFuture; -import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology; @@ -61,11 +53,8 @@ import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.GPC; import org.apache.ignite.internal.util.typedef.internal.LT; import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.lang.IgniteBiInClosure; import org.apache.ignite.lang.IgnitePredicate; -import org.apache.ignite.lang.IgniteUuid; import org.jetbrains.annotations.Nullable; -import org.jsr166.ConcurrentLinkedDeque8; import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST; import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_PART_UNLOADED; @@ -77,7 +66,6 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.MOVING; import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.OWNING; import static 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.RENTING; -import static org.apache.ignite.internal.util.GridConcurrentFactory.newMap; /** * DHT cache preloader. @@ -89,9 +77,6 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { /** */ private GridDhtPartitionTopology top; - /** Force key futures. */ - private final ConcurrentMap<IgniteUuid, GridDhtForceKeysFuture<?, ?>> forceKeyFuts = newMap(); - /** Partition suppliers. */ private GridDhtPartitionSupplier supplier; @@ -114,47 +99,15 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { private final AtomicInteger partsEvictOwning = new AtomicInteger(); /** */ - private volatile boolean stopping; - - /** */ private boolean stopped; - /** Discovery listener. */ - private final GridLocalEventListener discoLsnr = new GridLocalEventListener() { - @Override public void onEvent(Event evt) { - if (!enterBusy()) - return; - - DiscoveryEvent e = (DiscoveryEvent)evt; - - try { - ClusterNode loc = cctx.localNode(); - - assert e.type() == EVT_NODE_JOINED || e.type() == EVT_NODE_LEFT || e.type() == EVT_NODE_FAILED; - - final ClusterNode n = e.eventNode(); - - assert !loc.id().equals(n.id()); - - for (GridDhtForceKeysFuture<?, ?> f : forceKeyFuts.values()) - f.onDiscoveryEvent(e); - - assert e.type() != EVT_NODE_JOINED || n.order() > loc.order() : "Node joined with smaller-than-local " + - "order [newOrder=" + n.order() + ", locOrder=" + loc.order() + ']'; - } - finally { - leaveBusy(); - } - } - }; - /** - * @param cctx Cache context. + * @param grp Cache group. 
*/ - public GridDhtPreloader(GridCacheContext<?, ?> cctx) { - super(cctx); + public GridDhtPreloader(CacheGroupContext grp) { + super(grp); - top = cctx.dht().topology(); + top = grp.topology(); startFut = new GridFutureAdapter<>(); } @@ -164,37 +117,10 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { if (log.isDebugEnabled()) log.debug("Starting DHT rebalancer..."); - cctx.io().addHandler(cctx.cacheId(), GridDhtForceKeysRequest.class, - new MessageHandler<GridDhtForceKeysRequest>() { - @Override public void onMessage(ClusterNode node, GridDhtForceKeysRequest msg) { - processForceKeysRequest(node, msg); - } - }); - - cctx.io().addHandler(cctx.cacheId(), GridDhtForceKeysResponse.class, - new MessageHandler<GridDhtForceKeysResponse>() { - @Override public void onMessage(ClusterNode node, GridDhtForceKeysResponse msg) { - processForceKeyResponse(node, msg); - } - }); - - if (!cctx.kernalContext().clientNode()) { - cctx.io().addHandler(cctx.cacheId(), GridDhtAffinityAssignmentRequest.class, - new MessageHandler<GridDhtAffinityAssignmentRequest>() { - @Override protected void onMessage(ClusterNode node, GridDhtAffinityAssignmentRequest msg) { - processAffinityAssignmentRequest(node, msg); - } - }); - } - - cctx.shared().affinity().onCacheCreated(cctx); - - supplier = new GridDhtPartitionSupplier(cctx); - demander = new GridDhtPartitionDemander(cctx); + supplier = new GridDhtPartitionSupplier(grp); + demander = new GridDhtPartitionDemander(grp); demander.start(); - - cctx.events().addListener(discoLsnr, EVT_NODE_JOINED, EVT_NODE_LEFT, EVT_NODE_FAILED); } /** {@inheritDoc} */ @@ -213,10 +139,6 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { if (log.isDebugEnabled()) log.debug("DHT rebalancer onKernalStop callback."); - stopping = true; - - cctx.events().removeListener(discoLsnr); - // Acquire write busy lock. 
busyLock.writeLock().lock(); @@ -227,11 +149,6 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { if (demander != null) demander.stop(); - IgniteCheckedException err = stopError(); - - for (GridDhtForceKeysFuture fut : forceKeyFuts.values()) - fut.onDone(err); - top = null; stopped = true; @@ -266,25 +183,27 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { /** {@inheritDoc} */ @Override public GridDhtPreloaderAssignments assign(GridDhtPartitionsExchangeFuture exchFut) { // No assignments for disabled preloader. - GridDhtPartitionTopology top = cctx.dht().topology(); + GridDhtPartitionTopology top = grp.topology(); - if (!cctx.rebalanceEnabled() || !cctx.shared().kernalContext().state().active()) + if (!grp.rebalanceEnabled() || !grp.shared().kernalContext().state().active()) return new GridDhtPreloaderAssignments(exchFut, top.topologyVersion()); - int partCnt = cctx.affinity().partitions(); + int partCnt = grp.affinity().partitions(); assert exchFut.forcePreload() || exchFut.dummyReassign() || exchFut.exchangeId().topologyVersion().equals(top.topologyVersion()) : "Topology version mismatch [exchId=" + exchFut.exchangeId() + - ", cache=" + cctx.name() + + ", grp=" + grp.name() + ", topVer=" + top.topologyVersion() + ']'; GridDhtPreloaderAssignments assigns = new GridDhtPreloaderAssignments(exchFut, top.topologyVersion()); AffinityTopologyVersion topVer = assigns.topologyVersion(); + AffinityAssignment aff = grp.affinity().cachedAffinity(topVer); + for (int p = 0; p < partCnt; p++) { - if (cctx.shared().exchange().hasPendingExchange()) { + if (ctx.exchange().hasPendingExchange()) { if (log.isDebugEnabled()) log.debug("Skipping assignments creation, exchange worker has pending assignments: " + exchFut.exchangeId()); @@ -295,7 +214,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { } // If partition belongs to local node. 
- if (cctx.affinity().partitionLocalNode(p, topVer)) { + if (aff.get(p).contains(ctx.localNode())) { GridDhtLocalPartition part = top.localPartition(p, topVer, true, true); assert part != null; @@ -303,11 +222,11 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { ClusterNode histSupplier = null; - if (cctx.shared().database().persistenceEnabled()) { - UUID nodeId = exchFut.partitionHistorySupplier(cctx.cacheId(), p); + if (ctx.database().persistenceEnabled()) { + UUID nodeId = exchFut.partitionHistorySupplier(grp.groupId(), p); if (nodeId != null) - histSupplier = cctx.discovery().node(nodeId); + histSupplier = ctx.discovery().node(nodeId); } if (histSupplier != null) { @@ -318,7 +237,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { continue; // For. } - assert cctx.shared().database().persistenceEnabled(); + assert ctx.database().persistenceEnabled(); assert remoteOwners(p, topVer).contains(histSupplier) : remoteOwners(p, topVer); GridDhtPartitionDemandMessage msg = assigns.get(histSupplier); @@ -327,13 +246,13 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { assigns.put(histSupplier, msg = new GridDhtPartitionDemandMessage( top.updateSequence(), exchFut.exchangeId().topologyVersion(), - cctx.cacheId())); + grp.groupId())); } msg.addPartition(p, true); } else { - if (cctx.shared().database().persistenceEnabled()) { + if (ctx.database().persistenceEnabled()) { if (part.state() == RENTING || part.state() == EVICTED) { try { part.rent(false).get(); @@ -360,13 +279,13 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { if (picked.isEmpty()) { top.own(part); - if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_PART_DATA_LOST)) { + if (grp.eventRecordable(EVT_CACHE_REBALANCE_PART_DATA_LOST)) { DiscoveryEvent discoEvt = exchFut.discoveryEvent(); - cctx.events().addPreloadEvent(p, - EVT_CACHE_REBALANCE_PART_DATA_LOST, discoEvt.eventNode(), - discoEvt.type(), discoEvt.timestamp()); - } + 
grp.addRebalanceEvent(p, + EVT_CACHE_REBALANCE_PART_DATA_LOST, discoEvt.eventNode(), + discoEvt.type(), discoEvt.timestamp()); + } if (log.isDebugEnabled()) log.debug("Owning partition as there are no other owners: " + part); @@ -376,12 +295,12 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { GridDhtPartitionDemandMessage msg = assigns.get(n); - if (msg == null) { - assigns.put(n, msg = new GridDhtPartitionDemandMessage( - top.updateSequence(), - exchFut.exchangeId().topologyVersion(), - cctx.cacheId())); - } + if (msg == null) { + assigns.put(n, msg = new GridDhtPartitionDemandMessage( + top.updateSequence(), + exchFut.exchangeId().topologyVersion(), + grp.groupId())); + } msg.addPartition(p, false); } @@ -403,7 +322,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { * @return Picked owners. */ private Collection<ClusterNode> pickedOwners(int p, AffinityTopologyVersion topVer) { - Collection<ClusterNode> affNodes = cctx.affinity().nodesByPartition(p, topVer); + Collection<ClusterNode> affNodes = grp.affinity().cachedAffinity(topVer).get(p); int affCnt = affNodes.size(); @@ -429,7 +348,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { * @return Nodes owning this partition. 
*/ private Collection<ClusterNode> remoteOwners(int p, AffinityTopologyVersion topVer) { - return F.view(cctx.dht().topology().owners(p, topVer), F.remoteNodes(cctx.nodeId())); + return F.view(grp.topology().owners(p, topVer), F.remoteNodes(ctx.localNodeId())); } /** {@inheritDoc} */ @@ -468,7 +387,6 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { /** {@inheritDoc} */ @Override public Runnable addAssignments(GridDhtPreloaderAssignments assignments, boolean forceRebalance, - Collection<String> caches, int cnt, Runnable next, @Nullable GridFutureAdapter<Boolean> forcedRebFut) { @@ -484,12 +402,12 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { /** {@inheritDoc} */ @Override public IgniteInternalFuture<?> syncFuture() { - return cctx.kernalContext().clientNode() ? startFut : demander.syncFuture(); + return ctx.kernalContext().clientNode() ? startFut : demander.syncFuture(); } /** {@inheritDoc} */ @Override public IgniteInternalFuture<Boolean> rebalanceFuture() { - return cctx.kernalContext().clientNode() ? new GridFinishedFuture<>(true) : demander.rebalanceFuture(); + return ctx.kernalContext().clientNode() ? new GridFinishedFuture<>(true) : demander.rebalanceFuture(); } /** @@ -516,171 +434,6 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { } /** - * @param node Node originated request. - * @param msg Force keys message. - */ - private void processForceKeysRequest(final ClusterNode node, final GridDhtForceKeysRequest msg) { - IgniteInternalFuture<?> fut = cctx.mvcc().finishKeys(msg.keys(), msg.cacheId(), msg.topologyVersion()); - - if (fut.isDone()) - processForceKeysRequest0(node, msg); - else - fut.listen(new CI1<IgniteInternalFuture<?>>() { - @Override public void apply(IgniteInternalFuture<?> t) { - processForceKeysRequest0(node, msg); - } - }); - } - - /** - * @param node Node originated request. - * @param msg Force keys message. 
- */ - private void processForceKeysRequest0(ClusterNode node, GridDhtForceKeysRequest msg) { - if (!enterBusy()) - return; - - try { - ClusterNode loc = cctx.localNode(); - - GridDhtForceKeysResponse res = new GridDhtForceKeysResponse( - cctx.cacheId(), - msg.futureId(), - msg.miniId(), - cctx.deploymentEnabled()); - - for (KeyCacheObject k : msg.keys()) { - int p = cctx.affinity().partition(k); - - GridDhtLocalPartition locPart = top.localPartition(p, AffinityTopologyVersion.NONE, false); - - // If this node is no longer an owner. - if (locPart == null && !top.owners(p).contains(loc)) { - res.addMissed(k); - - continue; - } - - GridCacheEntryEx entry = null; - - while (true) { - try { - entry = cctx.dht().entryEx(k); - - entry.unswap(); - - GridCacheEntryInfo info = entry.info(); - - if (info == null) { - assert entry.obsolete() : entry; - - continue; - } - - if (!info.isNew()) - res.addInfo(info); - - cctx.evicts().touch(entry, msg.topologyVersion()); - - break; - } - catch (GridCacheEntryRemovedException ignore) { - if (log.isDebugEnabled()) - log.debug("Got removed entry: " + k); - } - catch (GridDhtInvalidPartitionException ignore) { - if (log.isDebugEnabled()) - log.debug("Local node is no longer an owner: " + p); - - res.addMissed(k); - - break; - } - } - } - - if (log.isDebugEnabled()) - log.debug("Sending force key response [node=" + node.id() + ", res=" + res + ']'); - - cctx.io().send(node, res, cctx.ioPolicy()); - } - catch (ClusterTopologyCheckedException ignore) { - if (log.isDebugEnabled()) - log.debug("Received force key request form failed node (will ignore) [nodeId=" + node.id() + - ", req=" + msg + ']'); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to reply to force key request [nodeId=" + node.id() + ", req=" + msg + ']', e); - } - finally { - leaveBusy(); - } - } - - /** - * @param node Node. - * @param msg Message. 
- */ - private void processForceKeyResponse(ClusterNode node, GridDhtForceKeysResponse msg) { - if (!enterBusy()) - return; - - try { - GridDhtForceKeysFuture<?, ?> f = forceKeyFuts.get(msg.futureId()); - - if (f != null) - f.onResult(msg); - else if (log.isDebugEnabled()) - log.debug("Receive force key response for unknown future (is it duplicate?) [nodeId=" + node.id() + - ", res=" + msg + ']'); - } - finally { - leaveBusy(); - } - } - - /** - * @param node Node. - * @param req Request. - */ - private void processAffinityAssignmentRequest(final ClusterNode node, - final GridDhtAffinityAssignmentRequest req) { - final AffinityTopologyVersion topVer = req.topologyVersion(); - - if (log.isDebugEnabled()) - log.debug("Processing affinity assignment request [node=" + node + ", req=" + req + ']'); - - cctx.affinity().affinityReadyFuture(req.topologyVersion()).listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() { - @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> fut) { - if (log.isDebugEnabled()) - log.debug("Affinity is ready for topology version, will send response [topVer=" + topVer + - ", node=" + node + ']'); - - AffinityAssignment assignment = cctx.affinity().assignment(topVer); - - GridDhtAffinityAssignmentResponse res = new GridDhtAffinityAssignmentResponse( - req.futureId(), - cctx.cacheId(), - topVer, - assignment.assignment()); - - if (cctx.affinity().affinityCache().centralizedAffinityFunction()) { - assert assignment.idealAssignment() != null; - - res.idealAffinityAssignment(assignment.idealAssignment()); - } - - try { - cctx.io().send(node, res, AFFINITY_POOL); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to send affinity assignment response to remote node [node=" + node + ']', e); - } - } - }); - } - - /** * Resends partitions on partition evict within configured timeout. * * @param part Evicted partition. 
@@ -693,11 +446,11 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { try { top.onEvicted(part, updateSeq); - if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_PART_UNLOADED)) - cctx.events().addUnloadEvent(part.id()); + if (grp.eventRecordable(EVT_CACHE_REBALANCE_PART_UNLOADED)) + grp.addUnloadEvent(part.id()); if (updateSeq) - cctx.shared().exchange().scheduleResendPartitions(); + ctx.exchange().scheduleResendPartitions(); } finally { leaveBusy(); @@ -706,7 +459,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { /** {@inheritDoc} */ @Override public boolean needForceKeys() { - if (cctx.rebalanceEnabled()) { + if (grp.rebalanceEnabled()) { IgniteInternalFuture<Boolean> rebalanceFut = rebalanceFuture(); if (rebalanceFut.isDone() && Boolean.TRUE.equals(rebalanceFut.result())) @@ -717,12 +470,13 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { } /** {@inheritDoc} */ - @Override public IgniteInternalFuture<Object> request(GridNearAtomicAbstractUpdateRequest req, + @Override public GridDhtFuture<Object> request(GridCacheContext cctx, + GridNearAtomicAbstractUpdateRequest req, AffinityTopologyVersion topVer) { if (!needForceKeys()) return null; - return request0(req.keys(), topVer); + return request0(cctx, req.keys(), topVer); } /** @@ -730,21 +484,27 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { * @return Future for request. */ @SuppressWarnings({"unchecked", "RedundantCast"}) - @Override public GridDhtFuture<Object> request(Collection<KeyCacheObject> keys, AffinityTopologyVersion topVer) { + @Override public GridDhtFuture<Object> request(GridCacheContext cctx, + Collection<KeyCacheObject> keys, + AffinityTopologyVersion topVer) { if (!needForceKeys()) return null; - return request0(keys, topVer); + return request0(cctx, keys, topVer); } /** + * @param cctx Cache context. * @param keys Keys to request. * @param topVer Topology version. * @return Future for request. 
*/ @SuppressWarnings({"unchecked", "RedundantCast"}) - private GridDhtFuture<Object> request0(Collection<KeyCacheObject> keys, AffinityTopologyVersion topVer) { - final GridDhtForceKeysFuture<?, ?> fut = new GridDhtForceKeysFuture<>(cctx, topVer, keys, this); + private GridDhtFuture<Object> request0(GridCacheContext cctx, Collection<KeyCacheObject> keys, AffinityTopologyVersion topVer) { + if (cctx.isNear()) + cctx = cctx.near().dht().context(); + + final GridDhtForceKeysFuture<?, ?> fut = new GridDhtForceKeysFuture<>(cctx, topVer, keys); IgniteInternalFuture<?> topReadyFut = cctx.affinity().affinityReadyFuturex(topVer); @@ -754,7 +514,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { if (topReadyFut == null) startFut.listen(new CI1<IgniteInternalFuture<?>>() { @Override public void apply(IgniteInternalFuture<?> syncFut) { - cctx.kernalContext().closure().runLocalSafe( + ctx.kernalContext().closure().runLocalSafe( new GridPlainRunnable() { @Override public void run() { fut.init(); @@ -791,46 +551,19 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { demandLock.writeLock().lock(); try { - cctx.deploy().unwind(cctx); + grp.unwindUndeploys(); } finally { demandLock.writeLock().unlock(); } } - /** - * Adds future to future map. - * - * @param fut Future to add. - * @return {@code False} if node cache is stopping and future was completed with error. - */ - boolean addFuture(GridDhtForceKeysFuture<?, ?> fut) { - forceKeyFuts.put(fut.futureId(), fut); - - if (stopping) { - fut.onDone(stopError()); - - return false; - } - - return true; - } - - /** - * Removes future from future map. - * - * @param fut Future to remove. 
- */ - void remoteFuture(GridDhtForceKeysFuture<?, ?> fut) { - forceKeyFuts.remove(fut.futureId(), fut); - } - /** {@inheritDoc} */ @Override public void evictPartitionAsync(GridDhtLocalPartition part) { partsToEvict.putIfAbsent(part.id(), part); if (partsEvictOwning.get() == 0 && partsEvictOwning.compareAndSet(0, 1)) { - cctx.closures().callLocalSafe(new GPC<Boolean>() { + ctx.kernalContext().closure().callLocalSafe(new GPC<Boolean>() { @Override public Boolean call() { boolean locked = true; @@ -851,7 +584,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { partsToEvict.put(part.id(), part); } catch (Throwable ex) { - if (cctx.kernalContext().isStopping()) { + if (ctx.kernalContext().isStopping()) { LT.warn(log, ex, "Partition eviction failed (current node is stopping).", false, true); @@ -886,44 +619,6 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { /** {@inheritDoc} */ @Override public void dumpDebugInfo() { - if (!forceKeyFuts.isEmpty()) { - U.warn(log, "Pending force key futures [cache=" + cctx.name() + "]:"); - - for (GridDhtForceKeysFuture fut : forceKeyFuts.values()) - U.warn(log, ">>> " + fut); - } - supplier.dumpDebugInfo(); } - - /** - * - */ - private abstract class MessageHandler<M> implements IgniteBiInClosure<UUID, M> { - /** */ - private static final long serialVersionUID = 0L; - - /** {@inheritDoc} */ - @Override public void apply(UUID nodeId, M msg) { - ClusterNode node = cctx.node(nodeId); - - if (node == null) { - if (log.isDebugEnabled()) - log.debug("Received message from failed node [node=" + nodeId + ", msg=" + msg + ']'); - - return; - } - - if (log.isDebugEnabled()) - log.debug("Received message from node [node=" + nodeId + ", msg=" + msg + ']'); - - onMessage(node, msg); - } - - /** - * @param node Node. - * @param msg Message. 
- */ - protected abstract void onMessage(ClusterNode node, M msg); - } } http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java index 2d5c8a5..e6c5c10 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java @@ -103,7 +103,7 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> { @Override public void start() throws IgniteCheckedException { super.start(); - ctx.io().addHandler(ctx.cacheId(), GridNearGetResponse.class, new CI2<UUID, GridNearGetResponse>() { + ctx.io().addCacheHandler(ctx.cacheId(), GridNearGetResponse.class, new CI2<UUID, GridNearGetResponse>() { @Override public void apply(UUID nodeId, GridNearGetResponse res) { processGetResponse(nodeId, res); } http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java index f8240f3..5b53935 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java +++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java @@ -86,11 +86,23 @@ public abstract class GridNearCacheAdapter<K, V> extends GridDistributedCacheAda * @param ctx Context. */ protected GridNearCacheAdapter(GridCacheContext<K, V> ctx) { - super(ctx, ctx.config().getNearConfiguration().getNearStartSize()); + super(ctx); } /** {@inheritDoc} */ - @Override protected GridCacheMapEntryFactory entryFactory() { + @Override public void start() throws IgniteCheckedException { + if (map == null) { + map = new GridCacheLocalConcurrentMap( + ctx, + entryFactory(), + ctx.config().getNearConfiguration().getNearStartSize()); + } + } + + /** + * @return Entry factory. + */ + private GridCacheMapEntryFactory entryFactory() { return new GridCacheMapEntryFactory() { @Override public GridCacheMapEntry create( GridCacheContext ctx, http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java index 757bfbd..6092511 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java @@ -32,7 +32,7 @@ import org.apache.ignite.internal.GridDirectTransient; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheDeployable; -import org.apache.ignite.internal.processors.cache.GridCacheMessage; 
+import org.apache.ignite.internal.processors.cache.GridCacheIdMessage; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; @@ -50,7 +50,7 @@ import org.jetbrains.annotations.NotNull; /** * Get request. */ -public class GridNearGetRequest extends GridCacheMessage implements GridCacheDeployable, +public class GridNearGetRequest extends GridCacheIdMessage implements GridCacheDeployable, GridCacheVersionable { /** */ private static final long serialVersionUID = 0L; http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetResponse.java index 50af754..b4e4424 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetResponse.java @@ -28,7 +28,7 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheDeployable; import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo; -import org.apache.ignite.internal.processors.cache.GridCacheMessage; +import org.apache.ignite.internal.processors.cache.GridCacheIdMessage; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import 
org.apache.ignite.internal.processors.cache.version.GridCacheVersionable; @@ -45,7 +45,7 @@ import org.jetbrains.annotations.NotNull; /** * Get response. */ -public class GridNearGetResponse extends GridCacheMessage implements GridCacheDeployable, +public class GridNearGetResponse extends GridCacheIdMessage implements GridCacheDeployable, GridCacheVersionable { /** */ private static final long serialVersionUID = 0L; http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java index f4ce1ac..edddf7d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java @@ -58,6 +58,7 @@ import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiClosure; +import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.transactions.TransactionDeadlockException; import org.apache.ignite.transactions.TransactionTimeoutException; import org.jetbrains.annotations.Nullable; @@ -159,12 +160,20 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa if (e instanceof IgniteTxRollbackCheckedException) { if (marked) { - try { - tx.rollback(); - } - catch (IgniteCheckedException ex) { - U.error(log, "Failed to automatically rollback 
transaction: " + tx, ex); - } + tx.rollbackAsync().listen(new IgniteInClosure<IgniteInternalFuture<IgniteInternalTx>>() { + @Override public void apply(IgniteInternalFuture<IgniteInternalTx> fut) { + try { + fut.get(); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to automatically rollback transaction: " + tx, e); + } + + onComplete(); + } + }); + + return; } } http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetRequest.java index 0faa8ec..00ff4bb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetRequest.java @@ -23,7 +23,7 @@ import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheDeployable; -import org.apache.ignite.internal.processors.cache.GridCacheMessage; +import org.apache.ignite.internal.processors.cache.GridCacheIdMessage; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.util.typedef.internal.S; @@ -35,7 +35,7 @@ import org.jetbrains.annotations.NotNull; /** * */ -public class GridNearSingleGetRequest extends GridCacheMessage implements GridCacheDeployable { +public class GridNearSingleGetRequest 
extends GridCacheIdMessage implements GridCacheDeployable { /** */ private static final long serialVersionUID = 0L; http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetResponse.java index 554fca1..2cb75c2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetResponse.java @@ -25,7 +25,7 @@ import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheDeployable; import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo; -import org.apache.ignite.internal.processors.cache.GridCacheMessage; +import org.apache.ignite.internal.processors.cache.GridCacheIdMessage; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; @@ -37,7 +37,7 @@ import org.jetbrains.annotations.Nullable; /** * */ -public class GridNearSingleGetResponse extends GridCacheMessage implements GridCacheDeployable { +public class GridNearSingleGetResponse extends GridCacheIdMessage implements GridCacheDeployable { /** */ private static final long serialVersionUID = 0L; @@ -115,7 +115,7 @@ public class GridNearSingleGetResponse extends GridCacheMessage implements GridC * @return Topology version. 
*/ @Override public AffinityTopologyVersion topologyVersion() { - return topVer; + return topVer != null ? topVer : super.topologyVersion(); } /**
