http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java index bb31645..a251047 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java @@ -110,16 +110,11 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { /** Number of retries using to send messages. */ private int retryCnt; - /** Indexed class handlers. */ - private volatile Map<Integer, IgniteBiInClosure[]> idxClsHandlers = new HashMap<>(); + /** */ + private final MessageHandlers cacheHandlers = new MessageHandlers(); - /** Handler registry. */ - private ConcurrentMap<ListenerKey, IgniteBiInClosure<UUID, GridCacheMessage>> - clsHandlers = new ConcurrentHashMap8<>(); - - /** Ordered handler registry. */ - private ConcurrentMap<Object, IgniteBiInClosure<UUID, ? extends GridCacheMessage>> orderedHandlers = - new ConcurrentHashMap8<>(); + /** */ + private final MessageHandlers grpHandlers = new MessageHandlers(); /** Stopping flag. */ private boolean stopping; @@ -144,17 +139,19 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { if (cacheMsg.partitionExchangeMessage()) { if (cacheMsg instanceof GridDhtAffinityAssignmentRequest) { + GridDhtAffinityAssignmentRequest msg0 = (GridDhtAffinityAssignmentRequest)cacheMsg; + assert cacheMsg.topologyVersion() != null : cacheMsg; AffinityTopologyVersion startTopVer = new AffinityTopologyVersion(cctx.localNode().order()); - DynamicCacheDescriptor cacheDesc = cctx.cache().cacheDescriptor(cacheMsg.cacheId()); + CacheGroupDescriptor desc = cctx.cache().cacheGroupDescriptors().get(msg0.groupId()); - if (cacheDesc != null) { - if (cacheDesc.startTopologyVersion() != null) - startTopVer = cacheDesc.startTopologyVersion(); - else if (cacheDesc.receivedFromStartVersion() != null) - startTopVer = cacheDesc.receivedFromStartVersion(); + if (desc != null) { + if (desc.startTopologyVersion() != null) + startTopVer = desc.startTopologyVersion(); + else if (desc.receivedFromStartVersion() != null) + startTopVer = desc.receivedFromStartVersion(); } // Need to wait for exchange to avoid race between cache start and affinity request. @@ -165,7 +162,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { log.debug("Wait for exchange before processing message [msg=" + msg + ", node=" + nodeId + ", waitVer=" + startTopVer + - ", cacheDesc=" + cacheDesc + ']'); + ", cacheDesc=" + descriptorForMessage(cacheMsg) + ']'); } fut.listen(new CI1<IgniteInternalFuture<?>>() { @@ -260,21 +257,31 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { */ @SuppressWarnings("unchecked") private void handleMessage(UUID nodeId, GridCacheMessage cacheMsg) { + handleMessage(nodeId, cacheMsg, cacheMsg.cacheGroupMessage() ? grpHandlers : cacheHandlers); + } + + /** + * @param nodeId Sender node ID. + * @param cacheMsg Message. + * @param msgHandlers Message handlers. + */ + @SuppressWarnings("unchecked") + private void handleMessage(UUID nodeId, GridCacheMessage cacheMsg, MessageHandlers msgHandlers) { int msgIdx = cacheMsg.lookupIndex(); IgniteBiInClosure<UUID, GridCacheMessage> c = null; if (msgIdx >= 0) { - Map<Integer, IgniteBiInClosure[]> idxClsHandlers0 = idxClsHandlers; + Map<Integer, IgniteBiInClosure[]> idxClsHandlers0 = msgHandlers.idxClsHandlers; - IgniteBiInClosure[] cacheClsHandlers = idxClsHandlers0.get(cacheMsg.cacheId()); + IgniteBiInClosure[] cacheClsHandlers = idxClsHandlers0.get(cacheMsg.handlerId()); if (cacheClsHandlers != null) c = cacheClsHandlers[msgIdx]; } if (c == null) - c = clsHandlers.get(new ListenerKey(cacheMsg.cacheId(), cacheMsg.getClass())); + c = msgHandlers.clsHandlers.get(new ListenerKey(cacheMsg.handlerId(), cacheMsg.getClass())); if (c == null) { IgniteLogger log = cacheMsg.messageLogger(cctx); @@ -285,12 +292,12 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { msg0.append(", locTopVer=").append(cctx.exchange().readyAffinityVersion()). append(", msgTopVer=").append(cacheMsg.topologyVersion()). - append(", cacheDesc=").append(cctx.cache().cacheDescriptor(cacheMsg.cacheId())). + append(", desc=").append(descriptorForMessage(cacheMsg)). append(']'); msg0.append(U.nl()).append("Registered listeners:"); - Map<Integer, IgniteBiInClosure[]> idxClsHandlers0 = idxClsHandlers; + Map<Integer, IgniteBiInClosure[]> idxClsHandlers0 = msgHandlers.idxClsHandlers; for (Map.Entry<Integer, IgniteBiInClosure[]> e : idxClsHandlers0.entrySet()) msg0.append(U.nl()).append(e.getKey()).append("=").append(Arrays.toString(e.getValue())); @@ -323,7 +330,10 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { @Override protected void onKernalStop0(boolean cancel) { cctx.gridIO().removeMessageListener(TOPIC_CACHE); - for (Object ordTopic : orderedHandlers.keySet()) + for (Object ordTopic : cacheHandlers.orderedHandlers.keySet()) + cctx.gridIO().removeMessageListener(ordTopic); + + for (Object ordTopic : grpHandlers.orderedHandlers.keySet()) cctx.gridIO().removeMessageListener(ordTopic); boolean interrupted = false; @@ -520,14 +530,17 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { */ private void processFailedMessage(UUID nodeId, GridCacheMessage msg, IgniteBiInClosure<UUID, GridCacheMessage> c) throws IgniteCheckedException { - GridCacheContext ctx = cctx.cacheContext(msg.cacheId()); + assert msg != null; + + GridCacheContext ctx = msg instanceof GridCacheIdMessage ? + cctx.cacheContext(((GridCacheIdMessage)msg).cacheId()) : null; switch (msg.directType()) { case 30: { GridDhtLockRequest req = (GridDhtLockRequest)msg; GridDhtLockResponse res = new GridDhtLockResponse( - ctx.cacheId(), + req.cacheId(), req.version(), req.futureId(), req.miniId(), @@ -560,17 +573,17 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { GridDhtAtomicUpdateRequest req = (GridDhtAtomicUpdateRequest)msg; GridDhtAtomicUpdateResponse res = new GridDhtAtomicUpdateResponse( - ctx.cacheId(), + req.cacheId(), req.partition(), req.futureId(), - ctx.deploymentEnabled()); + false); res.onError(req.classError()); sendResponseOnFailedMessage(nodeId, res, cctx, ctx.ioPolicy()); if (req.nearNodeId() != null) { - GridDhtAtomicNearResponse nearRes = new GridDhtAtomicNearResponse(ctx.cacheId(), + GridDhtAtomicNearResponse nearRes = new GridDhtAtomicNearResponse(req.cacheId(), req.partition(), req.nearFutureId(), nodeId, @@ -588,12 +601,12 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { GridNearAtomicFullUpdateRequest req = (GridNearAtomicFullUpdateRequest)msg; GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse( - ctx.cacheId(), + req.cacheId(), nodeId, req.futureId(), req.partition(), false, - ctx.deploymentEnabled()); + false); res.error(req.classError()); @@ -606,10 +619,10 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { GridDhtForceKeysRequest req = (GridDhtForceKeysRequest)msg; GridDhtForceKeysResponse res = new GridDhtForceKeysResponse( - ctx.cacheId(), + req.cacheId(), req.futureId(), req.miniId(), - ctx.deploymentEnabled() + false ); res.error(req.classError()); @@ -629,7 +642,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { GridNearGetRequest req = (GridNearGetRequest)msg; GridNearGetResponse res = new GridNearGetResponse( - ctx.cacheId(), + req.cacheId(), req.futureId(), req.miniId(), req.version(), @@ -645,7 +658,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { case 50: { GridNearGetResponse res = (GridNearGetResponse)msg; - CacheGetFuture fut = (CacheGetFuture)ctx.mvcc().future(res.futureId()); + CacheGetFuture fut = (CacheGetFuture)cctx.mvcc().future(res.futureId()); if (fut == null) { if (log.isDebugEnabled()) @@ -665,7 +678,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { GridNearLockRequest req = (GridNearLockRequest)msg; GridNearLockResponse res = new GridNearLockResponse( - ctx.cacheId(), + req.cacheId(), req.version(), req.futureId(), req.miniId(), @@ -673,7 +686,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { 0, req.classError(), null, - ctx.deploymentEnabled()); + false); sendResponseOnFailedMessage(nodeId, res, cctx, ctx.ioPolicy()); } @@ -712,7 +725,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { cctx.deploymentEnabled()); cctx.io().sendOrderedMessage( - ctx.node(nodeId), + cctx.node(nodeId), TOPIC_CACHE.topic(QUERY_TOPIC_PREFIX, nodeId, req.id()), res, ctx.ioPolicy(), @@ -731,7 +744,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { GridNearSingleGetRequest req = (GridNearSingleGetRequest)msg; GridNearSingleGetResponse res = new GridNearSingleGetResponse( - ctx.cacheId(), + req.cacheId(), req.futureId(), req.topologyVersion(), null, @@ -748,7 +761,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { case 117: { GridNearSingleGetResponse res = (GridNearSingleGetResponse)msg; - GridPartitionedSingleGetFuture fut = (GridPartitionedSingleGetFuture)ctx.mvcc() + GridPartitionedSingleGetFuture fut = (GridPartitionedSingleGetFuture)cctx.mvcc() .future(new IgniteUuid(IgniteUuid.VM_ID, res.futureId())); if (fut == null) { @@ -769,12 +782,12 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { GridNearAtomicSingleUpdateRequest req = (GridNearAtomicSingleUpdateRequest)msg; GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse( - ctx.cacheId(), + req.cacheId(), nodeId, req.futureId(), req.partition(), false, - ctx.deploymentEnabled()); + false); res.error(req.classError()); @@ -787,12 +800,12 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { GridNearAtomicSingleUpdateInvokeRequest req = (GridNearAtomicSingleUpdateInvokeRequest)msg; GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse( - ctx.cacheId(), + req.cacheId(), nodeId, req.futureId(), req.partition(), false, - ctx.deploymentEnabled()); + false); res.error(req.classError()); @@ -805,12 +818,12 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { GridNearAtomicSingleUpdateFilterRequest req = (GridNearAtomicSingleUpdateFilterRequest)msg; GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse( - ctx.cacheId(), + req.cacheId(), nodeId, req.futureId(), req.partition(), false, - ctx.deploymentEnabled()); + false); res.error(req.classError()); @@ -823,17 +836,17 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { GridDhtAtomicSingleUpdateRequest req = (GridDhtAtomicSingleUpdateRequest)msg; GridDhtAtomicUpdateResponse res = new GridDhtAtomicUpdateResponse( - ctx.cacheId(), + req.cacheId(), req.partition(), req.futureId(), - ctx.deploymentEnabled()); + false); res.onError(req.classError()); sendResponseOnFailedMessage(nodeId, res, cctx, ctx.ioPolicy()); if (req.nearNodeId() != null) { - GridDhtAtomicNearResponse nearRes = new GridDhtAtomicNearResponse(ctx.cacheId(), + GridDhtAtomicNearResponse nearRes = new GridDhtAtomicNearResponse(req.cacheId(), req.partition(), req.nearFutureId(), nodeId, @@ -887,8 +900,8 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { if (txState != null) txState.unwindEvicts(cctx); } - else { - GridCacheContext ctx = cctx.cacheContext(msg.cacheId()); + else if (msg instanceof GridCacheIdMessage) { + GridCacheContext ctx = cctx.cacheContext(((GridCacheIdMessage)msg).cacheId()); if (ctx != null) CU.unwindEvicts(ctx); @@ -1180,79 +1193,125 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { } /** - * Adds message handler. - * - * @param cacheId Cache ID. + * @param hndId Message handler ID. * @param type Type of message. * @param c Handler. */ - @SuppressWarnings({"unchecked"}) - public void addHandler( - int cacheId, + public void addCacheHandler( + int hndId, Class<? extends GridCacheMessage> type, IgniteBiInClosure<UUID, ? extends GridCacheMessage> c) { + assert !type.isAssignableFrom(GridCacheGroupIdMessage.class) : type; + + addHandler(hndId, type, c, cacheHandlers); + } + + /** + * @param hndId Message handler ID. + * @param type Type of message. + * @param c Handler. + */ + public void addCacheGroupHandler( + int hndId, + Class<? extends GridCacheGroupIdMessage> type, + IgniteBiInClosure<UUID, ? extends GridCacheMessage> c) { + assert !type.isAssignableFrom(GridCacheIdMessage.class) : type; + + addHandler(hndId, type, c, grpHandlers); + } + + /** + * @param hndId Message handler ID. + * @param type Type of message. + * @param c Handler. + * @param msgHandlers Message handlers. + */ + @SuppressWarnings({"unchecked"}) + private void addHandler( + int hndId, + Class<? extends GridCacheMessage> type, + IgniteBiInClosure<UUID, ? extends GridCacheMessage> c, + MessageHandlers msgHandlers) { int msgIdx = messageIndex(type); if (msgIdx != -1) { - Map<Integer, IgniteBiInClosure[]> idxClsHandlers0 = idxClsHandlers; + Map<Integer, IgniteBiInClosure[]> idxClsHandlers0 = msgHandlers.idxClsHandlers; - IgniteBiInClosure[] cacheClsHandlers = idxClsHandlers0.get(cacheId); + IgniteBiInClosure[] cacheClsHandlers = idxClsHandlers0.get(hndId); if (cacheClsHandlers == null) { cacheClsHandlers = new IgniteBiInClosure[GridCacheMessage.MAX_CACHE_MSG_LOOKUP_INDEX]; - idxClsHandlers0.put(cacheId, cacheClsHandlers); + idxClsHandlers0.put(hndId, cacheClsHandlers); } if (cacheClsHandlers[msgIdx] != null) - throw new IgniteException("Duplicate cache message ID found [cacheId=" + cacheId + + throw new IgniteException("Duplicate cache message ID found [hndId=" + hndId + ", type=" + type + ']'); cacheClsHandlers[msgIdx] = c; - idxClsHandlers = idxClsHandlers0; + msgHandlers.idxClsHandlers = idxClsHandlers0; return; } else { - ListenerKey key = new ListenerKey(cacheId, type); + ListenerKey key = new ListenerKey(hndId, type); - if (clsHandlers.putIfAbsent(key, + if (msgHandlers.clsHandlers.putIfAbsent(key, (IgniteBiInClosure<UUID, GridCacheMessage>)c) != null) - assert false : "Handler for class already registered [cacheId=" + cacheId + ", cls=" + type + - ", old=" + clsHandlers.get(key) + ", new=" + c + ']'; + assert false : "Handler for class already registered [hndId=" + hndId + ", cls=" + type + + ", old=" + msgHandlers.clsHandlers.get(key) + ", new=" + c + ']'; } IgniteLogger log0 = log; if (log0 != null && log0.isTraceEnabled()) log0.trace( - "Registered cache communication handler [cacheId=" + cacheId + ", type=" + type + + "Registered cache communication handler [hndId=" + hndId + ", type=" + type + ", msgIdx=" + msgIdx + ", handler=" + c + ']'); } /** * @param cacheId Cache ID to remove handlers for. */ - public void removeHandlers(int cacheId) { - assert cacheId != 0; + void removeCacheHandlers(int cacheId) { + removeHandlers(cacheHandlers, cacheId); + } + + /** + * @param grpId Cache group ID to remove handlers for. + */ + void removeCacheGroupHandlers(int grpId) { + removeHandlers(grpHandlers, grpId); + } + + /** + * @param msgHandlers Handlers. + * @param hndId ID to remove handlers for. + */ + private void removeHandlers(MessageHandlers msgHandlers, int hndId) { + assert hndId != 0; - idxClsHandlers.remove(cacheId); + msgHandlers.idxClsHandlers.remove(hndId); - for (Iterator<ListenerKey> iter = clsHandlers.keySet().iterator(); iter.hasNext(); ) { + for (Iterator<ListenerKey> iter = msgHandlers.clsHandlers.keySet().iterator(); iter.hasNext(); ) { ListenerKey key = iter.next(); - if (key.cacheId == cacheId) + if (key.hndId == hndId) iter.remove(); } } /** - * @param cacheId Cache ID to remove handlers for. + * @param cacheGrp {@code True} if cache group handler, {@code false} if cache handler. + * @param hndId Handler ID. * @param type Message type. */ - public void removeHandler(int cacheId, Class<? extends GridCacheMessage> type) { - clsHandlers.remove(new ListenerKey(cacheId, type)); + public void removeHandler(boolean cacheGrp, int hndId, Class<? extends GridCacheMessage> type) { + MessageHandlers msgHandlers = cacheGrp ? grpHandlers : cacheHandlers; + + msgHandlers.clsHandlers.remove(new ListenerKey(hndId, type)); } /** @@ -1274,16 +1333,35 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { } /** + * @param topic Topic. + * @param c Handler. + */ + public void addOrderedCacheHandler(Object topic, IgniteBiInClosure<UUID, ? extends GridCacheIdMessage> c) { + addOrderedHandler(false, topic, c); + } + + /** + * @param topic Topic. + * @param c Handler. + */ + public void addOrderedCacheGroupHandler(Object topic, IgniteBiInClosure<UUID, ? extends GridCacheGroupIdMessage> c) { + addOrderedHandler(true, topic, c); + } + + /** * Adds ordered message handler. * + * @param cacheGrp {@code True} if cache group message, {@code false} if cache message. * @param topic Topic. * @param c Handler. */ @SuppressWarnings({"unchecked"}) - public void addOrderedHandler(Object topic, IgniteBiInClosure<UUID, ? extends GridCacheMessage> c) { + private void addOrderedHandler(boolean cacheGrp, Object topic, IgniteBiInClosure<UUID, ? extends GridCacheMessage> c) { + MessageHandlers msgHandlers = cacheGrp ? grpHandlers : cacheHandlers; + IgniteLogger log0 = log; - if (orderedHandlers.putIfAbsent(topic, c) == null) { + if (msgHandlers.orderedHandlers.putIfAbsent(topic, c) == null) { cctx.gridIO().addMessageListener(topic, new OrderedMessageListener( (IgniteBiInClosure<UUID, GridCacheMessage>)c)); @@ -1298,10 +1376,13 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { /** * Removed ordered message handler. * + * @param cacheGrp {@code True} if cache group message, {@code false} if cache message. * @param topic Topic. */ - public void removeOrderedHandler(Object topic) { - if (orderedHandlers.remove(topic) != null) { + public void removeOrderedHandler(boolean cacheGrp, Object topic) { + MessageHandlers msgHandlers = cacheGrp ? grpHandlers : cacheHandlers; + + if (msgHandlers.orderedHandlers.remove(topic) != null) { cctx.gridIO().removeMessageListener(topic); if (log != null && log.isDebugEnabled()) @@ -1352,12 +1433,43 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { } } + /** + * @param msg Message. + * @return Cache or group descriptor. + */ + private Object descriptorForMessage(GridCacheMessage msg) { + if (msg instanceof GridCacheIdMessage) + return cctx.cache().cacheDescriptor(((GridCacheIdMessage)msg).cacheId()); + else if (msg instanceof GridCacheGroupIdMessage) + return cctx.cache().cacheGroupDescriptors().get(((GridCacheGroupIdMessage)msg).groupId()); + + return null; + } + /** {@inheritDoc} */ @Override public void printMemoryStats() { X.println(">>> "); X.println(">>> Cache IO manager memory stats [igniteInstanceName=" + cctx.igniteInstanceName() + ']'); - X.println(">>> clsHandlersSize: " + clsHandlers.size()); - X.println(">>> orderedHandlersSize: " + orderedHandlers.size()); + X.println(">>> cacheClsHandlersSize: " + cacheHandlers.clsHandlers.size()); + X.println(">>> cacheOrderedHandlersSize: " + cacheHandlers.orderedHandlers.size()); + X.println(">>> cacheGrpClsHandlersSize: " + grpHandlers.clsHandlers.size()); + X.println(">>> cacheGrpOrderedHandlersSize: " + grpHandlers.orderedHandlers.size()); + } + + /** + * + */ + static class MessageHandlers { + /** Indexed class handlers. */ + volatile Map<Integer, IgniteBiInClosure[]> idxClsHandlers = new HashMap<>(); + + /** Handler registry. */ + ConcurrentMap<ListenerKey, IgniteBiInClosure<UUID, GridCacheMessage>> + clsHandlers = new ConcurrentHashMap8<>(); + + /** Ordered handler registry. */ + ConcurrentMap<Object, IgniteBiInClosure<UUID, ? extends GridCacheMessage>> orderedHandlers = + new ConcurrentHashMap8<>(); } /** @@ -1391,17 +1503,17 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { */ private static class ListenerKey { /** Cache ID. */ - private int cacheId; + private int hndId; /** Message class. */ private Class<? extends GridCacheMessage> msgCls; /** - * @param cacheId Cache ID. + * @param hndId Handler ID. * @param msgCls Message class. */ - private ListenerKey(int cacheId, Class<? extends GridCacheMessage> msgCls) { - this.cacheId = cacheId; + private ListenerKey(int hndId, Class<? extends GridCacheMessage> msgCls) { + this.hndId = hndId; this.msgCls = msgCls; } @@ -1415,12 +1527,12 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { ListenerKey that = (ListenerKey)o; - return cacheId == that.cacheId && msgCls.equals(that.msgCls); + return hndId == that.hndId && msgCls.equals(that.msgCls); } /** {@inheritDoc} */ @Override public int hashCode() { - int res = cacheId; + int res = hndId; res = 31 * res + msgCls.hashCode();
http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheLocalConcurrentMap.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheLocalConcurrentMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheLocalConcurrentMap.java index db99272..63cfe1f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheLocalConcurrentMap.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheLocalConcurrentMap.java @@ -18,37 +18,65 @@ package org.apache.ignite.internal.processors.cache; -import java.util.concurrent.atomic.AtomicInteger; +import org.jetbrains.annotations.Nullable; +import org.jsr166.ConcurrentHashMap8; /** * GridCacheConcurrentMap implementation for local and near caches. */ public class GridCacheLocalConcurrentMap extends GridCacheConcurrentMapImpl { /** */ - private final AtomicInteger pubSize = new AtomicInteger(); + private final int cacheId; - public GridCacheLocalConcurrentMap(GridCacheContext ctx, - GridCacheMapEntryFactory factory, int initialCapacity) { - super(ctx, factory, initialCapacity); + /** */ + private final CacheMapHolder entryMap; + + /** + * @param cctx Cache context. + * @param factory Entry factory. + * @param initCap Initial capacity. + */ + public GridCacheLocalConcurrentMap(GridCacheContext cctx, GridCacheMapEntryFactory factory, int initCap) { + super(factory); + + this.cacheId = cctx.cacheId(); + this.entryMap = new CacheMapHolder(cctx, + new ConcurrentHashMap8<KeyCacheObject, GridCacheMapEntry>(initCap, 0.75f, Runtime.getRuntime().availableProcessors() * 2)); } - public GridCacheLocalConcurrentMap(GridCacheContext ctx, - GridCacheMapEntryFactory factory, int initialCapacity, float loadFactor, int concurrencyLevel) { - super(ctx, factory, initialCapacity, loadFactor, concurrencyLevel); + /** {@inheritDoc} */ + @Override public int internalSize() { + return entryMap.map.size(); } /** {@inheritDoc} */ - @Override public int publicSize() { - return pubSize.get(); + @Nullable @Override protected CacheMapHolder entriesMap(GridCacheContext cctx) { + return entryMap; } /** {@inheritDoc} */ - @Override public void incrementPublicSize(GridCacheEntryEx e) { - pubSize.incrementAndGet(); + @Nullable @Override protected CacheMapHolder entriesMapIfExists(Integer cacheId) { + return entryMap; } /** {@inheritDoc} */ - @Override public void decrementPublicSize(GridCacheEntryEx e) { - pubSize.decrementAndGet(); + @Override public int publicSize(int cacheId) { + assert this.cacheId == cacheId; + + return entryMap.size.get(); + } + + /** {@inheritDoc} */ + @Override public void incrementPublicSize(CacheMapHolder hld, GridCacheEntryEx e) { + assert cacheId == e.context().cacheId(); + + entryMap.size.incrementAndGet(); + } + + /** {@inheritDoc} */ + @Override public void decrementPublicSize(CacheMapHolder hld, GridCacheEntryEx e) { + assert cacheId == e.context().cacheId(); + + entryMap.size.decrementAndGet(); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index 5e57ca4..28656fb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -981,7 +981,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme if (cctx.deferredDelete() && deletedUnlocked() && !isInternal() && !detached()) deletedUnlocked(false); - updateCntr0 = nextPartCounter(topVer); + updateCntr0 = nextPartitionCounter(topVer, tx == null || tx.local(), updateCntr); if (updateCntr != null && updateCntr != 0) updateCntr0 = updateCntr; @@ -1160,7 +1160,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme } } - updateCntr0 = nextPartCounter(topVer); + updateCntr0 = nextPartitionCounter(topVer, tx == null || tx.local(), updateCntr); if (updateCntr != null && updateCntr != 0) updateCntr0 = updateCntr; @@ -1562,7 +1562,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme updateMetrics(op, metrics); if (lsnrCol != null) { - long updateCntr = nextPartCounter(AffinityTopologyVersion.NONE); + long updateCntr = nextPartitionCounter(AffinityTopologyVersion.NONE, true, null); cctx.continuousQueries().onEntryUpdated( lsnrCol, @@ -1651,6 +1651,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme (op == GridCacheOperation.TRANSFORM || cctx.loadPreviousValue())); c = new AtomicCacheUpdateClosure(this, + topVer, newVer, op, writeObj, @@ -1677,7 +1678,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme c.call(dataRow); } else - cctx.offheap().invoke(key, localPartition(), c); + cctx.offheap().invoke(cctx, key, localPartition(), c); GridCacheUpdateAtomicResult updateRes = c.updateRes; @@ -1722,7 +1723,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme else evtVal = (CacheObject)writeObj; - long updateCntr0 = nextPartCounter(); + long updateCntr0 = nextPartitionCounter(topVer, primary, updateCntr); if (updateCntr != null) updateCntr0 = updateCntr; @@ -2612,7 +2613,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme long updateCntr = 0; if (!preload) - updateCntr = nextPartCounter(topVer); + updateCntr = nextPartitionCounter(topVer, true, null); if (walEnabled) { cctx.shared().wal().log(new DataRecord(new DataEntry( @@ -2668,33 +2669,15 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme } /** + * @param topVer Topology version for current operation. + * @param primary Primary node update flag. + * @param primaryCntr Counter assigned on primary node. * @return Update counter. */ - protected long nextPartCounter() { + protected long nextPartitionCounter(AffinityTopologyVersion topVer, boolean primary, @Nullable Long primaryCntr) { return 0; } - /** - * @param topVer Topology version. - * @return Update counter. - */ - private long nextPartCounter(AffinityTopologyVersion topVer) { - long updateCntr; - - if (!cctx.isLocal() && !isNear()) { - GridDhtLocalPartition locPart = cctx.topology().localPartition(partition(), topVer, false); - - if (locPart == null) - return 0; - - updateCntr = locPart.nextUpdateCounter(); - } - else - updateCntr = 0; - - return updateCntr; - } - /** {@inheritDoc} */ @Override public synchronized GridCacheVersionedEntryEx versionedEntry(final boolean keepBinary) throws IgniteCheckedException, GridCacheEntryRemovedException { @@ -3192,7 +3175,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme synchronized (this) { checkObsolete(); - cctx.offheap().updateIndexes(key, localPartition()); + if (cctx.queries().enabled()) + cctx.offheap().updateIndexes(cctx, key, localPartition()); } } @@ -3227,14 +3211,14 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme * @param oldRow Old row if available. * @throws IgniteCheckedException If update failed. */ - protected void storeValue(@Nullable CacheObject val, + protected void storeValue(CacheObject val, long expireTime, GridCacheVersion ver, @Nullable CacheDataRow oldRow) throws IgniteCheckedException { assert Thread.holdsLock(this); assert val != null : "null values in update for key: " + key; - cctx.offheap().invoke(key, localPartition(), new UpdateClosure(this, val, ver, expireTime)); + cctx.offheap().invoke(cctx, key, localPartition(), new UpdateClosure(this, val, ver, expireTime)); } /** @@ -3276,7 +3260,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme protected void removeValue() throws IgniteCheckedException { assert Thread.holdsLock(this); - cctx.offheap().remove(key, partition(), localPartition()); + cctx.offheap().remove(cctx, key, partition(), localPartition()); } /** {@inheritDoc} */ @@ -3606,7 +3590,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme GridDhtLocalPartition locPart = localPartition(); if (locPart != null) - locPart.incrementPublicSize(this); + locPart.incrementPublicSize(null, this); else cctx.incrementPublicSize(this); } @@ -3618,7 +3602,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme GridDhtLocalPartition locPart = localPartition(); if (locPart != null) - locPart.decrementPublicSize(this); + locPart.decrementPublicSize(null, this); else cctx.decrementPublicSize(this); } @@ -3930,7 +3914,9 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme if (oldRow != null) oldRow.key(entry.key); - newRow = entry.cctx.offheap().dataStore(entry.localPartition()).createRow(entry.key, + newRow = entry.cctx.offheap().dataStore(entry.localPartition()).createRow( + entry.cctx, + entry.key, val, ver, expireTime, @@ -3964,6 +3950,9 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme private final GridCacheMapEntry entry; /** */ + private final AffinityTopologyVersion topVer; + + /** */ private GridCacheVersion newVer; /** */ @@ -4026,7 +4015,9 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme /** */ private CacheDataRow oldRow; - AtomicCacheUpdateClosure(GridCacheMapEntry entry, + AtomicCacheUpdateClosure( + GridCacheMapEntry entry, + AffinityTopologyVersion topVer, GridCacheVersion newVer, GridCacheOperation op, Object writeObj, @@ -4047,6 +4038,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme assert op == UPDATE || op == DELETE || op == TRANSFORM : op; this.entry = entry; + this.topVer = topVer; this.newVer = newVer; this.op = op; this.writeObj = writeObj; @@ -4282,7 +4274,9 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme } if (needUpdate) { - newRow = entry.localPartition().dataStore().createRow(entry.key, + newRow = entry.localPartition().dataStore().createRow( + entry.cctx, + entry.key, storeLoadedVal, newVer, entry.expireTimeExtras(), @@ -4413,7 +4407,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme ", locNodeId=" + cctx.localNodeId() + ']'; } - long updateCntr0 = entry.nextPartCounter(); + long updateCntr0 = entry.nextPartitionCounter(topVer, primary, updateCntr); if (updateCntr != null) updateCntr0 = updateCntr; @@ -4421,7 +4415,9 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme entry.logUpdate(op, updated, newVer, newExpireTime, updateCntr0); if (!entry.isNear()) { - newRow = entry.localPartition().dataStore().createRow(entry.key, + newRow = entry.localPartition().dataStore().createRow( + entry.cctx, + entry.key, updated, newVer, newExpireTime, @@ -4495,7 +4491,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme // Must persist inside synchronization in non-tx mode. cctx.store().remove(null, entry.key); - long updateCntr0 = entry.nextPartCounter(); + long updateCntr0 = entry.nextPartitionCounter(topVer, primary, updateCntr); if (updateCntr != null) updateCntr0 = updateCntr; http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java index 4de465c..11916e9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java @@ -84,9 +84,15 @@ public abstract class GridCacheMessage implements Message { @GridDirectTransient private boolean skipPrepare; - /** Cache ID. */ - @GridToStringInclude - protected int cacheId; + /** + * @return ID to distinguish message handlers for the same messages but for different caches/cache groups. + */ + public abstract int handlerId(); + + /** + * @return {@code True} if cache group message. + */ + public abstract boolean cacheGroupMessage(); /** * @return Error, if any. @@ -170,20 +176,6 @@ public abstract class GridCacheMessage implements Message { } /** - * @return Cache ID. - */ - public int cacheId() { - return cacheId; - } - - /** - * @param cacheId Cache ID. - */ - public void cacheId(int cacheId) { - this.cacheId = cacheId; - } - - /** * Gets topology version or -1 in case of topology version is not required for this message. * * @return Topology version. @@ -205,6 +197,15 @@ public abstract class GridCacheMessage implements Message { * @throws IgniteCheckedException If failed. */ protected final void prepareObject(@Nullable Object o, GridCacheContext ctx) throws IgniteCheckedException { + prepareObject(o, ctx.shared()); + } + + /** + * @param o Object to prepare for marshalling. + * @param ctx Context. + * @throws IgniteCheckedException If failed. + */ + protected final void prepareObject(@Nullable Object o, GridCacheSharedContext ctx) throws IgniteCheckedException { assert addDepInfo || forceAddDepInfo; if (!skipPrepare && o != null) { @@ -279,24 +280,28 @@ public abstract class GridCacheMessage implements Message { /** * @param info Entry to marshal. * @param ctx Context. + * @param cacheObjCtx Cache object context. * @throws IgniteCheckedException If failed. */ - protected final void marshalInfo(GridCacheEntryInfo info, GridCacheContext ctx) throws IgniteCheckedException { + protected final void marshalInfo(GridCacheEntryInfo info, + GridCacheSharedContext ctx, + CacheObjectContext cacheObjCtx + ) throws IgniteCheckedException { assert ctx != null; if (info != null) { - info.marshal(ctx); + info.marshal(cacheObjCtx); if (addDepInfo) { if (info.key() != null) - prepareObject(info.key().value(ctx.cacheObjectContext(), false), ctx); + prepareObject(info.key().value(cacheObjCtx, false), ctx); CacheObject val = info.value(); if (val != null) { - val.finishUnmarshal(ctx.cacheObjectContext(), ctx.deploy().globalLoader()); + val.finishUnmarshal(cacheObjCtx, ctx.deploy().globalLoader()); - prepareObject(CU.value(val, ctx, false), ctx); + prepareObject(val.value(cacheObjCtx, false), ctx); } } } @@ -314,7 +319,7 @@ public abstract class GridCacheMessage implements Message { assert ctx != null; if (info != null) - info.unmarshal(ctx, ldr); + info.unmarshal(ctx.cacheObjectContext(), ldr); } /** @@ -324,13 +329,14 @@ public abstract class GridCacheMessage implements Message { */ protected final void marshalInfos( Iterable<? extends GridCacheEntryInfo> infos, - GridCacheContext ctx + GridCacheSharedContext ctx, + CacheObjectContext cacheObjCtx ) throws IgniteCheckedException { assert ctx != null; if (infos != null) for (GridCacheEntryInfo e : infos) - marshalInfo(e, ctx); + marshalInfo(e, ctx, cacheObjCtx); } /** @@ -369,14 +375,14 @@ public abstract class GridCacheMessage implements Message { if (addDepInfo) { if (e.key() != null) - prepareObject(e.key().value(cctx.cacheObjectContext(), false), cctx); + prepareObject(e.key().value(cctx.cacheObjectContext(), false), ctx); if (e.value() != null) - prepareObject(e.value().value(cctx.cacheObjectContext(), false), cctx); + prepareObject(e.value().value(cctx.cacheObjectContext(), false), ctx); if (e.entryProcessors() != null) { for (T2<EntryProcessor<Object, Object, Object>, Object[]> entProc : e.entryProcessors()) - prepareObject(entProc.get1(), cctx); + prepareObject(entProc.get1(), ctx); } } else if (p2pEnabled && e.entryProcessors() != null) { @@ -384,7 +390,7 @@ public abstract class GridCacheMessage implements Message { forceAddDepInfo = true; for (T2<EntryProcessor<Object, Object, Object>, Object[]> entProc : e.entryProcessors()) - prepareObject(entProc.get1(), cctx); + prepareObject(entProc.get1(), ctx); } } } @@ -435,7 +441,7 @@ public abstract class GridCacheMessage implements Message { Object arg = args[i]; if (addDepInfo) - prepareObject(arg, ctx); + prepareObject(arg, ctx.shared()); argsBytes[i] = arg == null ? null : CU.marshal(ctx, arg); } @@ -487,7 +493,7 @@ public abstract class GridCacheMessage implements Message { for (Object o : col) { if (addDepInfo) - prepareObject(o, ctx); + prepareObject(o, ctx.shared()); byteCol.add(o == null ? null : CU.marshal(ctx, o)); } @@ -522,7 +528,7 @@ public abstract class GridCacheMessage implements Message { obj.prepareMarshal(ctx.cacheObjectContext()); if (addDepInfo) - prepareObject(obj.value(ctx.cacheObjectContext(), false), ctx); + prepareObject(obj.value(ctx.cacheObjectContext(), false), ctx.shared()); } } @@ -541,7 +547,7 @@ public abstract class GridCacheMessage implements Message { obj.prepareMarshal(ctx.cacheObjectContext()); if (addDepInfo) - prepareObject(obj.value(ctx.cacheObjectContext(), false), ctx); + prepareObject(obj.value(ctx.cacheObjectContext(), false), ctx.shared()); } } } @@ -630,6 +636,11 @@ public abstract class GridCacheMessage implements Message { } /** {@inheritDoc} */ + @Override public byte fieldsCount() { + return 2; + } + + /** {@inheritDoc} */ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { writer.setBuffer(buf); @@ -642,18 +653,12 @@ public abstract class GridCacheMessage implements Message { switch (writer.state()) { case 0: - if (!writer.writeInt("cacheId", cacheId)) - return false; - - writer.incrementState(); - - case 1: if (!writer.writeMessage("depInfo", depInfo)) return false; writer.incrementState(); - case 2: + case 1: if (!writer.writeLong("msgId", msgId)) return false; @@ -673,14 +678,6 @@ public abstract class GridCacheMessage implements Message { switch (reader.state()) { case 0: - cacheId = reader.readInt("cacheId"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 1: depInfo = reader.readMessage("depInfo"); if (!reader.isLastRead()) @@ -688,7 +685,7 @@ public abstract class GridCacheMessage implements Message { reader.incrementState(); - case 2: + case 1: msgId = reader.readLong("msgId"); if (!reader.isLastRead()) @@ -714,6 +711,6 @@ public abstract class GridCacheMessage implements Message { /** {@inheritDoc} */ @Override public String toString() { - return S.toString(GridCacheMessage.class, this, "cacheId", cacheId); + return S.toString(GridCacheMessage.class, this); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index bdc2216..ae4c164 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -96,7 +96,6 @@ import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.internal.util.worker.GridWorker; import org.apache.ignite.lang.IgniteBiInClosure; -import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.lang.IgniteProductVersion; import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.thread.IgniteThread; @@ -310,21 +309,21 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana cctx.gridEvents().addDiscoveryEventListener(discoLsnr, EVT_NODE_JOINED, EVT_NODE_LEFT, EVT_NODE_FAILED, EVT_DISCOVERY_CUSTOM_EVT); - cctx.io().addHandler(0, GridDhtPartitionsSingleMessage.class, + cctx.io().addCacheHandler(0, GridDhtPartitionsSingleMessage.class, new MessageHandler<GridDhtPartitionsSingleMessage>() { @Override public void onMessage(ClusterNode node, GridDhtPartitionsSingleMessage msg) { processSinglePartitionUpdate(node, msg); } }); - cctx.io().addHandler(0, GridDhtPartitionsFullMessage.class, + cctx.io().addCacheHandler(0, GridDhtPartitionsFullMessage.class, new MessageHandler<GridDhtPartitionsFullMessage>() { @Override public void onMessage(ClusterNode node, GridDhtPartitionsFullMessage msg) { processFullPartitionUpdate(node, msg); } }); - cctx.io().addHandler(0, GridDhtPartitionsSingleRequest.class, + cctx.io().addCacheHandler(0, GridDhtPartitionsSingleRequest.class, new MessageHandler<GridDhtPartitionsSingleRequest>() { @Override public void onMessage(ClusterNode node, GridDhtPartitionsSingleRequest msg) { processSinglePartitionRequest(node, msg); @@ -383,24 +382,28 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana for (int cnt = 0; cnt < cctx.gridConfig().getRebalanceThreadPoolSize(); cnt++) { final int idx = cnt; - cctx.io().addOrderedHandler(rebalanceTopic(cnt), new CI2<UUID, GridCacheMessage>() { - @Override public void apply(final UUID id, final GridCacheMessage m) { + cctx.io().addOrderedCacheGroupHandler(rebalanceTopic(cnt), new CI2<UUID, GridCacheGroupIdMessage>() { + @Override public void apply(final UUID id, final GridCacheGroupIdMessage m) { if (!enterBusy()) return; try { - GridCacheContext cacheCtx = cctx.cacheContext(m.cacheId); - - if (cacheCtx != null) { - if (m instanceof GridDhtPartitionSupplyMessage) - cacheCtx.preloader().handleSupplyMessage( - idx, id, (GridDhtPartitionSupplyMessage)m); - else if (m instanceof GridDhtPartitionDemandMessage) - cacheCtx.preloader().handleDemandMessage( - idx, id, (GridDhtPartitionDemandMessage)m); - else - U.error(log, "Unsupported message type: " + m.getClass().getName()); + CacheGroupContext grp = cctx.cache().cacheGroup(m.groupId()); + + if (grp != null) { + if (m instanceof GridDhtPartitionSupplyMessage) { + grp.preloader().handleSupplyMessage(idx, id, (GridDhtPartitionSupplyMessage) m); + + return; + } + else if (m instanceof GridDhtPartitionDemandMessage) { + grp.preloader().handleDemandMessage(idx, id, (GridDhtPartitionDemandMessage) m); + + return; + } } + + U.error(log, "Unsupported message type: " + m.getClass().getName()); } finally { leaveBusy(); @@ -418,14 +421,14 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana try { fut.get(); - for (GridCacheContext cacheCtx : cctx.cacheContexts()) - cacheCtx.preloader().onInitialExchangeComplete(null); + for (CacheGroupContext grp : cctx.cache().cacheGroups()) + grp.preloader().onInitialExchangeComplete(null); reconnectExchangeFut.onDone(); } catch (IgniteCheckedException e) { - for (GridCacheContext cacheCtx : cctx.cacheContexts()) - cacheCtx.preloader().onInitialExchangeComplete(e); + for (CacheGroupContext grp : cctx.cache().cacheGroups()) + grp.preloader().onInitialExchangeComplete(e); reconnectExchangeFut.onDone(e); } @@ -470,9 +473,9 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana AffinityTopologyVersion nodeStartVer = new AffinityTopologyVersion(discoEvt.topologyVersion(), 0); - for (GridCacheContext cacheCtx : cctx.cacheContexts()) { - if (nodeStartVer.equals(cacheCtx.startTopologyVersion())) - cacheCtx.preloader().onInitialExchangeComplete(null); + for (CacheGroupContext grp : cctx.cache().cacheGroups()) { + if (nodeStartVer.equals(grp.localStartVersion())) + grp.preloader().onInitialExchangeComplete(null); } if (log.isDebugEnabled()) @@ -492,9 +495,9 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana @Override protected void onKernalStop0(boolean cancel) { cctx.gridEvents().removeDiscoveryEventListener(discoLsnr); - cctx.io().removeHandler(0, GridDhtPartitionsSingleMessage.class); - cctx.io().removeHandler(0, GridDhtPartitionsFullMessage.class); - cctx.io().removeHandler(0, GridDhtPartitionsSingleRequest.class); + cctx.io().removeHandler(false, 0, GridDhtPartitionsSingleMessage.class); + cctx.io().removeHandler(false, 0, GridDhtPartitionsFullMessage.class); + cctx.io().removeHandler(false, 0, GridDhtPartitionsSingleRequest.class); stopErr = cctx.kernalContext().clientDisconnected() ? new IgniteClientDisconnectedCheckedException(cctx.kernalContext().cluster().clientReconnectFuture(), @@ -514,7 +517,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana if (!cctx.kernalContext().clientNode()) { for (int cnt = 0; cnt < cctx.gridConfig().getRebalanceThreadPoolSize(); cnt++) - cctx.io().removeOrderedHandler(rebalanceTopic(cnt)); + cctx.io().removeOrderedHandler(true, rebalanceTopic(cnt)); } U.cancel(exchWorker); @@ -549,22 +552,22 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana } /** - * @param cacheId Cache ID. + * @param grpId Cache group ID. * @param exchFut Exchange future. * @return Topology. */ - public GridDhtPartitionTopology clientTopology(int cacheId, GridDhtPartitionsExchangeFuture exchFut) { - GridClientPartitionTopology top = clientTops.get(cacheId); + public GridDhtPartitionTopology clientTopology(int grpId, GridDhtPartitionsExchangeFuture exchFut) { + GridClientPartitionTopology top = clientTops.get(grpId); if (top != null) return top; Object affKey = null; - DynamicCacheDescriptor desc = cctx.cache().cacheDescriptor(cacheId); + CacheGroupDescriptor grpDesc = cctx.cache().cacheGroupDescriptors().get(grpId); - if (desc != null) { - CacheConfiguration ccfg = desc.cacheConfiguration(); + if (grpDesc != null) { + CacheConfiguration<?, ?> ccfg = grpDesc.config(); AffinityFunction aff = ccfg.getAffinity(); @@ -574,8 +577,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana aff.partitions()); } - GridClientPartitionTopology old = clientTops.putIfAbsent(cacheId, - top = new GridClientPartitionTopology(cctx, cacheId, exchFut, affKey)); + GridClientPartitionTopology old = clientTops.putIfAbsent(grpId, + top = new GridClientPartitionTopology(cctx, grpId, exchFut, affKey)); return old != null ? old : top; } @@ -588,11 +591,11 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana } /** - * @param cacheId Cache ID. + * @param grpId Cache group ID. * @return Client partition topology. */ - public GridClientPartitionTopology clearClientTopology(int cacheId) { - return clientTops.remove(cacheId); + public GridClientPartitionTopology clearClientTopology(int grpId) { + return clientTops.remove(grpId); } /** @@ -806,18 +809,12 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana // If this is the oldest node. if (oldest.id().equals(cctx.localNodeId())) { // Check rebalance state & send CacheAffinityChangeMessage if need. - for (GridCacheContext cacheCtx : cctx.cacheContexts()) { - if (!cacheCtx.isLocal()) { - if (cacheCtx == null) - continue; - - GridDhtPartitionTopology top = null; - - if (!cacheCtx.isLocal()) - top = cacheCtx.topology(); + for (CacheGroupContext grp : cctx.cache().cacheGroups()) { + if (!grp.isLocal()) { + GridDhtPartitionTopology top = grp.topology(); if (top != null) - cctx.affinity().checkRebalanceState(top, cacheCtx.cacheId()); + cctx.affinity().checkRebalanceState(top, grp.groupId()); } } @@ -827,7 +824,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana AffinityTopologyVersion rmtTopVer = lastFut != null ? lastFut.topologyVersion() : AffinityTopologyVersion.NONE; - Collection<ClusterNode> rmts = CU.remoteNodes(cctx, AffinityTopologyVersion.NONE); + Collection<ClusterNode> rmts = CU.remoteNodes(cctx, rmtTopVer); if (log.isDebugEnabled()) log.debug("Refreshing partitions from oldest node: " + cctx.localNodeId()); @@ -845,9 +842,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana /** * @param nodes Nodes. - * @return {@code True} if message was sent, {@code false} if node left grid. */ - private boolean sendAllPartitions(Collection<ClusterNode> nodes) { + private void sendAllPartitions(Collection<ClusterNode> nodes) { GridDhtPartitionsFullMessage m = createPartitionsFullMessage(nodes, null, null, null, null, true); if (log.isDebugEnabled()) @@ -868,8 +864,6 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana U.warn(log, "Failed to send partitions full message [node=" + node + ", err=" + e + ']'); } } - - return true; } /** @@ -896,51 +890,48 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana final Map<Object, T2<Integer, GridDhtPartitionFullMap>> dupData = new HashMap<>(); - cctx.forAllCaches(new IgniteInClosure<GridCacheContext>() { - @Override public void apply(GridCacheContext cacheCtx) { - if (!cacheCtx.isLocal()) { - boolean ready; - - if (exchId != null) { - AffinityTopologyVersion startTopVer = cacheCtx.startTopologyVersion(); - - ready = startTopVer.compareTo(exchId.topologyVersion()) <= 0; - } - else - ready = cacheCtx.started(); + for (CacheGroupContext grp : cctx.cache().cacheGroups()) { + if (!grp.isLocal()) { + if (exchId != null) { + AffinityTopologyVersion startTopVer = grp.localStartVersion(); - if (ready) { - GridAffinityAssignmentCache affCache = cacheCtx.affinity().affinityCache(); + if (startTopVer.compareTo(exchId.topologyVersion()) > 0) + continue; + } - GridDhtPartitionFullMap locMap = cacheCtx.topology().partitionMap(true); + GridAffinityAssignmentCache affCache = grp.affinity(); - addFullPartitionsMap(m, - dupData, - compress, - cacheCtx.cacheId(), - locMap, - affCache.similarAffinityKey()); + GridDhtPartitionFullMap locMap = grp.topology().partitionMap(true); - if (exchId != null) - m.addPartitionUpdateCounters(cacheCtx.cacheId(), cacheCtx.topology().updateCounters(true)); - } + if (locMap != null) { + addFullPartitionsMap(m, + dupData, + compress, + grp.groupId(), + locMap, + affCache.similarAffinityKey()); } + + if (exchId != null) + m.addPartitionUpdateCounters(grp.groupId(), grp.topology().updateCounters(true)); } - }); + } // It is important that client topologies be added after contexts. for (GridClientPartitionTopology top : cctx.exchange().clientTopologies()) { GridDhtPartitionFullMap map = top.partitionMap(true); - addFullPartitionsMap(m, - dupData, - compress, - top.cacheId(), - map, - top.similarAffinityKey()); + if (map != null) { + addFullPartitionsMap(m, + dupData, + compress, + top.groupId(), + map, + top.similarAffinityKey()); + } if (exchId != null) - m.addPartitionUpdateCounters(top.cacheId(), top.updateCounters(true)); + m.addPartitionUpdateCounters(top.groupId(), top.updateCounters(true)); } return m; @@ -950,19 +941,21 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana * @param m Message. * @param dupData Duplicated data map. * @param compress {@code True} if need check for duplicated partition state data. - * @param cacheId Cache ID. + * @param grpId Cache group ID. * @param map Map to add. * @param affKey Cache affinity key. */ private void addFullPartitionsMap(GridDhtPartitionsFullMessage m, Map<Object, T2<Integer, GridDhtPartitionFullMap>> dupData, boolean compress, - Integer cacheId, + Integer grpId, GridDhtPartitionFullMap map, Object affKey) { + assert map != null; + Integer dupDataCache = null; - if (compress && affKey != null && !m.containsCache(cacheId)) { + if (compress && affKey != null && !m.containsGroup(grpId)) { T2<Integer, GridDhtPartitionFullMap> state0 = dupData.get(affKey); if (state0 != null && state0.get2().partitionStateEquals(map)) { @@ -978,10 +971,10 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana dupDataCache = state0.get1(); } else - dupData.put(affKey, new T2<>(cacheId, map)); + dupData.put(affKey, new T2<>(grpId, map)); } - m.addFullPartitionsMap(cacheId, map, dupDataCache); + m.addFullPartitionsMap(grpId, map, dupDataCache); } /** @@ -1029,24 +1022,24 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana Map<Object, T2<Integer,Map<Integer, GridDhtPartitionState>>> dupData = new HashMap<>(); - for (GridCacheContext cacheCtx : cctx.cacheContexts()) { - if (!cacheCtx.isLocal()) { - GridDhtPartitionMap locMap = cacheCtx.topology().localPartitionMap(); + for (CacheGroupContext grp : cctx.cache().cacheGroups()) { + if (!grp.isLocal()) { + GridDhtPartitionMap locMap = grp.topology().localPartitionMap(); addPartitionMap(m, dupData, true, - cacheCtx.cacheId(), + grp.groupId(), locMap, - cacheCtx.affinity().affinityCache().similarAffinityKey()); + grp.affinity().similarAffinityKey()); if (sndCounters) - m.partitionUpdateCounters(cacheCtx.cacheId(), cacheCtx.topology().updateCounters(true)); + m.partitionUpdateCounters(grp.groupId(), grp.topology().updateCounters(true)); } } for (GridClientPartitionTopology top : clientTops.values()) { - if (m.partitions() != null && m.partitions().containsKey(top.cacheId())) + if (m.partitions() != null && m.partitions().containsKey(top.groupId())) continue; GridDhtPartitionMap locMap = top.localPartitionMap(); @@ -1054,12 +1047,12 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana addPartitionMap(m, dupData, true, - top.cacheId(), + top.groupId(), locMap, top.similarAffinityKey()); if (sndCounters) - m.partitionUpdateCounters(top.cacheId(), top.updateCounters(true)); + m.partitionUpdateCounters(top.groupId(), top.updateCounters(true)); } return m; @@ -1257,22 +1250,19 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana boolean updated = false; for (Map.Entry<Integer, GridDhtPartitionFullMap> entry : msg.partitions().entrySet()) { - Integer cacheId = entry.getKey(); - - GridCacheContext<K, V> cacheCtx = cctx.cacheContext(cacheId); + Integer grpId = entry.getKey(); - if (cacheCtx != null && !cacheCtx.started()) - continue; // Can safely ignore background exchange. + CacheGroupContext grp = cctx.cache().cacheGroup(grpId); GridDhtPartitionTopology top = null; - if (cacheCtx == null) - top = clientTops.get(cacheId); - else if (!cacheCtx.isLocal()) - top = cacheCtx.topology(); + if (grp == null) + top = clientTops.get(grpId); + else if (!grp.isLocal()) + top = grp.topology(); if (top != null) - updated |= top.update(null, entry.getValue(), null, msg.partsToReload(cctx.localNodeId(), cacheId)) != null; + updated |= top.update(null, entry.getValue(), null, msg.partsToReload(cctx.localNodeId(), grpId)) != null; } if (!cctx.kernalContext().clientNode() && updated) @@ -1280,9 +1270,10 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana boolean hasMovingParts = false; - for (GridCacheContext cacheCtx : cctx.cacheContexts()) { - if (!cacheCtx.isLocal() && cacheCtx.started() && cacheCtx.topology().hasMovingPartitions()) { + for (CacheGroupContext grp : cctx.cache().cacheGroups()) { + if (!grp.isLocal() && grp.topology().hasMovingPartitions()) { hasMovingParts = true; + break; } } @@ -1315,25 +1306,25 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana boolean updated = false; for (Map.Entry<Integer, GridDhtPartitionMap> entry : msg.partitions().entrySet()) { - Integer cacheId = entry.getKey(); + Integer grpId = entry.getKey(); - GridCacheContext<K, V> cacheCtx = cctx.cacheContext(cacheId); + CacheGroupContext grp = cctx.cache().cacheGroup(grpId); - if (cacheCtx != null && - cacheCtx.startTopologyVersion().compareTo(entry.getValue().topologyVersion()) > 0) + if (grp != null && + grp.localStartVersion().compareTo(entry.getValue().topologyVersion()) > 0) continue; GridDhtPartitionTopology top = null; - if (cacheCtx == null) - top = clientTops.get(cacheId); - else if (!cacheCtx.isLocal()) - top = cacheCtx.topology(); + if (grp == null) + top = clientTops.get(grpId); + else if (!grp.isLocal()) + top = grp.topology(); if (top != null) { updated |= top.update(null, entry.getValue()) != null; - cctx.affinity().checkRebalanceState(top, cacheId); + cctx.affinity().checkRebalanceState(top, grpId); } } @@ -1422,8 +1413,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana dumpPendingObjects(exchTopVer); - for (GridCacheContext cacheCtx : cctx.cacheContexts()) - cacheCtx.preloader().dumpDebugInfo(); + for (CacheGroupContext grp : cctx.cache().cacheGroups()) + grp.preloader().dumpDebugInfo(); cctx.affinity().dumpDebugInfo(); @@ -1586,21 +1577,19 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana } } - for (GridCacheContext ctx : cctx.cacheContexts()) { - if (ctx.isLocal()) + for (CacheGroupContext grp : cctx.cache().cacheGroups()) { + if (grp.isLocal()) continue; - GridCacheContext ctx0 = ctx.isNear() ? ctx.near().dht().context() : ctx; - - GridCachePreloader preloader = ctx0.preloader(); + GridCachePreloader preloader = grp.preloader(); if (preloader != null) preloader.dumpDebugInfo(); - GridCacheAffinityManager affMgr = ctx0.affinity(); + GridAffinityAssignmentCache aff = grp.affinity(); - if (affMgr != null) - affMgr.dumpDebugInfo(); + if (aff != null) + aff.dumpDebugInfo(); } } @@ -1743,8 +1732,11 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana try { boolean preloadFinished = true; - for (GridCacheContext cacheCtx : cctx.cacheContexts()) { - preloadFinished &= cacheCtx.preloader() != null && cacheCtx.preloader().syncFuture().isDone(); + for (CacheGroupContext grp : cctx.cache().cacheGroups()) { + if (grp.isLocal()) + continue; + + preloadFinished &= grp.preloader() != null && grp.preloader().syncFuture().isDone(); if (!preloadFinished) break; @@ -1851,11 +1843,11 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana boolean changed = false; - for (GridCacheContext cacheCtx : cctx.cacheContexts()) { - if (cacheCtx.isLocal()) + for (CacheGroupContext grp : cctx.cache().cacheGroups()) { + if (grp.isLocal()) continue; - changed |= cacheCtx.topology().afterExchange(exchFut); + changed |= grp.topology().afterExchange(exchFut); } if (!cctx.kernalContext().clientNode() && changed && !hasPendingExchange()) @@ -1875,16 +1867,16 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana if (!exchFut.skipPreload() && cctx.kernalContext().state().active()) { assignsMap = new HashMap<>(); - for (GridCacheContext cacheCtx : cctx.cacheContexts()) { - long delay = cacheCtx.config().getRebalanceDelay(); + for (CacheGroupContext grp : cctx.cache().cacheGroups()) { + long delay = grp.config().getRebalanceDelay(); GridDhtPreloaderAssignments assigns = null; // Don't delay for dummy reassigns to avoid infinite recursion. if (delay == 0 || forcePreload) - assigns = cacheCtx.preloader().assign(exchFut); + assigns = grp.preloader().assign(exchFut); - assignsMap.put(cacheCtx.cacheId(), assigns); + assignsMap.put(grp.groupId(), assigns); } } } @@ -1899,16 +1891,16 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana NavigableMap<Integer, List<Integer>> orderMap = new TreeMap<>(); for (Map.Entry<Integer, GridDhtPreloaderAssignments> e : assignsMap.entrySet()) { - int cacheId = e.getKey(); + int grpId = e.getKey(); - GridCacheContext<K, V> cacheCtx = cctx.cacheContext(cacheId); + CacheGroupContext grp = cctx.cache().cacheGroup(grpId); - int order = cacheCtx.config().getRebalanceOrder(); + int order = grp.config().getRebalanceOrder(); if (orderMap.get(order) == null) orderMap.put(order, new ArrayList<Integer>(size)); - orderMap.get(order).add(cacheId); + orderMap.get(order).add(grpId); } Runnable r = null; @@ -1918,35 +1910,27 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana boolean assignsCancelled = false; for (Integer order : orderMap.descendingKeySet()) { - for (Integer cacheId : orderMap.get(order)) { - GridCacheContext<K, V> cacheCtx = cctx.cacheContext(cacheId); + for (Integer grpId : orderMap.get(order)) { + CacheGroupContext grp = cctx.cache().cacheGroup(grpId); - GridDhtPreloaderAssignments assigns = assignsMap.get(cacheId); + GridDhtPreloaderAssignments assigns = assignsMap.get(grpId); if (assigns != null) assignsCancelled |= assigns.cancelled(); - List<String> waitList = new ArrayList<>(size - 1); - - for (List<Integer> cIds : orderMap.headMap(order).values()) { - for (Integer cId : cIds) - waitList.add(cctx.cacheContext(cId).name()); - } - // Cancels previous rebalance future (in case it's not done yet). // Sends previous rebalance stopped event (if necessary). // Creates new rebalance future. // Sends current rebalance started event (if necessary). // Finishes cache sync future (on empty assignments). - Runnable cur = cacheCtx.preloader().addAssignments(assigns, + Runnable cur = grp.preloader().addAssignments(assigns, forcePreload, - waitList, cnt, r, exchFut.forcedRebalanceFuture()); if (cur != null) { - rebList.add(U.maskName(cacheCtx.name())); + rebList.add(grp.cacheOrGroupName()); r = cur; } http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java index 5ae68e8..0ac0272 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java @@ -22,6 +22,7 @@ import java.util.UUID; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtFuture; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicAbstractUpdateRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage; @@ -45,18 +46,6 @@ public interface GridCachePreloader { public void start() throws IgniteCheckedException; /** - * Stops preloading. - */ - public void stop(); - - /** - * Kernal start callback. - * - * @throws IgniteCheckedException If failed. - */ - public void onKernalStart() throws IgniteCheckedException; - - /** * Kernal stop callback. */ public void onKernalStop(); @@ -90,7 +79,6 @@ public interface GridCachePreloader { */ public Runnable addAssignments(GridDhtPreloaderAssignments assignments, boolean forcePreload, - Collection<String> caches, int cnt, Runnable next, @Nullable GridFutureAdapter<Boolean> forcedRebFut); @@ -136,20 +124,25 @@ public interface GridCachePreloader { /** * Requests that preloader sends the request for the key. * + * @param cctx Cache context. * @param keys Keys to request. * @param topVer Topology version, {@code -1} if not required. * @return Future to complete when all keys are preloaded. */ - public IgniteInternalFuture<Object> request(Collection<KeyCacheObject> keys, AffinityTopologyVersion topVer); + public GridDhtFuture<Object> request(GridCacheContext cctx, + Collection<KeyCacheObject> keys, + AffinityTopologyVersion topVer); /** * Requests that preloader sends the request for the key. * + * @param cctx Cache context. * @param req Message with keys to request. * @param topVer Topology version, {@code -1} if not required. * @return Future to complete when all keys are preloaded. */ - public IgniteInternalFuture<Object> request(GridNearAtomicAbstractUpdateRequest req, + public GridDhtFuture<Object> request(GridCacheContext cctx, + GridNearAtomicAbstractUpdateRequest req, AffinityTopologyVersion topVer); /** http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java index 47c37f5..98874e4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java @@ -21,9 +21,9 @@ import java.util.Collection; import java.util.UUID; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; -import org.apache.ignite.cache.affinity.AffinityFunction; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtFuture; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicAbstractUpdateRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage; @@ -39,15 +39,15 @@ import org.jetbrains.annotations.Nullable; * Adapter for preloading which always assumes that preloading finished. */ public class GridCachePreloaderAdapter implements GridCachePreloader { - /** Cache context. */ - protected final GridCacheContext<?, ?> cctx; + /** */ + protected final CacheGroupContext grp; + + /** */ + protected final GridCacheSharedContext ctx; /** Logger. */ protected final IgniteLogger log; - /** Affinity. */ - protected final AffinityFunction aff; - /** Start future (always completed by default). */ private final IgniteInternalFuture finFut; @@ -55,15 +55,16 @@ public class GridCachePreloaderAdapter implements GridCachePreloader { protected IgnitePredicate<GridCacheEntryInfo> preloadPred; /** - * @param cctx Cache context. + * @param grp Cache group. */ - public GridCachePreloaderAdapter(GridCacheContext<?, ?> cctx) { - assert cctx != null; + public GridCachePreloaderAdapter(CacheGroupContext grp) { + assert grp != null; - this.cctx = cctx; + this.grp = grp; - log = cctx.logger(getClass()); - aff = cctx.config().getAffinity(); + ctx = grp.shared(); + + log = ctx.logger(getClass()); finFut = new GridFinishedFuture(); } @@ -74,16 +75,6 @@ public class GridCachePreloaderAdapter implements GridCachePreloader { } /** {@inheritDoc} */ - @Override public void stop() { - // No-op. - } - - /** {@inheritDoc} */ - @Override public void onKernalStart() throws IgniteCheckedException { - // No-op. - } - - /** {@inheritDoc} */ @Override public void onKernalStop() { // No-op. } @@ -130,7 +121,7 @@ public class GridCachePreloaderAdapter implements GridCachePreloader { /** {@inheritDoc} */ @Override public void unwindUndeploys() { - cctx.deploy().unwind(cctx); + grp.unwindUndeploys(); } /** {@inheritDoc} */ @@ -144,15 +135,15 @@ public class GridCachePreloaderAdapter implements GridCachePreloader { } /** {@inheritDoc} */ - @Override public IgniteInternalFuture<Object> request(Collection<KeyCacheObject> keys, + @Override public GridDhtFuture<Object> request(GridCacheContext ctx, Collection<KeyCacheObject> keys, AffinityTopologyVersion topVer) { - return new GridFinishedFuture<>(); + return null; } /** {@inheritDoc} */ - @Override public IgniteInternalFuture<Object> request(GridNearAtomicAbstractUpdateRequest req, + @Override public GridDhtFuture<Object> request(GridCacheContext ctx, GridNearAtomicAbstractUpdateRequest req, AffinityTopologyVersion topVer) { - return new GridFinishedFuture<>(); + return null; } /** {@inheritDoc} */ @@ -168,7 +159,6 @@ public class GridCachePreloaderAdapter implements GridCachePreloader { /** {@inheritDoc} */ @Override public Runnable addAssignments(GridDhtPreloaderAssignments assignments, boolean forcePreload, - Collection<String> caches, int cnt, Runnable next, @Nullable GridFutureAdapter<Boolean> forcedRebFut) {
