Repository: ignite Updated Branches: refs/heads/ignite-zk a5a547073 -> d8c52dc54
zk Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d8c52dc5 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d8c52dc5 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d8c52dc5 Branch: refs/heads/ignite-zk Commit: d8c52dc54a157cfb68c1948ca4ef9026c18f6133 Parents: a5a5470 Author: sboikov <[email protected]> Authored: Wed Nov 29 14:51:31 2017 +0300 Committer: sboikov <[email protected]> Committed: Wed Nov 29 15:21:15 2017 +0300 ---------------------------------------------------------------------- .../communication/GridIoMessageFactory.java | 6 + .../ContinuousRoutineStartResultMessage.java | 197 ++++++ .../continuous/GridContinuousProcessor.java | 664 ++++++++++++++----- .../IgniteCacheEntryListenerAtomicTest.java | 2 +- 4 files changed, 700 insertions(+), 169 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/d8c52dc5/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java index 2f8ba6d..e453889 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java @@ -118,6 +118,7 @@ import org.apache.ignite.internal.processors.cache.version.GridCacheRawVersioned import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.processors.cache.version.GridCacheVersionEx; import org.apache.ignite.internal.processors.cluster.ClusterMetricsUpdateMessage; +import org.apache.ignite.internal.processors.continuous.ContinuousRoutineStartResultMessage; import org.apache.ignite.internal.processors.continuous.GridContinuousMessage; import org.apache.ignite.internal.processors.datastreamer.DataStreamerEntry; import org.apache.ignite.internal.processors.datastreamer.DataStreamerRequest; @@ -881,6 +882,11 @@ public class GridIoMessageFactory implements MessageFactory { break; + case 130: + msg = new ContinuousRoutineStartResultMessage(); + + break; + // [-3..119] [124..128] [-23..-27] [-36..-55]- this // [120..123] - DR http://git-wip-us.apache.org/repos/asf/ignite/blob/d8c52dc5/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/ContinuousRoutineStartResultMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/ContinuousRoutineStartResultMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/ContinuousRoutineStartResultMessage.java new file mode 100644 index 0000000..0d5eb48 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/ContinuousRoutineStartResultMessage.java @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.continuous; + +import java.nio.ByteBuffer; +import java.util.UUID; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionPartialCountersMap; +import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.plugin.extensions.communication.MessageReader; +import org.apache.ignite.plugin.extensions.communication.MessageWriter; +import org.jetbrains.annotations.Nullable; + +/** + * + */ +public class ContinuousRoutineStartResultMessage implements Message { + /** */ + private static final int ERROR_FLAG = 0x01; + + /** */ + private UUID routineId; + + /** */ + private byte[] errBytes; + + /** */ + private byte[] cntrsMapBytes; + + /** */ + private int flags; + + /** + * + */ + public ContinuousRoutineStartResultMessage() { + // No-op. + } + + /** + * @param routineId Routine ID. + * @param cntrsMapBytes Marshalled {@link CachePartitionPartialCountersMap}. + * @param errBytes Error bytes. + * @param err {@code True} if failed to start routine. + */ + ContinuousRoutineStartResultMessage(UUID routineId, byte[] cntrsMapBytes, byte[] errBytes, boolean err) { + this.routineId = routineId; + this.cntrsMapBytes = cntrsMapBytes; + this.errBytes = errBytes; + + if (err) + flags |= ERROR_FLAG; + } + + /** + * @return Marshalled {@link CachePartitionPartialCountersMap}. + */ + @Nullable byte[] countersMapBytes() { + return cntrsMapBytes; + } + + /** + * @return {@code True} if failed to start routine. + */ + boolean error() { + return (flags & ERROR_FLAG) != 0; + } + + /** + * @return Routine ID. + */ + UUID routineId() { + return routineId; + } + + /** + * @return Error bytes. + */ + @Nullable byte[] errorBytes() { + return errBytes; + } + + /** {@inheritDoc} */ + @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { + writer.setBuffer(buf); + + if (!writer.isHeaderWritten()) { + if (!writer.writeHeader(directType(), fieldsCount())) + return false; + + writer.onHeaderWritten(); + } + + switch (writer.state()) { + case 0: + if (!writer.writeByteArray("cntrsMapBytes", cntrsMapBytes)) + return false; + + writer.incrementState(); + + case 1: + if (!writer.writeByteArray("errBytes", errBytes)) + return false; + + writer.incrementState(); + + case 2: + if (!writer.writeInt("flags", flags)) + return false; + + writer.incrementState(); + + case 3: + if (!writer.writeUuid("routineId", routineId)) + return false; + + writer.incrementState(); + + } + + return true; + } + + /** {@inheritDoc} */ + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { + reader.setBuffer(buf); + + if (!reader.beforeMessageRead()) + return false; + + switch (reader.state()) { + case 0: + cntrsMapBytes = reader.readByteArray("cntrsMapBytes"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 1: + errBytes = reader.readByteArray("errBytes"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 2: + flags = reader.readInt("flags"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 3: + routineId = reader.readUuid("routineId"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + } + + return reader.afterMessageRead(ContinuousRoutineStartResultMessage.class); + } + + /** {@inheritDoc} */ + @Override public short directType() { + return 130; + } + + /** {@inheritDoc} */ + @Override public byte fieldsCount() { + return 4; + } + + /** {@inheritDoc} */ + @Override public void onAckReceived() { + // No-op. + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/d8c52dc5/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java index 4194622..e888b37 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java @@ -49,6 +49,7 @@ import org.apache.ignite.internal.IgniteDeploymentCheckedException; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; +import org.apache.ignite.internal.managers.communication.GridIoPolicy; import org.apache.ignite.internal.managers.communication.GridMessageListener; import org.apache.ignite.internal.managers.deployment.GridDeployment; import org.apache.ignite.internal.managers.deployment.GridDeploymentInfo; @@ -61,6 +62,7 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.GridCacheAdapter; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheProcessor; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionPartialCountersMap; import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryHandler; import org.apache.ignite.internal.processors.timeout.GridTimeoutObject; import org.apache.ignite.internal.util.future.GridFinishedFuture; @@ -207,7 +209,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter { if (ctx.isStopping()) return; - processStartRequestV2(snd, msg); + processStartRequestV2(topVer, snd, msg); } }); @@ -252,32 +254,36 @@ public class GridContinuousProcessor extends GridProcessorAdapter { ctx.io().addMessageListener(TOPIC_CONTINUOUS, new GridMessageListener() { @Override public void onMessage(UUID nodeId, Object obj, byte plc) { - GridContinuousMessage msg = (GridContinuousMessage)obj; + if (obj instanceof ContinuousRoutineStartResultMessage) + processRoutineStartResultMessage(nodeId, (ContinuousRoutineStartResultMessage)obj); + else { + GridContinuousMessage msg = (GridContinuousMessage)obj; - if (msg.data() == null && msg.dataBytes() != null) { - try { - msg.data(U.unmarshal(marsh, msg.dataBytes(), U.resolveClassLoader(ctx.config()))); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to process message (ignoring): " + msg, e); + if (msg.data() == null && msg.dataBytes() != null) { + try { + msg.data(U.unmarshal(marsh, msg.dataBytes(), U.resolveClassLoader(ctx.config()))); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to process message (ignoring): " + msg, e); - return; + return; + } } - } - switch (msg.type()) { - case MSG_EVT_NOTIFICATION: - processNotification(nodeId, msg); + switch (msg.type()) { + case MSG_EVT_NOTIFICATION: + processNotification(nodeId, msg); - break; + break; - case MSG_EVT_ACK: - processMessageAck(msg); + case MSG_EVT_ACK: + processMessageAck(msg); - break; + break; - default: - assert false : "Unexpected message received: " + msg.type(); + default: + assert false : "Unexpected message received: " + msg.type(); + } } } }); @@ -433,6 +439,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter { return data; } + return null; } @@ -483,7 +490,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter { for (ContinuousRoutineInfo routineInfo : nodeData.startedRoutines) { routinesInfo.addRoutineInfo(routineInfo); - startRoutine(routineInfo); + startDiscoveryDataRoutine(routineInfo); } } } @@ -506,7 +513,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter { routinesInfo.addRoutineInfo(routineInfo); - startRoutine(routineInfo); + startDiscoveryDataRoutine(routineInfo); } } } @@ -523,7 +530,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter { /** * @param routineInfo Routine info. */ - private void startRoutine(ContinuousRoutineInfo routineInfo) { + private void startDiscoveryDataRoutine(ContinuousRoutineInfo routineInfo) { IgnitePredicate<ClusterNode> nodeFilter = null; try { @@ -796,30 +803,10 @@ public class GridContinuousProcessor extends GridProcessorAdapter { // Whether local node is included in routine. boolean locIncluded = prjPred == null || prjPred.apply(ctx.discovery().localNode()); - StartRequestData reqData = new StartRequestData(prjPred, hnd.clone(), bufSize, interval, autoUnsubscribe); + AbstractContinuousMessage msg; try { - if (ctx.config().isPeerClassLoadingEnabled()) { - // Handle peer deployment for projection predicate. - if (prjPred != null && !U.isGrid(prjPred.getClass())) { - Class cls = U.detectClass(prjPred); - - String clsName = cls.getName(); - - GridDeployment dep = ctx.deploy().deploy(cls, U.detectClassLoader(cls)); - - if (dep == null) - throw new IgniteDeploymentCheckedException("Failed to deploy projection predicate: " + prjPred); - - reqData.className(clsName); - reqData.deploymentInfo(new GridDeploymentInfoBean(dep)); - - reqData.p2pMarshal(marsh); - } - - // Handle peer deployment for other handler-specific objects. - reqData.handler().p2pMarshal(ctx); - } + msg = createStartMessage(routineId, hnd, bufSize, interval, autoUnsubscribe, prjPred); } catch (IgniteCheckedException e) { return new GridFinishedFuture<>(e); @@ -828,16 +815,22 @@ public class GridContinuousProcessor extends GridProcessorAdapter { // Register per-routine notifications listener if ordered messaging is used. registerMessageListener(hnd); - StartFuture fut = new StartFuture(ctx, routineId); + StartFuture fut = new StartFuture(routineId); startFuts.put(routineId, fut); try { - if (locIncluded || hnd.isQuery()) - registerHandler(ctx.localNodeId(), routineId, hnd, bufSize, interval, autoUnsubscribe, true); + if (locIncluded || hnd.isQuery()) { + registerHandler(ctx.localNodeId(), + routineId, + hnd, + bufSize, + interval, + autoUnsubscribe, + true); + } - ctx.discovery().sendCustomEvent(new StartRoutineDiscoveryMessage(routineId, reqData, - reqData.handler().keepBinary())); + ctx.discovery().sendCustomEvent(msg); } catch (IgniteCheckedException e) { startFuts.remove(routineId); @@ -857,6 +850,92 @@ public class GridContinuousProcessor extends GridProcessorAdapter { } /** + * @param routineId Routine ID. + * @param hnd Handler. + * @param bufSize Buffer size. + * @param interval Interval. + * @param autoUnsubscribe Auto unsubscribe flag. + * @param nodeFilter Node filter. + * @return Routine start message. + * @throws IgniteCheckedException If failed. + */ + private AbstractContinuousMessage createStartMessage(UUID routineId, + GridContinuousHandler hnd, + int bufSize, + long interval, + boolean autoUnsubscribe, + @Nullable IgnitePredicate<ClusterNode> nodeFilter) + throws IgniteCheckedException + { + hnd = hnd.clone(); + + String clsName = null; + GridDeploymentInfoBean dep = null; + + if (ctx.config().isPeerClassLoadingEnabled()) { + // Handle peer deployment for projection predicate. + if (nodeFilter != null && !U.isGrid(nodeFilter.getClass())) { + Class cls = U.detectClass(nodeFilter); + + clsName = cls.getName(); + + GridDeployment dep0 = ctx.deploy().deploy(cls, U.detectClassLoader(cls)); + + if (dep0 == null) + throw new IgniteDeploymentCheckedException("Failed to deploy projection predicate: " + nodeFilter); + + dep = new GridDeploymentInfoBean(dep0); + } + + // Handle peer deployment for other handler-specific objects. + hnd.p2pMarshal(ctx); + } + + if (discoProtoVer == 1) { + StartRequestData reqData = new StartRequestData( + nodeFilter, + hnd, + bufSize, + interval, + autoUnsubscribe); + + if (clsName != null) { + reqData.className(clsName); + reqData.deploymentInfo(dep); + + reqData.p2pMarshal(marsh); + } + + return new StartRoutineDiscoveryMessage( + routineId, + reqData, + reqData.handler().keepBinary()); + } + else { + assert discoProtoVer == 2 : discoProtoVer; + + byte[] nodeFilterBytes = nodeFilter != null ? U.marshal(marsh, nodeFilter) : null; + byte[] hndBytes = U.marshal(marsh, hnd); + + StartRequestDataV2 reqData = new StartRequestDataV2(nodeFilterBytes, + hndBytes, + bufSize, + interval, + autoUnsubscribe); + + if (clsName != null) { + reqData.className(clsName); + reqData.deploymentInfo(dep); + } + + return new StartRoutineDiscoveryMessageV2( + routineId, + reqData, + hnd.keepBinary()); + } + } + + /** * @param hnd Handler. */ private void registerMessageListener(GridContinuousHandler hnd) { @@ -1138,35 +1217,11 @@ public class GridContinuousProcessor extends GridProcessorAdapter { StartFuture fut = startFuts.remove(msg.routineId()); if (fut != null) { - if (msg.errs().isEmpty()) { - LocalRoutineInfo routine = locInfos.get(msg.routineId()); - - // Update partition counters. - if (routine != null && routine.handler().isQuery()) { - Map<UUID, Map<Integer, T2<Long, Long>>> cntrsPerNode = msg.updateCountersPerNode(); - Map<Integer, T2<Long, Long>> cntrs = msg.updateCounters(); - - GridCacheAdapter<Object, Object> interCache = - ctx.cache().internalCache(routine.handler().cacheName()); - - GridCacheContext cctx = interCache != null ? interCache.context() : null; - - if (cctx != null && cntrsPerNode != null && !cctx.isLocal() && cctx.affinityNode()) - cntrsPerNode.put(ctx.localNodeId(), - toCountersMap(cctx.topology().localUpdateCounters(false))); - - routine.handler().updateCounters(topVer, cntrsPerNode, cntrs); - } - - fut.onRemoteRegistered(); - } - else { - IgniteCheckedException firstEx = F.first(msg.errs().values()); - - fut.onDone(firstEx); - - stopRoutine(msg.routineId()); - } + fut.onAllRemoteRegistered( + topVer, + msg.errs(), + msg.updateCountersPerNode(), + msg.updateCounters()); } } @@ -1280,96 +1335,201 @@ public class GridContinuousProcessor extends GridProcessorAdapter { } /** + * @param sndId Sender node ID. + * @param msg Message. + */ + private void processRoutineStartResultMessage(UUID sndId, ContinuousRoutineStartResultMessage msg) { + StartFuture fut = startFuts.get(msg.routineId()); + + if (fut != null) + fut.onResult(sndId, msg); + } + + /** + * @param topVer Current topology version. * @param snd Sender. * @param msg Start request. */ - private void processStartRequestV2(ClusterNode snd, StartRoutineDiscoveryMessageV2 msg) { - StartRequestDataV2 reqData = msg.startRequestData(); + private void processStartRequestV2(final AffinityTopologyVersion topVer, + final ClusterNode snd, + final StartRoutineDiscoveryMessageV2 msg) { + try { + StartRequestDataV2 reqData = msg.startRequestData(); - ContinuousRoutineInfo routineInfo = new ContinuousRoutineInfo(snd.id(), - msg.routineId(), - reqData.handlerBytes(), - reqData.nodeFilterBytes(), - reqData.bufferSize(), - reqData.interval(), - reqData.autoUnsubscribe()); + ContinuousRoutineInfo routineInfo = new ContinuousRoutineInfo(snd.id(), + msg.routineId(), + reqData.handlerBytes(), + reqData.nodeFilterBytes(), + reqData.bufferSize(), + reqData.interval(), + reqData.autoUnsubscribe()); - routinesInfo.addRoutineInfo(routineInfo); + routinesInfo.addRoutineInfo(routineInfo); - Exception err = null; + final Collection<ClusterNode> nodes = ctx.discovery().nodes(topVer); - IgnitePredicate<ClusterNode> nodeFilter = null; + // Should not use marshaller and send messages from discovery thread. + ctx.pools().poolForPolicy(GridIoPolicy.SYSTEM_POOL).execute(new Runnable() { + @Override public void run() { + if (snd.id().equals(ctx.localNodeId())) { + StartFuture fut = startFuts.get(msg.routineId()); - if (reqData.nodeFilterBytes() != null) { - try { - if (ctx.config().isPeerClassLoadingEnabled() && reqData.className() != null) { - String clsName = reqData.className(); - GridDeploymentInfo depInfo = reqData.deploymentInfo(); - - GridDeployment dep = ctx.deploy().getGlobalDeployment(depInfo.deployMode(), - clsName, - clsName, - depInfo.userVersion(), - snd.id(), - depInfo.classLoaderId(), - depInfo.participants(), - null); + if (fut != null) + fut.initRemoteNodes(topVer, nodes); - if (dep == null) - throw new IgniteDeploymentCheckedException("Failed to obtain deployment for class: " + clsName); + return; + } - nodeFilter = U.unmarshal(marsh, reqData.nodeFilterBytes(), U.resolveClassLoader(dep.classLoader(), ctx.config())); - } - else - nodeFilter = U.unmarshal(marsh, reqData.nodeFilterBytes(), U.resolveClassLoader(ctx.config())); + StartRequestDataV2 reqData = msg.startRequestData(); - if (nodeFilter != null) - ctx.resource().injectGeneric(nodeFilter); - } - catch (Exception e) { - err = e; + Exception err = null; - U.error(log, "Failed to unmarshal continuous routine filter [" + - "routineId=" + routineInfo.routineId + - ", srcNodeId=" + routineInfo.srcNodeId + ']', e); - } - } + IgnitePredicate<ClusterNode> nodeFilter = null; - boolean register = err == null && (nodeFilter == null || nodeFilter.apply(ctx.discovery().localNode())); + byte[] cntrs = null; - if (register) { - try { - GridContinuousHandler hnd = U.unmarshal(marsh, reqData.handlerBytes(), U.resolveClassLoader(ctx.config())); + if (reqData.nodeFilterBytes() != null) { + try { + if (ctx.config().isPeerClassLoadingEnabled() && reqData.className() != null) { + String clsName = reqData.className(); + GridDeploymentInfo depInfo = reqData.deploymentInfo(); + + GridDeployment dep = ctx.deploy().getGlobalDeployment(depInfo.deployMode(), + clsName, + clsName, + depInfo.userVersion(), + snd.id(), + depInfo.classLoaderId(), + depInfo.participants(), + null); + + if (dep == null) { + throw new IgniteDeploymentCheckedException("Failed to obtain deployment " + + "for class: " + clsName); + } + + nodeFilter = U.unmarshal(marsh, + reqData.nodeFilterBytes(), + U.resolveClassLoader(dep.classLoader(), ctx.config())); + } + else { + nodeFilter = U.unmarshal(marsh, + reqData.nodeFilterBytes(), + U.resolveClassLoader(ctx.config())); + } + + if (nodeFilter != null) + ctx.resource().injectGeneric(nodeFilter); + } + catch (Exception e) { + err = e; - if (ctx.config().isPeerClassLoadingEnabled()) - hnd.p2pUnmarshal(snd.id(), ctx); + U.error(log, "Failed to unmarshal continuous routine filter [" + + "routineId=" + msg.routineId + + ", srcNodeId=" + snd.id() + ']', e); + } + } - if (msg.keepBinary()) { - assert hnd instanceof CacheContinuousQueryHandler : hnd; + boolean register = err == null && + (nodeFilter == null || nodeFilter.apply(ctx.discovery().localNode())); + + if (register) { + try { + GridContinuousHandler hnd = U.unmarshal(marsh, + reqData.handlerBytes(), + U.resolveClassLoader(ctx.config())); - ((CacheContinuousQueryHandler)hnd).keepBinary(true); + if (ctx.config().isPeerClassLoadingEnabled()) + hnd.p2pUnmarshal(snd.id(), ctx); + + if (msg.keepBinary()) { + assert hnd instanceof CacheContinuousQueryHandler : hnd; + + ((CacheContinuousQueryHandler)hnd).keepBinary(true); + } + + GridContinuousHandler hnd0 = hnd instanceof GridMessageListenHandler ? + new GridMessageListenHandler((GridMessageListenHandler)hnd) : + hnd; + + registerHandler(snd.id(), + msg.routineId, + hnd0, + reqData.bufferSize(), + reqData.interval(), + reqData.autoUnsubscribe(), + false); + + if (hnd0.isQuery()) { + GridCacheProcessor proc = ctx.cache(); + + if (proc != null) { + GridCacheAdapter cache = ctx.cache().internalCache(hnd0.cacheName()); + + if (cache != null && !cache.isLocal() && cache.context().userCache()) { + CachePartitionPartialCountersMap cntrsMap = + cache.context().topology().localUpdateCounters(false); + + cntrs = U.marshal(marsh, cntrsMap); + } + } + } + } + catch (Exception e) { + err = e; + + U.error(log, "Failed to register continuous routine handler [" + + "routineId=" + msg.routineId + + ", srcNodeId=" + snd.id() + ']', e); + } + } + + sendMessageStartResult(snd, msg.routineId(), cntrs, err); } + }); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to submit continuous routine started result closure: " + e, e); + } + } - GridContinuousHandler hnd0 = hnd instanceof GridMessageListenHandler ? - new GridMessageListenHandler((GridMessageListenHandler)hnd) : - hnd; + /** + * @param node Target node. + * @param routineId Routine ID. + * @param cntrsMapBytes Marshalled {@link CachePartitionPartialCountersMap}. + * @param err Start error if any. + */ + private void sendMessageStartResult(final ClusterNode node, + final UUID routineId, + byte[] cntrsMapBytes, + final @Nullable Exception err) + { + byte[] errBytes = null; - registerHandler(snd.id(), - msg.routineId, - hnd0, - reqData.bufferSize(), - reqData.interval(), - reqData.autoUnsubscribe(), - false); + if (err != null) { + try { + errBytes = U.marshal(marsh, err); } catch (Exception e) { - err = e; - - U.error(log, "Failed to register continuous routine handler [" + - "routineId=" + routineInfo.routineId + - ", srcNodeId=" + routineInfo.srcNodeId + ']', e); + U.error(log, "Failed to marshal routine start error: " + e, e); } } + + ContinuousRoutineStartResultMessage msg = new ContinuousRoutineStartResultMessage(routineId, + cntrsMapBytes, + errBytes, + err != null); + + try { + ctx.io().sendToGridTopic(node, TOPIC_CONTINUOUS, msg, SYSTEM_POOL, null); + } + catch (ClusterTopologyCheckedException e) { + if (log.isDebugEnabled()) + log.debug("Failed to send routine start result, node failed: " + e); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to send routine start result: " + e, e); + } } /** @@ -1690,9 +1850,13 @@ public class GridContinuousProcessor extends GridProcessorAdapter { UUID nodeId = ((DiscoveryEvent)evt).eventNode().id(); - if (discoProtoVer == 2) + if (discoProtoVer == 2) { routinesInfo.onNodeFail(nodeId); + for (StartFuture fut : startFuts.values()) + fut.onNodeFail(nodeId); + } + clientInfos.remove(nodeId); // Unregister handlers created by left node. @@ -2132,10 +2296,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter { /** * Future for start routine. */ - private static class StartFuture extends GridFutureAdapter<UUID> { - /** */ - private GridKernalContext ctx; - + private class StartFuture extends GridFutureAdapter<UUID> { /** Consume ID. */ private UUID routineId; @@ -2145,23 +2306,123 @@ public class GridContinuousProcessor extends GridProcessorAdapter { /** All remote listeners are registered. */ private volatile boolean rmt; - /** Timeout object. */ - private volatile GridTimeoutObject timeoutObj; + /** */ + private AffinityTopologyVersion topVer; + + /** */ + private int expRes; + + /** */ + private final Map<UUID, ContinuousRoutineStartResultMessage> res = new HashMap<>(); /** - * @param ctx Kernal context. * @param routineId Consume ID. */ - StartFuture(GridKernalContext ctx, UUID routineId) { - this.ctx = ctx; - + StartFuture(UUID routineId) { this.routineId = routineId; } /** + * @param topVer Topology version. + * @param errs Errors. + * @param cntrsPerNode Update counters. + * @param cntrs Update counters. + */ + private void onAllRemoteRegistered( + AffinityTopologyVersion topVer, + @Nullable Map<UUID, ? extends Exception> errs, + Map<UUID, Map<Integer, T2<Long, Long>>> cntrsPerNode, + Map<Integer, T2<Long, Long>> cntrs) { + try { + if (errs == null || errs.isEmpty()) { + LocalRoutineInfo routine = locInfos.get(routineId); + + // Update partition counters. + if (routine != null && routine.handler().isQuery()) { + GridCacheAdapter<Object, Object> interCache = + ctx.cache().internalCache(routine.handler().cacheName()); + + GridCacheContext cctx = interCache != null ? interCache.context() : null; + + if (cctx != null && cntrsPerNode != null && !cctx.isLocal() && cctx.affinityNode()) + cntrsPerNode.put(ctx.localNodeId(), + toCountersMap(cctx.topology().localUpdateCounters(false))); + + routine.handler().updateCounters(topVer, cntrsPerNode, cntrs); + } + + onRemoteRegistered(); + } + else { + Exception firstEx = F.first(errs.values()); + + onDone(firstEx); + + stopRoutine(routineId); + } + } + finally { + startFuts.remove(routineId, this); + } + } + + /** + * @param topVer Topology version. + * @param nodes Nodes. + */ + void initRemoteNodes(AffinityTopologyVersion topVer, Collection<ClusterNode> nodes) { + RoutineRegisterResults res0 = null; + + synchronized (res) { + assert this.topVer == null && expRes == 0; + + this.topVer = topVer; + + for (ClusterNode node : nodes) { + if (!ctx.localNodeId().equals(node.id()) && ctx.discovery().alive(node.id())) + expRes++; + } + + if (expRes == res.size()) + res0 = createRegisterResults(); + } + + if (res0 != null) + onAllRemoteRegistered(res0.topVer, res0.errs, res0.cntrsPerNode, null); + } + + /** + * @param nodeId Node ID. + * @param msg Message. + */ + void onResult(UUID nodeId, ContinuousRoutineStartResultMessage msg) { + RoutineRegisterResults res0 = null; + + synchronized (res) { + if (res.containsKey(nodeId) || (topVer != null && res.size() == expRes)) + return; + + res.put(nodeId, msg); + + if (topVer != null && expRes == res.size()) + res0 = createRegisterResults(); + } + + if (res0 != null) + onAllRemoteRegistered(res0.topVer, res0.errs, res0.cntrsPerNode, null); + } + + /** + * @param nodeId Failed node ID. + */ + void onNodeFail(UUID nodeId) { + onResult(nodeId, new ContinuousRoutineStartResultMessage(routineId, null, null, false)); + } + + /** * Called when local listener is registered. */ - public void onLocalRegistered() { + void onLocalRegistered() { loc = true; if (rmt && !isDone()) @@ -2171,7 +2432,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter { /** * Called when all remote listeners are registered. */ - public void onRemoteRegistered() { + void onRemoteRegistered() { rmt = true; if (loc && !isDone()) @@ -2179,22 +2440,62 @@ public class GridContinuousProcessor extends GridProcessorAdapter { } /** - * @param timeoutObj Timeout object. + * @return Results. */ - public void addTimeoutObject(GridTimeoutObject timeoutObj) { - assert timeoutObj != null; + private RoutineRegisterResults createRegisterResults() { + Map<UUID, Exception> errs = null; + Map<UUID, Map<Integer, T2<Long, Long>>> cntrsPerNode = null; - this.timeoutObj = timeoutObj; + for (Map.Entry<UUID, ContinuousRoutineStartResultMessage> entry : res.entrySet()) { + ContinuousRoutineStartResultMessage msg = entry.getValue(); - ctx.timeout().addTimeoutObject(timeoutObj); - } + if (msg.error()) { + byte[] errBytes = msg.errorBytes(); - /** {@inheritDoc} */ - @Override public boolean onDone(@Nullable UUID res, @Nullable Throwable err) { - if (timeoutObj != null) - ctx.timeout().removeTimeoutObject(timeoutObj); + Exception err = null; - return super.onDone(res, err); + if (errBytes != null) { + try { + err = U.unmarshal(marsh, errBytes, U.resolveClassLoader(ctx.config())); + } + catch (Exception e) { + U.warn(log, "Failed to unmarhal continuous routine start error: " + e); + } + } + + if (err == null) { + err = new IgniteCheckedException("Failed to start continuous " + + "routine on node: " + entry.getKey()); + } + + if (errs == null) + errs = new HashMap<>(); + + errs.put(entry.getKey(), err); + } + else { + byte[] cntrsMapBytes = msg.countersMapBytes(); + + if (cntrsMapBytes != null) { + try { + CachePartitionPartialCountersMap cntrsMap = U.unmarshal( + marsh, + cntrsMapBytes, + U.resolveClassLoader(ctx.config())); + + if (cntrsPerNode == null) + cntrsPerNode = new HashMap<>(); + + cntrsPerNode.put(entry.getKey(), CachePartitionPartialCountersMap.toCountersMap(cntrsMap)); + } + catch (Exception e) { + U.warn(log, "Failed to unmarhal continuous query update counters: " + e); + } + } + } + } + + return new RoutineRegisterResults(topVer, errs, cntrsPerNode); } /** {@inheritDoc} */ @@ -2204,6 +2505,33 @@ public class GridContinuousProcessor extends GridProcessorAdapter { } /** + * + */ + private static class RoutineRegisterResults { + /** */ + private final AffinityTopologyVersion topVer; + + /** */ + private final Map<UUID, ? extends Exception> errs; + + /** */ + private final Map<UUID, Map<Integer, T2<Long, Long>>> cntrsPerNode; + + /** + * @param topVer Topology version. + * @param errs Errors. + * @param cntrsPerNode Update counters. + */ + RoutineRegisterResults(AffinityTopologyVersion topVer, + Map<UUID, ? extends Exception> errs, + Map<UUID, Map<Integer, T2<Long, Long>>> cntrsPerNode) { + this.topVer = topVer; + this.errs = errs; + this.cntrsPerNode = cntrsPerNode; + } + } + + /** * Future for stop routine. */ private static class StopFuture extends GridFutureAdapter<Object> { http://git-wip-us.apache.org/repos/asf/ignite/blob/d8c52dc5/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAtomicTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAtomicTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAtomicTest.java index d7d97a4..cddb446 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAtomicTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAtomicTest.java @@ -30,7 +30,7 @@ import static org.apache.ignite.cache.CacheMode.PARTITIONED; public class IgniteCacheEntryListenerAtomicTest extends IgniteCacheEntryListenerAbstractTest { /** {@inheritDoc} */ @Override protected int gridCount() { - return 1; + return 3; } /** {@inheritDoc} */
