http://git-wip-us.apache.org/repos/asf/ignite/blob/4026ddcc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java index 37e495e..09eea27 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java @@ -36,6 +36,7 @@ import org.apache.ignite.binary.BinaryObjectException; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; +import org.apache.ignite.internal.managers.communication.GridIoPolicy; import org.apache.ignite.internal.managers.communication.GridMessageListener; import org.apache.ignite.internal.managers.deployment.GridDeploymentInfo; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; @@ -301,52 +302,151 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { */ @SuppressWarnings("unchecked") private void handleMessage(UUID nodeId, GridCacheMessage cacheMsg, MessageHandlers msgHandlers) { - int msgIdx = cacheMsg.lookupIndex(); + Lock lock = rw.readLock(); - IgniteBiInClosure<UUID, GridCacheMessage> c = null; + lock.lock(); - if (msgIdx >= 0) { - Map<Integer, IgniteBiInClosure[]> idxClsHandlers0 = msgHandlers.idxClsHandlers; + try { + int msgIdx = cacheMsg.lookupIndex(); - IgniteBiInClosure[] cacheClsHandlers = idxClsHandlers0.get(cacheMsg.handlerId()); + IgniteBiInClosure<UUID, GridCacheMessage> c = null; - if (cacheClsHandlers != null) - c = cacheClsHandlers[msgIdx]; - } + if (msgIdx >= 0) { + Map<Integer, IgniteBiInClosure[]> idxClsHandlers0 = msgHandlers.idxClsHandlers; - if (c == null) - c = msgHandlers.clsHandlers.get(new ListenerKey(cacheMsg.handlerId(), cacheMsg.getClass())); + IgniteBiInClosure[] cacheClsHandlers = idxClsHandlers0.get(cacheMsg.handlerId()); - if (c == null) { - IgniteLogger log = cacheMsg.messageLogger(cctx); + if (cacheClsHandlers != null) + c = cacheClsHandlers[msgIdx]; + } - StringBuilder msg0 = new StringBuilder("Received message without registered handler (will ignore) ["); + if (c == null) + c = msgHandlers.clsHandlers.get(new ListenerKey(cacheMsg.handlerId(), cacheMsg.getClass())); - appendMessageInfo(cacheMsg, nodeId, msg0); + if (c == null) { + if (processMissedHandler(nodeId, cacheMsg)) + return; - msg0.append(", locTopVer=").append(cctx.exchange().readyAffinityVersion()). - append(", msgTopVer=").append(cacheMsg.topologyVersion()). - append(", desc=").append(descriptorForMessage(cacheMsg)). - append(']'); + IgniteLogger log = cacheMsg.messageLogger(cctx); - msg0.append(U.nl()).append("Registered listeners:"); + StringBuilder msg0 = new StringBuilder("Received message without registered handler (will ignore) ["); - Map<Integer, IgniteBiInClosure[]> idxClsHandlers0 = msgHandlers.idxClsHandlers; + appendMessageInfo(cacheMsg, nodeId, msg0); - for (Map.Entry<Integer, IgniteBiInClosure[]> e : idxClsHandlers0.entrySet()) - msg0.append(U.nl()).append(e.getKey()).append("=").append(Arrays.toString(e.getValue())); + msg0.append(", locTopVer=").append(cctx.exchange().readyAffinityVersion()). + append(", msgTopVer=").append(cacheMsg.topologyVersion()). + append(", desc=").append(descriptorForMessage(cacheMsg)). + append(']'); - if (cctx.kernalContext().isStopping()) { - if (log.isDebugEnabled()) - log.debug(msg0.toString()); + msg0.append(U.nl()).append("Registered listeners:"); + + Map<Integer, IgniteBiInClosure[]> idxClsHandlers0 = msgHandlers.idxClsHandlers; + + for (Map.Entry<Integer, IgniteBiInClosure[]> e : idxClsHandlers0.entrySet()) + msg0.append(U.nl()).append(e.getKey()).append("=").append(Arrays.toString(e.getValue())); + + if (cctx.kernalContext().isStopping()) { + if (log.isDebugEnabled()) + log.debug(msg0.toString()); + } + else + U.error(log, msg0.toString()); + + return; } - else - U.error(log, msg0.toString()); - return; + onMessage0(nodeId, cacheMsg, c); + } + finally { + lock.unlock(); } + } + + /** + * @param nodeId Node ID. + * @param cacheMsg Message. + * @return {@code True} if message processed. + */ + private boolean processMissedHandler(UUID nodeId, GridCacheMessage cacheMsg) { + // It is possible to receive reader update after client near cache was closed. + if (cacheMsg instanceof GridDhtAtomicAbstractUpdateRequest) { + GridDhtAtomicAbstractUpdateRequest req = (GridDhtAtomicAbstractUpdateRequest)cacheMsg; + + if (req.nearSize() > 0) { + List<KeyCacheObject> nearEvicted = new ArrayList<>(req.nearSize()); + + for (int i = 0; i < req.nearSize(); i++) + nearEvicted.add(req.nearKey(i)); + + GridDhtAtomicUpdateResponse dhtRes = new GridDhtAtomicUpdateResponse(req.cacheId(), + req.partition(), + req.futureId(), + false); + + dhtRes.nearEvicted(nearEvicted); + + sendMessageForMissedHandler(cacheMsg, + nodeId, + dhtRes, + nodeId, + GridIoPolicy.SYSTEM_POOL); + + if (req.nearNodeId() != null) { + GridDhtAtomicNearResponse nearRes = new GridDhtAtomicNearResponse(req.cacheId(), + req.partition(), + req.nearFutureId(), + nodeId, + req.flags()); + + sendMessageForMissedHandler(cacheMsg, + nodeId, + nearRes, + req.nearNodeId(), + GridIoPolicy.SYSTEM_POOL); + } + + return true; + } + } + + return false; + } + + /** + * @param origMsg Message without handler. + * @param origMsgNode Node sent {@code origMsg}. + * @param nodeId Target node ID. + * @param msg Response. + * @param plc Policy. + */ + private void sendMessageForMissedHandler( + GridCacheMessage origMsg, + UUID origMsgNode, + GridCacheMessage msg, + UUID nodeId, + byte plc) { + IgniteLogger log = msg.messageLogger(cctx); - onMessage0(nodeId, cacheMsg, c); + try { + if (log.isDebugEnabled()) { + log.debug("Received message without registered handler, " + + "send response [locTopVer=" + cctx.exchange().readyAffinityVersion() + + ", msgTopVer=" + origMsg.topologyVersion() + + ", node=" + origMsgNode + + ", msg=" + origMsg + + ", resNode=" + nodeId + + ", res=" + msg + ']'); + } + + send(nodeId, msg, plc); + } + catch (ClusterTopologyCheckedException e) { + if (log.isDebugEnabled()) + log.debug("Failed to send response, node left [nodeId=" + nodeId + ", msg=" + msg + ']'); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to send response [nodeId=" + nodeId + ", msg=" + msg + ", err=" + e + ']'); + } } /** {@inheritDoc} */ @@ -359,17 +459,10 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { cctx.gridIO().addMessageListener(TOPIC_CACHE, lsnr); } - /** {@inheritDoc} */ - @SuppressWarnings("BusyWait") - @Override protected void onKernalStop0(boolean cancel) { - cctx.gridIO().removeMessageListener(TOPIC_CACHE); - - for (Object ordTopic : cacheHandlers.orderedHandlers.keySet()) - cctx.gridIO().removeMessageListener(ordTopic); - - for (Object ordTopic : grpHandlers.orderedHandlers.keySet()) - cctx.gridIO().removeMessageListener(ordTopic); - + /** + * + */ + public void writeLock() { boolean interrupted = false; // Busy wait is intentional. @@ -389,6 +482,27 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { if (interrupted) Thread.currentThread().interrupt(); + } + + /** + * + */ + public void writeUnlock() { + rw.writeLock().unlock(); + } + + /** {@inheritDoc} */ + @SuppressWarnings("BusyWait") + @Override protected void onKernalStop0(boolean cancel) { + cctx.gridIO().removeMessageListener(TOPIC_CACHE); + + for (Object ordTopic : cacheHandlers.orderedHandlers.keySet()) + cctx.gridIO().removeMessageListener(ordTopic); + + for (Object ordTopic : grpHandlers.orderedHandlers.keySet()) + cctx.gridIO().removeMessageListener(ordTopic); + + writeLock(); try { stopping = true; @@ -406,10 +520,6 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { @SuppressWarnings({"unchecked", "ConstantConditions", "ThrowableResultOfMethodCallIgnored"}) private void onMessage0(final UUID nodeId, final GridCacheMessage cacheMsg, final IgniteBiInClosure<UUID, GridCacheMessage> c) { - Lock lock = rw.readLock(); - - lock.lock(); - try { if (stopping) { if (log.isDebugEnabled()) @@ -438,8 +548,6 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { finally { if (depEnabled) cctx.deploy().ignoreOwnership(false); - - lock.unlock(); } } @@ -1527,9 +1635,18 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { if (log.isDebugEnabled()) log.debug("Received cache ordered message [nodeId=" + nodeId + ", msg=" + msg + ']'); - final GridCacheMessage cacheMsg = (GridCacheMessage)msg; + Lock lock = rw.readLock(); - onMessage0(nodeId, cacheMsg, c); + lock.lock(); + + try { + GridCacheMessage cacheMsg = (GridCacheMessage)msg; + + onMessage0(nodeId, cacheMsg, c); + } + finally { + lock.unlock(); + } } }
http://git-wip-us.apache.org/repos/asf/ignite/blob/4026ddcc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index fdf8a2f..901667f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -337,6 +337,15 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana } /** + * @param task Task to run in exchange worker thread. + */ + public void addCustomTask(CachePartitionExchangeWorkerTask task) { + assert task != null; + + exchWorker.addCustomTask(task); + } + + /** * @return Reconnect partition exchange future. */ public IgniteInternalFuture<?> reconnectExchangeFuture() { @@ -1731,7 +1740,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana cctx.cache().processCustomExchangeTask(task); } catch (Exception e) { - U.warn(log, "Failed to process custom exchange task: " + task, e); + U.error(log, "Failed to process custom exchange task: " + task, e); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/4026ddcc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index ad83b14..96b45df 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -184,7 +184,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { private final Map<String, GridCacheAdapter> stoppedCaches = new ConcurrentHashMap<>(); /** Map of proxies. */ - private final Map<String, IgniteCacheProxy<?, ?>> jCacheProxies; + private final ConcurrentHashMap<String, IgniteCacheProxy<?, ?>> jCacheProxies; /** Caches stop sequence. */ private final Deque<String> stopSeq; @@ -363,6 +363,11 @@ public class GridCacheProcessor extends GridProcessorAdapter { if (msg0.exchange()) return new SchemaExchangeWorkerTask(msg0); } + else if (msg instanceof ClientCacheChangeDummyDiscoveryMessage) { + ClientCacheChangeDummyDiscoveryMessage msg0 = (ClientCacheChangeDummyDiscoveryMessage)msg; + + return msg0; + } return null; } @@ -372,7 +377,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { * * @param task Task. */ - public void processCustomExchangeTask(CachePartitionExchangeWorkerTask task) { + void processCustomExchangeTask(CachePartitionExchangeWorkerTask task) { if (task instanceof SchemaExchangeWorkerTask) { SchemaAbstractDiscoveryMessage msg = ((SchemaExchangeWorkerTask)task).message(); @@ -389,6 +394,16 @@ public class GridCacheProcessor extends GridProcessorAdapter { ctx.query().onNodeLeave(task0.node()); } + else if (task instanceof ClientCacheChangeDummyDiscoveryMessage) { + ClientCacheChangeDummyDiscoveryMessage task0 = (ClientCacheChangeDummyDiscoveryMessage)task; + + sharedCtx.affinity().processClientCachesChanges(task0); + } + else if (task instanceof ClientCacheUpdateTimeout) { + ClientCacheUpdateTimeout task0 = (ClientCacheUpdateTimeout)task; + + sharedCtx.affinity().sendClientCacheChangesMessage(task0); + } else U.warn(log, "Unsupported custom exchange task: " + task); } @@ -1998,21 +2013,18 @@ public class GridCacheProcessor extends GridProcessorAdapter { } /** - * @param req Stop request. + * @param cacheName Cache name. + * @param stop {@code True} for stop cache, {@code false} for close cache. */ - void blockGateway(DynamicCacheChangeRequest req) { - assert req.stop() || req.close(); - - if (req.stop() || (req.close() && req.initiatingNodeId().equals(ctx.localNodeId()))) { - // Break the proxy before exchange future is done. - IgniteCacheProxy<?, ?> proxy = jCacheProxies.get(req.cacheName()); + void blockGateway(String cacheName, boolean stop) { + // Break the proxy before exchange future is done. + IgniteCacheProxy<?, ?> proxy = jCacheProxies.get(cacheName); - if (proxy != null) { - if (req.stop()) - proxy.gate().stopped(); - else - proxy.closeProxy(); - } + if (proxy != null) { + if (stop) + proxy.gate().stopped(); + else + proxy.closeProxy(); } } @@ -2030,22 +2042,21 @@ public class GridCacheProcessor extends GridProcessorAdapter { } /** - * @param req Stop request. + * @param cacheName Cache name. + * @param destroy Cache destroy flag. * @return Cache group for stopped cache. */ - private CacheGroupContext prepareCacheStop(DynamicCacheChangeRequest req, boolean forceClose) { - assert req.stop() || req.close() || forceClose : req; - - GridCacheAdapter<?, ?> cache = caches.remove(req.cacheName()); + private CacheGroupContext prepareCacheStop(String cacheName, boolean destroy) { + GridCacheAdapter<?, ?> cache = caches.remove(cacheName); if (cache != null) { GridCacheContext<?, ?> ctx = cache.context(); sharedCtx.removeCacheContext(ctx); - onKernalStop(cache, req.destroy()); + onKernalStop(cache, destroy); - stopCache(cache, true, req.destroy()); + stopCache(cache, true, destroy); return ctx.group(); } @@ -2054,24 +2065,91 @@ public class GridCacheProcessor extends GridProcessorAdapter { } /** - * Closes cache even if it's not fully initialized (e.g. fail on cache init stage). - * - * @param topVer Completed topology version. - * @param act Exchange action. - * @param err Error. + * @param startTopVer Cache start version. + * @param err Cache start error if any. */ - void forceCloseCache( - AffinityTopologyVersion topVer, - final ExchangeActions.ActionData act, - Throwable err - ) { - ExchangeActions actions = new ExchangeActions(){ - @Override List<ActionData> closeRequests(UUID nodeId) { - return Collections.singletonList(act); + void initCacheProxies(AffinityTopologyVersion startTopVer, @Nullable Throwable err) { + for (GridCacheAdapter<?, ?> cache : caches.values()) { + GridCacheContext<?, ?> cacheCtx = cache.context(); + + if (cacheCtx.startTopologyVersion().equals(startTopVer) && !jCacheProxies.containsKey(cacheCtx.name())) { + jCacheProxies.putIfAbsent(cacheCtx.name(), new IgniteCacheProxy(cache.context(), cache, null, false)); + + if (cacheCtx.preloader() != null) + cacheCtx.preloader().onInitialExchangeComplete(err); + } + } + } + + /** + * @param cachesToClose Caches to close. + * @param retClientCaches {@code True} if return IDs of closed client caches. + * @return Closed client caches' IDs. + */ + Set<Integer> closeCaches(Set<String> cachesToClose, boolean retClientCaches) { + Set<Integer> ids = null; + + boolean locked = false; + + try { + for (String cacheName : cachesToClose) { + blockGateway(cacheName, false); + + GridCacheContext ctx = sharedCtx.cacheContext(CU.cacheId(cacheName)); + + if (ctx == null) + continue; + + if (retClientCaches && !ctx.affinityNode()) { + if (ids == null) + ids = U.newHashSet(cachesToClose.size()); + + ids.add(ctx.cacheId()); + } + + if (!ctx.affinityNode() && !locked) { + // Do not close client cache while requests processing is in progress. + sharedCtx.io().writeLock(); + + locked = true; + } + + if (!ctx.affinityNode() && ctx.transactional()) + sharedCtx.tm().rollbackTransactionsForCache(ctx.cacheId()); + + closeCache(ctx, false); } - }; - onExchangeDone(topVer, actions, err, true); + return ids; + } + finally { + if (locked) + sharedCtx.io().writeUnlock(); + } + } + + /** + * @param cctx Cache context. + * @param destroy Destroy flag. + */ + private void closeCache(GridCacheContext cctx, boolean destroy) { + if (cctx.affinityNode()) { + GridCacheAdapter<?, ?> cache = caches.get(cctx.name()); + + assert cache != null : cctx.name(); + + jCacheProxies.put(cctx.name(), new IgniteCacheProxy(cache.context(), cache, null, false)); + } + else { + jCacheProxies.remove(cctx.name()); + + cctx.gate().onStopped(); + + CacheGroupContext grp = prepareCacheStop(cctx.name(), destroy); + + if (grp != null && !grp.hasCaches()) + stopCacheGroup(grp.groupId()); + } } /** @@ -2085,58 +2163,19 @@ public class GridCacheProcessor extends GridProcessorAdapter { public void onExchangeDone( AffinityTopologyVersion topVer, @Nullable ExchangeActions exchActions, - Throwable err, - boolean forceClose + Throwable err ) { - for (GridCacheAdapter<?, ?> cache : caches.values()) { - GridCacheContext<?, ?> cacheCtx = cache.context(); - - if (cacheCtx.startTopologyVersion().equals(topVer)) { - jCacheProxies.put(cacheCtx.name(), new IgniteCacheProxy(cache.context(), cache, null, false)); + initCacheProxies(topVer, err); - if (cacheCtx.preloader() != null) - cacheCtx.preloader().onInitialExchangeComplete(err); - } - } - - if (exchActions != null && (err == null || forceClose)) { + if (exchActions != null && err == null) { for (ExchangeActions.ActionData action : exchActions.cacheStopRequests()) { stopGateway(action.request()); - prepareCacheStop(action.request(), forceClose); + prepareCacheStop(action.request().cacheName(), action.request().destroy()); } for (CacheGroupDescriptor grpDesc : exchActions.cacheGroupsToStop()) stopCacheGroup(grpDesc.groupId()); - - for (ExchangeActions.ActionData req : exchActions.closeRequests(ctx.localNodeId())) { - String cacheName = req.request().cacheName(); - - IgniteCacheProxy<?, ?> proxy = jCacheProxies.get(cacheName); - - if (proxy != null) { - if (proxy.context().affinityNode()) { - GridCacheAdapter<?, ?> cache = caches.get(cacheName); - - assert cache != null : cacheName; - - jCacheProxies.put(cacheName, new IgniteCacheProxy(cache.context(), cache, null, false)); - } - else { - jCacheProxies.remove(cacheName); - - proxy.context().gate().onStopped(); - - CacheGroupContext grp = prepareCacheStop(req.request(), forceClose); - - if (grp != null && !grp.hasCaches()) - stopCacheGroup(grp.groupId()); - } - } - - if (forceClose) - completeCacheStartFuture(req.request(), false, err); - } } } @@ -2187,6 +2226,17 @@ public class GridCacheProcessor extends GridProcessorAdapter { } /** + * @param reqId Request ID. + * @param err Error if any. + */ + void completeClientCacheChangeFuture(UUID reqId, @Nullable Exception err) { + DynamicCacheStartFuture fut = (DynamicCacheStartFuture)pendingFuts.get(reqId); + + if (fut != null) + fut.onDone(false, err); + } + + /** * Creates shared context. * * @param kernalCtx Kernal context. @@ -2486,8 +2536,12 @@ public class GridCacheProcessor extends GridProcessorAdapter { failIfExists, failIfNotStarted); - if (req != null) - return F.first(initiateCacheChanges(F.asList(req), failIfExists)); + if (req != null) { + if (req.clientStartOnly()) + return startClientCacheChange(F.asMap(req.cacheName(), req), null); + + return F.first(initiateCacheChanges(F.asList(req))); + } else return new GridFinishedFuture<>(); } @@ -2497,6 +2551,31 @@ public class GridCacheProcessor extends GridProcessorAdapter { } /** + * @param startReqs Start requests. + * @param cachesToClose Cache tp close. + * @return Future for cache change operation. + */ + private IgniteInternalFuture<Boolean> startClientCacheChange( + @Nullable Map<String, DynamicCacheChangeRequest> startReqs, @Nullable Set<String> cachesToClose) { + assert startReqs != null ^ cachesToClose != null; + + DynamicCacheStartFuture fut = new DynamicCacheStartFuture(UUID.randomUUID()); + + IgniteInternalFuture old = pendingFuts.put(fut.id, fut); + + assert old == null : old; + + ctx.discovery().clientCacheStartEvent(fut.id, startReqs, cachesToClose); + + IgniteCheckedException err = checkNodeState(); + + if (err != null) + fut.onDone(err); + + return fut; + } + + /** * Dynamically starts multiple caches. * * @param ccfgList Collection of cache configuration. @@ -2527,7 +2606,8 @@ public class GridCacheProcessor extends GridProcessorAdapter { if (checkThreadTx) checkEmptyTransactions(); - List<DynamicCacheChangeRequest> reqList = new ArrayList<>(ccfgList.size()); + List<DynamicCacheChangeRequest> srvReqs = null; + Map<String, DynamicCacheChangeRequest> clientReqs = null; try { for (CacheConfiguration ccfg : ccfgList) { @@ -2541,20 +2621,41 @@ public class GridCacheProcessor extends GridProcessorAdapter { true ); - if (req != null) - reqList.add(req); + if (req != null) { + if (req.clientStartOnly()) { + if (clientReqs == null) + clientReqs = U.newLinkedHashMap(ccfgList.size()); + + clientReqs.put(req.cacheName(), req); + } + else { + if (srvReqs == null) + srvReqs = new ArrayList<>(ccfgList.size()); + + srvReqs.add(req); + } + } } } catch (Exception e) { return new GridFinishedFuture<>(e); } - if (!reqList.isEmpty()) { + if (srvReqs != null || clientReqs != null) { + if (clientReqs != null && srvReqs == null) + return startClientCacheChange(clientReqs, null); + GridCompoundFuture<?, ?> compoundFut = new GridCompoundFuture<>(); - for (DynamicCacheStartFuture fut : initiateCacheChanges(reqList, failIfExists)) + for (DynamicCacheStartFuture fut : initiateCacheChanges(srvReqs)) compoundFut.add((IgniteInternalFuture)fut); + if (clientReqs != null) { + IgniteInternalFuture<Boolean> clientStartFut = startClientCacheChange(clientReqs, null); + + compoundFut.add((IgniteInternalFuture)clientStartFut); + } + compoundFut.markInitialized(); return compoundFut; @@ -2578,7 +2679,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { DynamicCacheChangeRequest req = DynamicCacheChangeRequest.stopRequest(ctx, cacheName, sql, true); - return F.first(initiateCacheChanges(F.asList(req), false)); + return F.first(initiateCacheChanges(F.asList(req))); } /** @@ -2600,7 +2701,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { GridCompoundFuture<?, ?> compoundFut = new GridCompoundFuture<>(); - for (DynamicCacheStartFuture fut : initiateCacheChanges(reqs, false)) + for (DynamicCacheStartFuture fut : initiateCacheChanges(reqs)) compoundFut.add((IgniteInternalFuture)fut); compoundFut.markInitialized(); @@ -2622,9 +2723,10 @@ public class GridCacheProcessor extends GridProcessorAdapter { checkEmptyTransactions(); - DynamicCacheChangeRequest t = DynamicCacheChangeRequest.closeRequest(ctx, cacheName); + if (proxy.context().isLocal()) + return dynamicDestroyCache(cacheName, false, true); - return F.first(initiateCacheChanges(F.asList(t), false)); + return startClientCacheChange(null, Collections.singleton(cacheName)); } /** @@ -2657,7 +2759,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { GridCompoundFuture fut = new GridCompoundFuture(); - for (DynamicCacheStartFuture f : initiateCacheChanges(reqs, false)) + for (DynamicCacheStartFuture f : initiateCacheChanges(reqs)) fut.add(f); fut.markInitialized(); @@ -2751,38 +2853,28 @@ public class GridCacheProcessor extends GridProcessorAdapter { /** * @param reqs Requests. - * @param failIfExists Fail if exists flag. * @return Collection of futures. */ @SuppressWarnings("TypeMayBeWeakened") private Collection<DynamicCacheStartFuture> initiateCacheChanges( - Collection<DynamicCacheChangeRequest> reqs, - boolean failIfExists + Collection<DynamicCacheChangeRequest> reqs ) { Collection<DynamicCacheStartFuture> res = new ArrayList<>(reqs.size()); Collection<DynamicCacheChangeRequest> sndReqs = new ArrayList<>(reqs.size()); for (DynamicCacheChangeRequest req : reqs) { - DynamicCacheStartFuture fut = new DynamicCacheStartFuture(req.cacheName(), req); + DynamicCacheStartFuture fut = new DynamicCacheStartFuture(req.requestId()); try { - if (req.stop() || req.close()) { + if (req.stop()) { DynamicCacheDescriptor desc = cacheDescriptor(req.cacheName()); if (desc == null) // No-op. fut.onDone(false); - else { - assert desc.cacheConfiguration() != null : desc; - - if (req.close() && desc.cacheConfiguration().getCacheMode() == LOCAL) { - req.close(false); - - req.stop(true); - } - } } + if (req.start() && req.startCacheConfiguration() != null) { CacheConfiguration ccfg = req.startCacheConfiguration(); @@ -2821,14 +2913,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { try { ctx.discovery().sendCustomEvent(new DynamicCacheChangeBatch(sndReqs)); - if (ctx.isStopping()) { - err = new IgniteCheckedException("Failed to execute dynamic cache change request, " + - "node is stopping."); - } - else if (ctx.clientDisconnected()) { - err = new IgniteClientDisconnectedCheckedException(ctx.cluster().clientReconnectFuture(), - "Failed to execute dynamic cache change request, client node disconnected."); - } + err = checkNodeState(); } catch (IgniteCheckedException e) { err = e; @@ -2844,6 +2929,22 @@ public class GridCacheProcessor extends GridProcessorAdapter { } /** + * @return Non null exception if node is stopping or disconnected. + */ + @Nullable private IgniteCheckedException checkNodeState() { + if (ctx.isStopping()) { + return new IgniteCheckedException("Failed to execute dynamic cache change request, " + + "node is stopping."); + } + else if (ctx.clientDisconnected()) { + return new IgniteClientDisconnectedCheckedException(ctx.cluster().clientReconnectFuture(), + "Failed to execute dynamic cache change request, client node disconnected."); + } + + return null; + } + + /** * @param type Event type. * @param node Event node. * @param topVer Topology version. @@ -2859,9 +2960,10 @@ public class GridCacheProcessor extends GridProcessorAdapter { * * @param msg Customer message. * @param topVer Current topology version. + * @param node Node sent message. * @return {@code True} if minor topology version should be increased. */ - public boolean onCustomEvent(DiscoveryCustomMessage msg, AffinityTopologyVersion topVer) { + public boolean onCustomEvent(DiscoveryCustomMessage msg, AffinityTopologyVersion topVer, ClusterNode node) { if (msg instanceof SchemaAbstractDiscoveryMessage) { ctx.query().onDiscovery((SchemaAbstractDiscoveryMessage)msg); @@ -2878,6 +2980,9 @@ public class GridCacheProcessor extends GridProcessorAdapter { if (msg instanceof DynamicCacheChangeBatch) return cachesInfo.onCacheChangeRequested((DynamicCacheChangeBatch)msg, topVer); + if (msg instanceof ClientCacheChangeDiscoveryMessage) + cachesInfo.onClientCacheChange((ClientCacheChangeDiscoveryMessage)msg, node); + return false; } @@ -3156,13 +3261,13 @@ public class GridCacheProcessor extends GridProcessorAdapter { if (log.isDebugEnabled()) log.debug("Getting public cache for name: " + cacheName); - IgniteCacheProxy<?, ?> cache = jCacheProxies.get(cacheName); - DynamicCacheDescriptor desc = cacheDescriptor(cacheName); if (desc != null && !desc.cacheType().userCache()) throw new IllegalStateException("Failed to get cache because it is a system cache: " + cacheName); + IgniteCacheProxy<?, ?> cache = jCacheProxies.get(cacheName); + if (cache == null) { dynamicStartCache(null, cacheName, null, false, failIfNotStarted, checkThreadTx).get(); @@ -3731,33 +3836,20 @@ public class GridCacheProcessor extends GridProcessorAdapter { */ @SuppressWarnings("ExternalizableWithoutPublicNoArgConstructor") private class DynamicCacheStartFuture extends GridFutureAdapter<Boolean> { - /** Cache name. */ - private String cacheName; - - /** Change request. */ - @GridToStringInclude - private DynamicCacheChangeRequest req; - - /** - * @param cacheName Cache name. - * @param req Cache start request. - */ - private DynamicCacheStartFuture(String cacheName, DynamicCacheChangeRequest req) { - this.cacheName = cacheName; - this.req = req; - } + /** */ + private UUID id; /** - * @return Request. + * @param id Future ID. */ - public DynamicCacheChangeRequest request() { - return req; + private DynamicCacheStartFuture(UUID id) { + this.id = id; } /** {@inheritDoc} */ @Override public boolean onDone(@Nullable Boolean res, @Nullable Throwable err) { // Make sure to remove future before completion. - pendingFuts.remove(req.requestId(), this); + pendingFuts.remove(id, this); return super.onDone(res, err); } http://git-wip-us.apache.org/repos/asf/ignite/blob/4026ddcc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/ClientCacheDhtTopologyFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/ClientCacheDhtTopologyFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/ClientCacheDhtTopologyFuture.java new file mode 100644 index 0000000..3166159 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/ClientCacheDhtTopologyFuture.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.dht; + +import java.util.Collection; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.CacheGroupContext; +import org.apache.ignite.internal.util.typedef.internal.U; + +/** + * Topology future created for client cache start. + */ +public class ClientCacheDhtTopologyFuture extends GridDhtTopologyFutureAdapter + implements GridDhtTopologyFuture { + /** */ + final AffinityTopologyVersion topVer; + + /** + * @param topVer Topology version. + */ + public ClientCacheDhtTopologyFuture(AffinityTopologyVersion topVer) { + assert topVer != null; + + this.topVer = topVer; + + onDone(topVer); + } + + /** + * @param topVer Topology version. + * @param e Error. + */ + public ClientCacheDhtTopologyFuture(AffinityTopologyVersion topVer, IgniteCheckedException e) { + assert e != null; + assert topVer != null; + + this.topVer = topVer; + + onDone(e); + } + + /** + * @param grp Cache group. + * @param topNodes Topology nodes. + */ + public void validate(CacheGroupContext grp, Collection<ClusterNode> topNodes) { + grpValidRes = U.newHashMap(1); + + grpValidRes.put(grp.groupId(), validateCacheGroup(grp,topNodes)); + } + + /** {@inheritDoc} */ + @Override public AffinityTopologyVersion topologyVersion() { + return topVer; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return "ClientCacheDhtTopologyFuture [topVer=" + topologyVersion() + ']'; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/4026ddcc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java index 619630f..2d9e4f5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java @@ -83,7 +83,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { private Map<Integer, Set<UUID>> part2node = new HashMap<>(); /** */ - private GridDhtPartitionExchangeId lastExchangeId; + private AffinityTopologyVersion lastExchangeVer; /** */ private AffinityTopologyVersion topVer = AffinityTopologyVersion.NONE; @@ -183,21 +183,23 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { /** {@inheritDoc} */ @Override public void updateTopologyVersion( - GridDhtPartitionExchangeId exchId, - GridDhtPartitionsExchangeFuture exchFut, + GridDhtTopologyFuture exchFut, + DiscoCache discoCache, long updSeq, boolean stopping ) throws IgniteInterruptedCheckedException { U.writeLock(lock); try { - assert exchId.topologyVersion().compareTo(topVer) > 0 : "Invalid topology version [topVer=" + topVer + - ", exchId=" + exchId + ']'; + AffinityTopologyVersion exchTopVer = exchFut.topologyVersion(); + + assert exchTopVer.compareTo(topVer) > 0 : "Invalid topology version [topVer=" + topVer + + ", exchVer=" + exchTopVer + ']'; this.stopping = stopping; - topVer = exchId.topologyVersion(); - discoCache = exchFut.discoCache(); + topVer = exchTopVer; + this.discoCache = discoCache; updateSeq.setIfGreater(updSeq); @@ -560,19 +562,20 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { /** {@inheritDoc} */ @SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"}) - @Nullable @Override public GridDhtPartitionMap update(@Nullable GridDhtPartitionExchangeId exchId, + @Nullable @Override public GridDhtPartitionMap update( + @Nullable AffinityTopologyVersion exchVer, GridDhtPartitionFullMap partMap, Map<Integer, T2<Long, Long>> cntrMap) { if (log.isDebugEnabled()) - log.debug("Updating full partition map [exchId=" + exchId + ", parts=" + fullMapString() + ']'); + log.debug("Updating full partition map [exchVer=" + exchVer + ", parts=" + fullMapString() + ']'); lock.writeLock().lock(); try { - if (exchId != null && lastExchangeId != null && lastExchangeId.compareTo(exchId) >= 0) { + if (exchVer != null && lastExchangeVer != null && lastExchangeVer.compareTo(exchVer) >= 0) { if (log.isDebugEnabled()) log.debug("Stale exchange id for full partition map update (will ignore) [lastExchId=" + - lastExchangeId + ", exchId=" + exchId + ']'); + lastExchangeVer + ", exchVer=" + exchVer + ']'); return null; } @@ -580,15 +583,15 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { if (node2part != null && node2part.compareTo(partMap) >= 0) { if (log.isDebugEnabled()) log.debug("Stale partition map for full partition map update (will ignore) [lastExchId=" + - lastExchangeId + ", exchId=" + exchId + ", curMap=" + node2part + ", newMap=" + partMap + ']'); + lastExchangeVer + ", exchVer=" + exchVer + ", curMap=" + node2part + ", newMap=" + partMap + ']'); return null; } updateSeq.incrementAndGet(); - if (exchId != null) - lastExchangeId = exchId; + if (exchVer != null) + lastExchangeVer = exchVer; if (node2part != null) { for (GridDhtPartitionMap part : node2part.values()) { @@ -598,7 +601,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { // then we keep the newer value. if (newPart != null && newPart.updateSequence() < part.updateSequence()) { if (log.isDebugEnabled()) - log.debug("Overriding partition map in full update map [exchId=" + exchId + ", curPart=" + + log.debug("Overriding partition map in full update map [exchVer=" + exchVer + ", curPart=" + mapString(part) + ", newPart=" + mapString(newPart) + ']'); partMap.put(part.nodeId(), part); @@ -694,16 +697,16 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { if (stopping) return null; - if (lastExchangeId != null && exchId != null && lastExchangeId.compareTo(exchId) > 0) { + if (lastExchangeVer != null && exchId != null && lastExchangeVer.compareTo(exchId.topologyVersion()) > 0) { if (log.isDebugEnabled()) - log.debug("Stale exchange id for single partition map update (will ignore) [lastExchId=" + - lastExchangeId + ", exchId=" + exchId + ']'); + log.debug("Stale exchange id for single partition map update (will ignore) [lastExchVer=" + + lastExchangeVer + ", exchId=" + exchId + ']'); return null; } if (exchId != null) - lastExchangeId = exchId; + lastExchangeVer = exchId.topologyVersion(); if (node2part == null) // Create invalid partition map. http://git-wip-us.apache.org/repos/asf/ignite/blob/4026ddcc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentRequest.java index d9d642a..44c7b88 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentRequest.java @@ -32,6 +32,12 @@ public class GridDhtAffinityAssignmentRequest extends GridCacheGroupIdMessage { private static final long serialVersionUID = 0L; /** */ + private static final int SND_PART_STATE_MASK = 0x01; + + /** */ + private byte flags; + + /** */ private long futId; /** Topology version being queried. */ @@ -48,16 +54,28 @@ public class GridDhtAffinityAssignmentRequest extends GridCacheGroupIdMessage { * @param futId Future ID. * @param grpId Cache group ID. * @param topVer Topology version. + * @param sndPartMap {@code True} if need send in response cache partitions state. */ public GridDhtAffinityAssignmentRequest( long futId, int grpId, - AffinityTopologyVersion topVer) { + AffinityTopologyVersion topVer, + boolean sndPartMap) { assert topVer != null; this.futId = futId; this.grpId = grpId; this.topVer = topVer; + + if (sndPartMap) + flags |= SND_PART_STATE_MASK; + } + + /** + * @return {@code True} if need send in response cache partitions state. + */ + public boolean sendPartitionsState() { + return (flags & SND_PART_STATE_MASK) != 0; } /** @@ -91,7 +109,7 @@ public class GridDhtAffinityAssignmentRequest extends GridCacheGroupIdMessage { /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 5; + return 6; } /** {@inheritDoc} */ @@ -110,12 +128,18 @@ public class GridDhtAffinityAssignmentRequest extends GridCacheGroupIdMessage { switch (writer.state()) { case 3: - if (!writer.writeLong("futId", futId)) + if (!writer.writeByte("flags", flags)) return false; writer.incrementState(); case 4: + if (!writer.writeLong("futId", futId)) + return false; + + writer.incrementState(); + + case 5: if (!writer.writeMessage("topVer", topVer)) return false; @@ -138,7 +162,7 @@ public class GridDhtAffinityAssignmentRequest extends GridCacheGroupIdMessage { switch (reader.state()) { case 3: - futId = reader.readLong("futId"); + flags = reader.readByte("flags"); if (!reader.isLastRead()) return false; @@ -146,6 +170,14 @@ public class GridDhtAffinityAssignmentRequest extends GridCacheGroupIdMessage { reader.incrementState(); case 4: + futId = reader.readLong("futId"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 5: topVer = reader.readMessage("topVer"); if (!reader.isLastRead()) http://git-wip-us.apache.org/repos/asf/ignite/blob/4026ddcc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java index 4df3fc1..5b0de08 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java @@ -21,19 +21,20 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; import java.util.UUID; - import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.GridDirectTransient; -import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager; +import org.apache.ignite.internal.managers.discovery.DiscoCache; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.GridCacheGroupIdMessage; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.plugin.extensions.communication.MessageReader; import org.apache.ignite.plugin.extensions.communication.MessageWriter; import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; /** * Affinity assignment response. @@ -62,6 +63,13 @@ public class GridDhtAffinityAssignmentResponse extends GridCacheGroupIdMessage { /** Affinity assignment bytes. */ private byte[] idealAffAssignmentBytes; + /** */ + @GridDirectTransient + private GridDhtPartitionFullMap partMap; + + /** */ + private byte[] partBytes; + /** * Empty constructor. */ @@ -107,30 +115,30 @@ public class GridDhtAffinityAssignmentResponse extends GridCacheGroupIdMessage { } /** - * @param disco Discovery manager. + * @param discoCache Discovery data cache. * @return Affinity assignment. */ - public List<List<ClusterNode>> affinityAssignment(GridDiscoveryManager disco) { + public List<List<ClusterNode>> affinityAssignment(DiscoCache discoCache) { if (affAssignmentIds != null) - return nodes(disco, affAssignmentIds); + return nodes(discoCache, affAssignmentIds); return null; } /** - * @param disco Discovery manager. + * @param discoCache Discovery data cache. * @return Ideal affinity assignment. */ - public List<List<ClusterNode>> idealAffinityAssignment(GridDiscoveryManager disco) { - return nodes(disco, idealAffAssignment); + public List<List<ClusterNode>> idealAffinityAssignment(DiscoCache discoCache) { + return nodes(discoCache, idealAffAssignment); } /** - * @param disco Discovery manager. + * @param discoCache Discovery data cache. * @param assignmentIds Assignment node IDs. * @return Assignment nodes. */ - private List<List<ClusterNode>> nodes(GridDiscoveryManager disco, List<List<UUID>> assignmentIds) { + private List<List<ClusterNode>> nodes(DiscoCache discoCache, List<List<UUID>> assignmentIds) { if (assignmentIds != null) { List<List<ClusterNode>> assignment = new ArrayList<>(assignmentIds.size()); @@ -139,7 +147,7 @@ public class GridDhtAffinityAssignmentResponse extends GridCacheGroupIdMessage { List<ClusterNode> nodes = new ArrayList<>(ids.size()); for (int j = 0; j < ids.size(); j++) { - ClusterNode node = disco.node(topVer, ids.get(j)); + ClusterNode node = discoCache.node(ids.get(j)); assert node != null; @@ -163,6 +171,20 @@ public class GridDhtAffinityAssignmentResponse extends GridCacheGroupIdMessage { } /** + * @param partMap Partition map. + */ + public void partitionMap(GridDhtPartitionFullMap partMap) { + this.partMap = partMap; + } + + /** + * @return Partition map. + */ + @Nullable public GridDhtPartitionFullMap partitionMap() { + return partMap; + } + + /** * @param assignments Assignment. * @return Assignment where cluster nodes are converted to their ids. */ @@ -193,7 +215,7 @@ public class GridDhtAffinityAssignmentResponse extends GridCacheGroupIdMessage { /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 7; + return 8; } /** @@ -208,6 +230,9 @@ public class GridDhtAffinityAssignmentResponse extends GridCacheGroupIdMessage { if (idealAffAssignment != null && idealAffAssignmentBytes == null) idealAffAssignmentBytes = U.marshal(ctx, idealAffAssignment); + + if (partMap != null && partBytes == null) + partBytes = U.zip(U.marshal(ctx.marshaller(), partMap)); } /** {@inheritDoc} */ @@ -222,6 +247,9 @@ public class GridDhtAffinityAssignmentResponse extends GridCacheGroupIdMessage { if (idealAffAssignmentBytes != null && idealAffAssignment == null) idealAffAssignment = U.unmarshal(ctx, idealAffAssignmentBytes, ldr); + + if (partBytes != null && partMap == null) + partMap = U.unmarshalZip(ctx.marshaller(), partBytes, U.resolveClassLoader(ldr, ctx.gridConfig())); } /** {@inheritDoc} */ @@ -263,6 +291,12 @@ public class GridDhtAffinityAssignmentResponse extends GridCacheGroupIdMessage { writer.incrementState(); case 6: + if (!writer.writeByteArray("partBytes", partBytes)) + return false; + + writer.incrementState(); + + case 7: if (!writer.writeMessage("topVer", topVer)) return false; @@ -309,6 +343,14 @@ public class GridDhtAffinityAssignmentResponse extends GridCacheGroupIdMessage { reader.incrementState(); case 6: + partBytes = reader.readByteArray("partBytes"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 7: topVer = reader.readMessage("topVer"); if (!reader.isLastRead()) http://git-wip-us.apache.org/repos/asf/ignite/blob/4026ddcc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java index 8746320..dcc08d4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java @@ -27,13 +27,12 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; -import org.apache.ignite.internal.IgniteNeedReconnectException; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.GridNodeOrderComparator; +import org.apache.ignite.internal.IgniteNeedReconnectException; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.managers.discovery.DiscoCache; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; -import org.apache.ignite.internal.processors.cache.CacheGroupDescriptor; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.tostring.GridToStringInclude; @@ -76,25 +75,28 @@ public class GridDhtAssignmentFetchFuture extends GridFutureAdapter<GridDhtAffin /** */ private final int grpId; + /** */ + private boolean needPartState; + /** * @param ctx Context. - * @param grpDesc Group descriptor. + * @param grpId Group ID. * @param topVer Topology version. * @param discoCache Discovery cache. */ public GridDhtAssignmentFetchFuture( GridCacheSharedContext ctx, - CacheGroupDescriptor grpDesc, + int grpId, AffinityTopologyVersion topVer, DiscoCache discoCache ) { this.topVer = topVer; - this.grpId = grpDesc.groupId(); + this.grpId = grpId; this.ctx = ctx; id = idGen.getAndIncrement(); - Collection<ClusterNode> availableNodes = discoCache.cacheGroupAffinityNodes(grpDesc.groupId()); + Collection<ClusterNode> availableNodes = discoCache.cacheGroupAffinityNodes(grpId); LinkedList<ClusterNode> tmp = new LinkedList<>(); @@ -127,8 +129,12 @@ public class GridDhtAssignmentFetchFuture extends GridFutureAdapter<GridDhtAffin /** * Initializes fetch future. + * + * @param needPartState {@code True} if also need fetch partitions state. */ - public void init() { + public void init(boolean needPartState) { + this.needPartState = needPartState; + ctx.affinity().addDhtAssignmentFetchFuture(this); requestFromNextNode(); @@ -195,7 +201,7 @@ public class GridDhtAssignmentFetchFuture extends GridFutureAdapter<GridDhtAffin ", node=" + node + ']'); ctx.io().send(node, - new GridDhtAffinityAssignmentRequest(id, grpId, topVer), + new GridDhtAffinityAssignmentRequest(id, grpId, topVer, needPartState), AFFINITY_POOL); // Close window for listener notification. http://git-wip-us.apache.org/repos/asf/ignite/blob/4026ddcc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java index 05831b1..ed80f83 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java @@ -407,14 +407,6 @@ public class GridDhtCacheEntry extends GridDistributedCacheEntry { return null; } - // If remote node has no near cache, don't add it. - if (!cctx.discovery().cacheNearNode(node, cacheName())) { - if (log.isDebugEnabled()) - log.debug("Ignoring near reader because near cache is disabled: " + nodeId); - - return null; - } - // If remote node is (primary?) or back up, don't add it as a reader. if (cctx.affinity().partitionBelongs(node, partition(), topVer)) { if (log.isDebugEnabled()) @@ -653,7 +645,7 @@ public class GridDhtCacheEntry extends GridDistributedCacheEntry { for (int i = 0; i < rdrs.length; i++) { ClusterNode node = cctx.discovery().getAlive(rdrs[i].nodeId()); - if (node == null || !cctx.discovery().cacheNode(node, cacheName())) { + if (node == null) { // Node has left and if new list has already been created, just skip. // Otherwise, create new list and add alive nodes. if (newRdrs == null) { http://git-wip-us.apache.org/repos/asf/ignite/blob/4026ddcc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java index d365a8e..acb822c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java @@ -26,6 +26,7 @@ import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.events.DiscoveryEvent; import org.apache.ignite.internal.IgniteInterruptedCheckedException; +import org.apache.ignite.internal.managers.discovery.DiscoCache; import org.apache.ignite.internal.processors.affinity.AffinityAssignment; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId; @@ -54,15 +55,15 @@ public interface GridDhtPartitionTopology { /** * Updates topology version. * - * @param exchId Exchange ID. * @param exchFut Exchange future. + * @param discoCache Discovery data cache. * @param updateSeq Update sequence. * @param stopping Stopping flag. * @throws IgniteInterruptedCheckedException If interrupted. */ public void updateTopologyVersion( - GridDhtPartitionExchangeId exchId, - GridDhtPartitionsExchangeFuture exchFut, + GridDhtTopologyFuture exchFut, + DiscoCache discoCache, long updateSeq, boolean stopping ) throws IgniteInterruptedCheckedException; @@ -221,12 +222,13 @@ public interface GridDhtPartitionTopology { public void onRemoved(GridDhtCacheEntry e); /** - * @param exchId Exchange ID. + * @param exchangeVer Exchange version. * @param partMap Update partition map. * @param cntrMap Partition update counters. * @return Local partition map if there were evictions or {@code null} otherwise. */ - public GridDhtPartitionMap update(@Nullable GridDhtPartitionExchangeId exchId, + public GridDhtPartitionMap update( + @Nullable AffinityTopologyVersion exchangeVer, GridDhtPartitionFullMap partMap, @Nullable Map<Integer, T2<Long, Long>> cntrMap); @@ -250,7 +252,7 @@ public interface GridDhtPartitionTopology { * This method should be called on topology coordinator after all partition messages are received. * * @param discoEvt Discovery event for which we detect lost partitions. - * @return {@code True} if partitons state got updated. + * @return {@code True} if partitions state got updated. */ public boolean detectLostPartitions(DiscoveryEvent discoEvt); http://git-wip-us.apache.org/repos/asf/ignite/blob/4026ddcc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java index 89554f3..ea731f8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java @@ -105,7 +105,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { private Map<Integer, Set<UUID>> part2node = new HashMap<>(); /** */ - private GridDhtPartitionExchangeId lastExchangeId; + private AffinityTopologyVersion lastExchangeVer; /** */ private volatile AffinityTopologyVersion topVer = AffinityTopologyVersion.NONE; @@ -167,7 +167,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { part2node = new HashMap<>(); - lastExchangeId = null; + lastExchangeVer = null; updateSeq.set(1); @@ -301,16 +301,18 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { /** {@inheritDoc} */ @Override public void updateTopologyVersion( - GridDhtPartitionExchangeId exchId, - GridDhtPartitionsExchangeFuture exchFut, + GridDhtTopologyFuture exchFut, + DiscoCache discoCache, long updSeq, boolean stopping ) throws IgniteInterruptedCheckedException { U.writeLock(lock); try { - assert exchId.topologyVersion().compareTo(topVer) > 0 : "Invalid topology version [topVer=" + topVer + - ", exchId=" + exchId + + AffinityTopologyVersion exchTopVer = exchFut.topologyVersion(); + + assert exchTopVer.compareTo(topVer) > 0 : "Invalid topology version [topVer=" + topVer + + ", exchTopVer=" + exchTopVer + ", fut=" + exchFut + ']'; this.stopping = stopping; @@ -321,9 +323,9 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { rebalancedTopVer = AffinityTopologyVersion.NONE; - topVer = exchId.topologyVersion(); + topVer = exchTopVer; - discoCache = exchFut.discoCache(); + this.discoCache = discoCache; } finally { lock.writeLock().unlock(); @@ -1109,12 +1111,12 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { /** {@inheritDoc} */ @SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"}) @Override public GridDhtPartitionMap update( - @Nullable GridDhtPartitionExchangeId exchId, + @Nullable AffinityTopologyVersion exchangeVer, GridDhtPartitionFullMap partMap, @Nullable Map<Integer, T2<Long, Long>> cntrMap ) { if (log.isDebugEnabled()) - log.debug("Updating full partition map [exchId=" + exchId + ", parts=" + fullMapString() + ']'); + log.debug("Updating full partition map [exchVer=" + exchangeVer + ", parts=" + fullMapString() + ']'); assert partMap != null; @@ -1147,27 +1149,26 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { } } - //if need skip - if (exchId != null && lastExchangeId != null && lastExchangeId.compareTo(exchId) >= 0) { + if (exchangeVer != null && lastExchangeVer != null && lastExchangeVer.compareTo(exchangeVer) >= 0) { if (log.isDebugEnabled()) - log.debug("Stale exchange id for full partition map update (will ignore) [lastExchId=" + - lastExchangeId + ", exchId=" + exchId + ']'); + log.debug("Stale exchange id for full partition map update (will ignore) [lastExch=" + + lastExchangeVer + ", exch=" + exchangeVer + ']'); return null; } if (node2part != null && node2part.compareTo(partMap) >= 0) { if (log.isDebugEnabled()) - log.debug("Stale partition map for full partition map update (will ignore) [lastExchId=" + - lastExchangeId + ", exchId=" + exchId + ", curMap=" + node2part + ", newMap=" + partMap + ']'); + log.debug("Stale partition map for full partition map update (will ignore) [lastExch=" + + lastExchangeVer + ", exch=" + exchangeVer + ", curMap=" + node2part + ", newMap=" + partMap + ']'); return null; } long updateSeq = this.updateSeq.incrementAndGet(); - if (exchId != null) - lastExchangeId = exchId; + if (exchangeVer != null) + lastExchangeVer = exchangeVer; if (node2part != null) { for (GridDhtPartitionMap part : node2part.values()) { @@ -1180,8 +1181,8 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { (grp.localStartVersion().compareTo(newPart.topologyVersion()) > 0)) ) { if (log.isDebugEnabled()) - log.debug("Overriding partition map in full update map [exchId=" + exchId + ", curPart=" + - mapString(part) + ", newPart=" + mapString(newPart) + ']'); + log.debug("Overriding partition map in full update map [exch=" + exchangeVer + + ", curPart=" + mapString(part) + ", newPart=" + mapString(newPart) + ']'); partMap.put(part.nodeId(), part); } @@ -1333,16 +1334,16 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { if (stopping) return null; - if (lastExchangeId != null && exchId != null && lastExchangeId.compareTo(exchId) > 0) { + if (lastExchangeVer != null && exchId != null && lastExchangeVer.compareTo(exchId.topologyVersion()) > 0) { if (log.isDebugEnabled()) - log.debug("Stale exchange id for single partition map update (will ignore) [lastExchId=" + - lastExchangeId + ", exchId=" + exchId + ']'); + log.debug("Stale exchange id for single partition map update (will ignore) [lastExch=" + + lastExchangeVer + ", exch=" + exchId.topologyVersion() + ']'); return null; } if (exchId != null) - lastExchangeId = exchId; + lastExchangeVer = exchId.topologyVersion(); if (node2part == null) // Create invalid partition map. http://git-wip-us.apache.org/repos/asf/ignite/blob/4026ddcc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTopologyFutureAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTopologyFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTopologyFutureAdapter.java new file mode 100644 index 0000000..e70f383 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTopologyFutureAdapter.java @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.dht; + +import java.util.Collection; +import java.util.Collections; +import java.util.Map; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.cache.PartitionLossPolicy; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.configuration.TopologyValidator; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.CacheGroupContext; +import org.apache.ignite.internal.processors.cache.CacheInvalidStateException; +import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.util.future.GridFutureAdapter; +import org.jetbrains.annotations.Nullable; + +import static org.apache.ignite.cache.PartitionLossPolicy.READ_ONLY_ALL; +import static org.apache.ignite.cache.PartitionLossPolicy.READ_ONLY_SAFE; +import static org.apache.ignite.cache.PartitionLossPolicy.READ_WRITE_ALL; +import static org.apache.ignite.cache.PartitionLossPolicy.READ_WRITE_SAFE; + +/** + * + */ +public abstract class GridDhtTopologyFutureAdapter extends GridFutureAdapter<AffinityTopologyVersion> + implements GridDhtTopologyFuture { + /** Cache groups validation results. */ + protected volatile Map<Integer, CacheValidation> grpValidRes; + + /** + * @param grp Cache group. + * @param topNodes Topology nodes. + * @return Validation result. + */ + protected final CacheValidation validateCacheGroup(CacheGroupContext grp, Collection<ClusterNode> topNodes) { + Collection<Integer> lostParts = grp.isLocal() ? + Collections.<Integer>emptyList() : grp.topology().lostPartitions(); + + boolean valid = true; + + if (!grp.systemCache()) { + TopologyValidator validator = grp.topologyValidator(); + + if (validator != null) + valid = validator.validate(topNodes); + } + + return new CacheValidation(valid, lostParts); + } + + /** {@inheritDoc} */ + @Nullable @Override public final Throwable validateCache( + GridCacheContext cctx, + boolean recovery, + boolean read, + @Nullable Object key, + @Nullable Collection<?> keys + ) { + assert isDone() : this; + + Throwable err = error(); + + if (err != null) + return err; + + if (!cctx.shared().kernalContext().state().active()) + return new CacheInvalidStateException( + "Failed to perform cache operation (cluster is not activated): " + cctx.name()); + + CacheGroupContext grp = cctx.group(); + + PartitionLossPolicy partLossPlc = grp.config().getPartitionLossPolicy(); + + if (grp.needsRecovery() && !recovery) { + if (!read && (partLossPlc == READ_ONLY_SAFE || partLossPlc == READ_ONLY_ALL)) + return new IgniteCheckedException("Failed to write to cache (cache is moved to a read-only state): " + + cctx.name()); + } + + if (grp.needsRecovery() || grp.topologyValidator() != null) { + CacheValidation validation = grpValidRes.get(grp.groupId()); + + if (validation == null) + return null; + + if (!validation.valid && !read) + return new IgniteCheckedException("Failed to perform cache operation " + + "(cache topology is not valid): " + cctx.name()); + + if (recovery || !grp.needsRecovery()) + return null; + + if (key != null) { + int p = cctx.affinity().partition(key); + + CacheInvalidStateException ex = validatePartitionOperation(cctx.name(), read, key, p, + validation.lostParts, partLossPlc); + + if (ex != null) + return ex; + } + + if (keys != null) { + for (Object k : keys) { + int p = cctx.affinity().partition(k); + + CacheInvalidStateException ex = validatePartitionOperation(cctx.name(), read, k, p, + validation.lostParts, partLossPlc); + + if (ex != null) + return ex; + } + } + } + + return null; + } + + /** + * @param cacheName Cache name. + * @param read Read flag. + * @param key Key to check. + * @param part Partition this key belongs to. + * @param lostParts Collection of lost partitions. + * @param plc Partition loss policy. + * @return Invalid state exception if this operation is disallowed. + */ + private CacheInvalidStateException validatePartitionOperation( + String cacheName, + boolean read, + Object key, + int part, + Collection<Integer> lostParts, + PartitionLossPolicy plc + ) { + if (lostParts.contains(part)) { + if (!read) { + assert plc == READ_WRITE_ALL || plc == READ_WRITE_SAFE; + + if (plc == READ_WRITE_SAFE) { + return new CacheInvalidStateException("Failed to execute cache operation " + + "(all partition owners have left the grid, partition data has been lost) [" + + "cacheName=" + cacheName + ", part=" + part + ", key=" + key + ']'); + } + } + else { + // Read. + if (plc == READ_ONLY_SAFE || plc == READ_WRITE_SAFE) + return new CacheInvalidStateException("Failed to execute cache operation " + + "(all partition owners have left the grid, partition data has been lost) [" + + "cacheName=" + cacheName + ", part=" + part + ", key=" + key + ']'); + } + } + + return null; + } + + /** + * Cache validation result. + */ + protected static class CacheValidation { + /** Topology validation result. */ + private boolean valid; + + /** Lost partitions on this topology version. */ + private Collection<Integer> lostParts; + + /** + * @param valid Valid flag. + * @param lostParts Lost partitions. + */ + private CacheValidation(boolean valid, Collection<Integer> lostParts) { + this.valid = valid; + this.lostParts = lostParts; + } + } + +} http://git-wip-us.apache.org/repos/asf/ignite/blob/4026ddcc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java index c7c4280..631fe10 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java @@ -19,10 +19,12 @@ package org.apache.ignite.internal.processors.cache.distributed.dht; import java.io.Externalizable; import java.nio.ByteBuffer; +import java.util.ArrayList; import java.util.BitSet; import java.util.Collection; import java.util.Collections; import java.util.Iterator; +import java.util.List; import java.util.Map; import java.util.UUID; import org.apache.ignite.IgniteCheckedException; @@ -97,6 +99,10 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest { /** Preload keys. */ private BitSet preloadKeys; + /** */ + @GridDirectTransient + private List<IgniteTxKey> nearWritesCacheMissed; + /** * Empty constructor required for {@link Externalizable}. */ @@ -163,6 +169,13 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest { } /** + * @return Near cache writes for which cache was not found (possible if client near cache was closed). + */ + @Nullable public List<IgniteTxKey> nearWritesCacheMissed() { + return nearWritesCacheMissed; + } + + /** * @return Near transaction ID. */ public GridCacheVersion nearXidVersion() { @@ -319,13 +332,37 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest { while (keyIter.hasNext()) { IgniteTxKey key = keyIter.next(); - key.finishUnmarshal(ctx.cacheContext(key.cacheId()), ldr); + GridCacheContext<?, ?> cacheCtx = ctx.cacheContext(key.cacheId()); + + if (cacheCtx != null) { + key.finishUnmarshal(cacheCtx, ldr); - owned.put(key, valIter.next()); + owned.put(key, valIter.next()); + } } } - unmarshalTx(nearWrites, true, ctx, ldr); + if (nearWrites != null) { + for (Iterator<IgniteTxEntry> it = nearWrites.iterator(); it.hasNext();) { + IgniteTxEntry e = it.next(); + + GridCacheContext<?, ?> cacheCtx = ctx.cacheContext(e.cacheId()); + + if (cacheCtx == null) { + it.remove(); + + if (nearWritesCacheMissed == null) + nearWritesCacheMissed = new ArrayList<>(); + + nearWritesCacheMissed.add(e.txKey()); + } + else { + e.context(cacheCtx); + + e.unmarshal(ctx, true, ldr); + } + } + } } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/4026ddcc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java index 5dcb98c..0c2bf81 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java @@ -124,7 +124,7 @@ public class GridDhtTxPrepareResponse extends GridDistributedTxPrepareResponse { /** * @return Evicted readers. */ - Collection<IgniteTxKey> nearEvicted() { + public Collection<IgniteTxKey> nearEvicted() { return nearEvicted; } @@ -194,7 +194,9 @@ public class GridDhtTxPrepareResponse extends GridDistributedTxPrepareResponse { for (IgniteTxKey key : nearEvicted) { GridCacheContext cctx = ctx.cacheContext(key.cacheId()); - key.prepareMarshal(cctx); + // Can be null if client near cache was removed, in this case assume do not need prepareMarshal. + if (cctx != null) + key.prepareMarshal(cctx); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/4026ddcc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index 67e3ebc..52f007a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -3240,9 +3240,17 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { GridDhtAtomicUpdateResponse dhtRes = null; - if (isNearEnabled(cacheCfg)) { - List<KeyCacheObject> nearEvicted = - ((GridNearAtomicCache<K, V>)near()).processDhtAtomicUpdateRequest(nodeId, req, nearRes); + if (req.nearSize() > 0) { + List<KeyCacheObject> nearEvicted; + + if (isNearEnabled(ctx)) + nearEvicted = ((GridNearAtomicCache<K, V>)near()).processDhtAtomicUpdateRequest(nodeId, req, nearRes); + else { + nearEvicted = new ArrayList<>(req.nearSize()); + + for (int i = 0; i < req.nearSize(); i++) + nearEvicted.add(req.nearKey(i)); + } if (nearEvicted != null) { dhtRes = new GridDhtAtomicUpdateResponse(ctx.cacheId(), http://git-wip-us.apache.org/repos/asf/ignite/blob/4026ddcc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java index 7b2547a..70bf6f5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java @@ -118,7 +118,7 @@ public class GridDhtAtomicUpdateResponse extends GridCacheIdMessage implements G /** * @param nearEvicted Evicted near cache keys. */ - void nearEvicted(List<KeyCacheObject> nearEvicted) { + public void nearEvicted(List<KeyCacheObject> nearEvicted) { this.nearEvicted = nearEvicted; } @@ -133,10 +133,13 @@ public class GridDhtAtomicUpdateResponse extends GridCacheIdMessage implements G GridCacheContext cctx = ctx.cacheContext(cacheId); - prepareMarshalCacheObjects(nearEvicted, cctx); + // Can be null if client near cache was removed, in this case assume do not need prepareMarshal. + if (cctx != null) { + prepareMarshalCacheObjects(nearEvicted, cctx); - if (errs != null) - errs.prepareMarshal(this, cctx); + if (errs != null) + errs.prepareMarshal(this, cctx); + } } /** {@inheritDoc} */
