Repository: ignite
Updated Branches:
  refs/heads/ignite-zk a5a547073 -> d8c52dc54


zk


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d8c52dc5
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d8c52dc5
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d8c52dc5

Branch: refs/heads/ignite-zk
Commit: d8c52dc54a157cfb68c1948ca4ef9026c18f6133
Parents: a5a5470
Author: sboikov <[email protected]>
Authored: Wed Nov 29 14:51:31 2017 +0300
Committer: sboikov <[email protected]>
Committed: Wed Nov 29 15:21:15 2017 +0300

----------------------------------------------------------------------
 .../communication/GridIoMessageFactory.java     |   6 +
 .../ContinuousRoutineStartResultMessage.java    | 197 ++++++
 .../continuous/GridContinuousProcessor.java     | 664 ++++++++++++++-----
 .../IgniteCacheEntryListenerAtomicTest.java     |   2 +-
 4 files changed, 700 insertions(+), 169 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/d8c52dc5/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index 2f8ba6d..e453889 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -118,6 +118,7 @@ import 
org.apache.ignite.internal.processors.cache.version.GridCacheRawVersioned
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersionEx;
 import 
org.apache.ignite.internal.processors.cluster.ClusterMetricsUpdateMessage;
+import 
org.apache.ignite.internal.processors.continuous.ContinuousRoutineStartResultMessage;
 import org.apache.ignite.internal.processors.continuous.GridContinuousMessage;
 import org.apache.ignite.internal.processors.datastreamer.DataStreamerEntry;
 import org.apache.ignite.internal.processors.datastreamer.DataStreamerRequest;
@@ -881,6 +882,11 @@ public class GridIoMessageFactory implements 
MessageFactory {
 
                 break;
 
+            case 130:
+                msg = new ContinuousRoutineStartResultMessage();
+
+                break;
+
 
             // [-3..119] [124..128] [-23..-27] [-36..-55]- this
             // [120..123] - DR

http://git-wip-us.apache.org/repos/asf/ignite/blob/d8c52dc5/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/ContinuousRoutineStartResultMessage.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/ContinuousRoutineStartResultMessage.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/ContinuousRoutineStartResultMessage.java
new file mode 100644
index 0000000..0d5eb48
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/ContinuousRoutineStartResultMessage.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.continuous;
+
+import java.nio.ByteBuffer;
+import java.util.UUID;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionPartialCountersMap;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Reply to a routine start request: carries routine ID, error flag, error bytes and marshalled counters.
+ */
+public class ContinuousRoutineStartResultMessage implements Message {
+    /** Bit in {@link #flags} which is set when the routine failed to start. */
+    private static final int ERROR_FLAG = 0x01;
+
+    /** Routine ID. */
+    private UUID routineId;
+
+    /** Marshalled start error, may be {@code null}. */
+    private byte[] errBytes;
+
+    /** Marshalled {@link CachePartitionPartialCountersMap}, may be {@code null}. */
+    private byte[] cntrsMapBytes;
+
+    /** Bit flags, see {@link #ERROR_FLAG}. */
+    private int flags;
+
+    /**
+     * No-arg constructor used by the message factory; fields are then filled by {@link #readFrom}.
+     */
+    public ContinuousRoutineStartResultMessage() {
+        // No-op.
+    }
+
+    /**
+     * @param routineId Routine ID.
+     * @param cntrsMapBytes Marshalled {@link 
CachePartitionPartialCountersMap}.
+     * @param errBytes Marshalled error, may be {@code null}.
+     * @param err {@code True} if failed to start routine (stored as {@link #ERROR_FLAG} in flags).
+     */
+    ContinuousRoutineStartResultMessage(UUID routineId, byte[] cntrsMapBytes, 
byte[] errBytes, boolean err) {
+        this.routineId = routineId;
+        this.cntrsMapBytes = cntrsMapBytes;
+        this.errBytes = errBytes;
+
+        if (err)
+            flags |= ERROR_FLAG;
+    }
+
+    /**
+     * @return Marshalled {@link CachePartitionPartialCountersMap}.
+     */
+    @Nullable byte[] countersMapBytes() {
+        return cntrsMapBytes;
+    }
+
+    /**
+     * @return {@code True} if failed to start routine.
+     */
+    boolean error() {
+        return (flags & ERROR_FLAG) != 0;
+    }
+
+    /**
+     * @return Routine ID.
+     */
+    UUID routineId() {
+        return routineId;
+    }
+
+    /**
+     * @return Marshalled error, may be {@code null}.
+     */
+    @Nullable byte[] errorBytes() {
+        return errBytes;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        writer.setBuffer(buf);
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+        // Intentional fall-through: start at the field given by writer state, return false once a write fails.
+        switch (writer.state()) {
+            case 0:
+                if (!writer.writeByteArray("cntrsMapBytes", cntrsMapBytes))
+                    return false;
+
+                writer.incrementState();
+
+            case 1:
+                if (!writer.writeByteArray("errBytes", errBytes))
+                    return false;
+
+                writer.incrementState();
+
+            case 2:
+                if (!writer.writeInt("flags", flags))
+                    return false;
+
+                writer.incrementState();
+
+            case 3:
+                if (!writer.writeUuid("routineId", routineId))
+                    return false;
+
+                writer.incrementState();
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+        // Intentional fall-through: fields are read in the order written, starting at the reader state.
+        switch (reader.state()) {
+            case 0:
+                cntrsMapBytes = reader.readByteArray("cntrsMapBytes");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 1:
+                errBytes = reader.readByteArray("errBytes");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 2:
+                flags = reader.readInt("flags");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 3:
+                routineId = reader.readUuid("routineId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+        }
+
+        return 
reader.afterMessageRead(ContinuousRoutineStartResultMessage.class);
+    }
+
+    /** {@inheritDoc} */
+    @Override public short directType() {
+        return 130;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte fieldsCount() {
+        return 4;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onAckReceived() {
+        // No-op.
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/d8c52dc5/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index 4194622..e888b37 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -49,6 +49,7 @@ import 
org.apache.ignite.internal.IgniteDeploymentCheckedException;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
+import org.apache.ignite.internal.managers.communication.GridIoPolicy;
 import org.apache.ignite.internal.managers.communication.GridMessageListener;
 import org.apache.ignite.internal.managers.deployment.GridDeployment;
 import org.apache.ignite.internal.managers.deployment.GridDeploymentInfo;
@@ -61,6 +62,7 @@ import 
org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheProcessor;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionPartialCountersMap;
 import 
org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryHandler;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
@@ -207,7 +209,7 @@ public class GridContinuousProcessor extends 
GridProcessorAdapter {
                     if (ctx.isStopping())
                         return;
 
-                    processStartRequestV2(snd, msg);
+                    processStartRequestV2(topVer, snd, msg);
                 }
             });
 
@@ -252,32 +254,36 @@ public class GridContinuousProcessor extends 
GridProcessorAdapter {
 
         ctx.io().addMessageListener(TOPIC_CONTINUOUS, new 
GridMessageListener() {
             @Override public void onMessage(UUID nodeId, Object obj, byte plc) 
{
-                GridContinuousMessage msg = (GridContinuousMessage)obj;
+                if (obj instanceof ContinuousRoutineStartResultMessage)
+                    processRoutineStartResultMessage(nodeId, 
(ContinuousRoutineStartResultMessage)obj);
+                else {
+                    GridContinuousMessage msg = (GridContinuousMessage)obj;
 
-                if (msg.data() == null && msg.dataBytes() != null) {
-                    try {
-                        msg.data(U.unmarshal(marsh, msg.dataBytes(), 
U.resolveClassLoader(ctx.config())));
-                    }
-                    catch (IgniteCheckedException e) {
-                        U.error(log, "Failed to process message (ignoring): " 
+ msg, e);
+                    if (msg.data() == null && msg.dataBytes() != null) {
+                        try {
+                            msg.data(U.unmarshal(marsh, msg.dataBytes(), 
U.resolveClassLoader(ctx.config())));
+                        }
+                        catch (IgniteCheckedException e) {
+                            U.error(log, "Failed to process message 
(ignoring): " + msg, e);
 
-                        return;
+                            return;
+                        }
                     }
-                }
 
-                switch (msg.type()) {
-                    case MSG_EVT_NOTIFICATION:
-                        processNotification(nodeId, msg);
+                    switch (msg.type()) {
+                        case MSG_EVT_NOTIFICATION:
+                            processNotification(nodeId, msg);
 
-                        break;
+                            break;
 
-                    case MSG_EVT_ACK:
-                        processMessageAck(msg);
+                        case MSG_EVT_ACK:
+                            processMessageAck(msg);
 
-                        break;
+                            break;
 
-                    default:
-                        assert false : "Unexpected message received: " + 
msg.type();
+                        default:
+                            assert false : "Unexpected message received: " + 
msg.type();
+                    }
                 }
             }
         });
@@ -433,6 +439,7 @@ public class GridContinuousProcessor extends 
GridProcessorAdapter {
 
             return data;
         }
+
         return null;
     }
 
@@ -483,7 +490,7 @@ public class GridContinuousProcessor extends 
GridProcessorAdapter {
                 for (ContinuousRoutineInfo routineInfo : 
nodeData.startedRoutines) {
                     routinesInfo.addRoutineInfo(routineInfo);
 
-                    startRoutine(routineInfo);
+                    startDiscoveryDataRoutine(routineInfo);
                 }
             }
         }
@@ -506,7 +513,7 @@ public class GridContinuousProcessor extends 
GridProcessorAdapter {
 
                     routinesInfo.addRoutineInfo(routineInfo);
 
-                    startRoutine(routineInfo);
+                    startDiscoveryDataRoutine(routineInfo);
                 }
             }
         }
@@ -523,7 +530,7 @@ public class GridContinuousProcessor extends 
GridProcessorAdapter {
     /**
      * @param routineInfo Routine info.
      */
-    private void startRoutine(ContinuousRoutineInfo routineInfo) {
+    private void startDiscoveryDataRoutine(ContinuousRoutineInfo routineInfo) {
         IgnitePredicate<ClusterNode> nodeFilter = null;
 
         try {
@@ -796,30 +803,10 @@ public class GridContinuousProcessor extends 
GridProcessorAdapter {
         // Whether local node is included in routine.
         boolean locIncluded = prjPred == null || 
prjPred.apply(ctx.discovery().localNode());
 
-        StartRequestData reqData = new StartRequestData(prjPred, hnd.clone(), 
bufSize, interval, autoUnsubscribe);
+        AbstractContinuousMessage msg;
 
         try {
-            if (ctx.config().isPeerClassLoadingEnabled()) {
-                // Handle peer deployment for projection predicate.
-                if (prjPred != null && !U.isGrid(prjPred.getClass())) {
-                    Class cls = U.detectClass(prjPred);
-
-                    String clsName = cls.getName();
-
-                    GridDeployment dep = ctx.deploy().deploy(cls, 
U.detectClassLoader(cls));
-
-                    if (dep == null)
-                        throw new IgniteDeploymentCheckedException("Failed to 
deploy projection predicate: " + prjPred);
-
-                    reqData.className(clsName);
-                    reqData.deploymentInfo(new GridDeploymentInfoBean(dep));
-
-                    reqData.p2pMarshal(marsh);
-                }
-
-                // Handle peer deployment for other handler-specific objects.
-                reqData.handler().p2pMarshal(ctx);
-            }
+            msg = createStartMessage(routineId, hnd, bufSize, interval, 
autoUnsubscribe, prjPred);
         }
         catch (IgniteCheckedException e) {
             return new GridFinishedFuture<>(e);
@@ -828,16 +815,22 @@ public class GridContinuousProcessor extends 
GridProcessorAdapter {
         // Register per-routine notifications listener if ordered messaging is 
used.
         registerMessageListener(hnd);
 
-        StartFuture fut = new StartFuture(ctx, routineId);
+        StartFuture fut = new StartFuture(routineId);
 
         startFuts.put(routineId, fut);
 
         try {
-            if (locIncluded || hnd.isQuery())
-                registerHandler(ctx.localNodeId(), routineId, hnd, bufSize, 
interval, autoUnsubscribe, true);
+            if (locIncluded || hnd.isQuery()) {
+                registerHandler(ctx.localNodeId(),
+                    routineId,
+                    hnd,
+                    bufSize,
+                    interval,
+                    autoUnsubscribe,
+                    true);
+            }
 
-            ctx.discovery().sendCustomEvent(new 
StartRoutineDiscoveryMessage(routineId, reqData,
-                reqData.handler().keepBinary()));
+            ctx.discovery().sendCustomEvent(msg);
         }
         catch (IgniteCheckedException e) {
             startFuts.remove(routineId);
@@ -857,6 +850,92 @@ public class GridContinuousProcessor extends 
GridProcessorAdapter {
     }
 
     /**
+     * @param routineId Routine ID.
+     * @param hnd Handler; it is cloned and the clone is put into the message.
+     * @param bufSize Buffer size.
+     * @param interval Interval.
+     * @param autoUnsubscribe Auto unsubscribe flag.
+     * @param nodeFilter Node filter (projection predicate), may be {@code null}.
+     * @return Routine start message matching {@code discoProtoVer} (V1 or V2 discovery message).
+     * @throws IgniteCheckedException If failed, e.g. the node filter class could not be deployed.
+     */
+    private AbstractContinuousMessage createStartMessage(UUID routineId,
+        GridContinuousHandler hnd,
+        int bufSize,
+        long interval,
+        boolean autoUnsubscribe,
+        @Nullable IgnitePredicate<ClusterNode> nodeFilter)
+        throws IgniteCheckedException
+    {
+        hnd = hnd.clone();
+
+        String clsName = null;
+        GridDeploymentInfoBean dep = null;
+
+        if (ctx.config().isPeerClassLoadingEnabled()) {
+            // Handle peer deployment for projection predicate.
+            if (nodeFilter != null && !U.isGrid(nodeFilter.getClass())) {
+                Class cls = U.detectClass(nodeFilter);
+
+                clsName = cls.getName();
+
+                GridDeployment dep0 = ctx.deploy().deploy(cls, 
U.detectClassLoader(cls));
+
+                if (dep0 == null)
+                    throw new IgniteDeploymentCheckedException("Failed to 
deploy projection predicate: " + nodeFilter);
+
+                dep = new GridDeploymentInfoBean(dep0);
+            }
+
+            // Handle peer deployment for other handler-specific objects.
+            hnd.p2pMarshal(ctx);
+        }
+        // V1 request data carries the filter and handler objects, V2 carries them as marshalled bytes.
+        if (discoProtoVer == 1) {
+            StartRequestData reqData = new StartRequestData(
+                nodeFilter,
+                hnd,
+                bufSize,
+                interval,
+                autoUnsubscribe);
+
+            if (clsName != null) {
+                reqData.className(clsName);
+                reqData.deploymentInfo(dep);
+
+                reqData.p2pMarshal(marsh);
+            }
+
+            return new StartRoutineDiscoveryMessage(
+                routineId,
+                reqData,
+                reqData.handler().keepBinary());
+        }
+        else {
+            assert discoProtoVer == 2 : discoProtoVer;
+
+            byte[] nodeFilterBytes = nodeFilter != null ? U.marshal(marsh, 
nodeFilter) : null;
+            byte[] hndBytes =  U.marshal(marsh, hnd);
+
+            StartRequestDataV2 reqData = new 
StartRequestDataV2(nodeFilterBytes,
+                hndBytes,
+                bufSize,
+                interval,
+                autoUnsubscribe);
+
+            if (clsName != null) {
+                reqData.className(clsName);
+                reqData.deploymentInfo(dep);
+            }
+
+            return new StartRoutineDiscoveryMessageV2(
+                routineId,
+                reqData,
+                hnd.keepBinary());
+        }
+    }
+
+    /**
      * @param hnd Handler.
      */
     private void registerMessageListener(GridContinuousHandler hnd) {
@@ -1138,35 +1217,11 @@ public class GridContinuousProcessor extends 
GridProcessorAdapter {
         StartFuture fut = startFuts.remove(msg.routineId());
 
         if (fut != null) {
-            if (msg.errs().isEmpty()) {
-                LocalRoutineInfo routine = locInfos.get(msg.routineId());
-
-                // Update partition counters.
-                if (routine != null && routine.handler().isQuery()) {
-                    Map<UUID, Map<Integer, T2<Long, Long>>> cntrsPerNode = 
msg.updateCountersPerNode();
-                    Map<Integer, T2<Long, Long>> cntrs = msg.updateCounters();
-
-                    GridCacheAdapter<Object, Object> interCache =
-                        
ctx.cache().internalCache(routine.handler().cacheName());
-
-                    GridCacheContext cctx = interCache != null ? 
interCache.context() : null;
-
-                    if (cctx != null && cntrsPerNode != null && 
!cctx.isLocal() && cctx.affinityNode())
-                        cntrsPerNode.put(ctx.localNodeId(),
-                            
toCountersMap(cctx.topology().localUpdateCounters(false)));
-
-                    routine.handler().updateCounters(topVer, cntrsPerNode, 
cntrs);
-                }
-
-                fut.onRemoteRegistered();
-            }
-            else {
-                IgniteCheckedException firstEx = F.first(msg.errs().values());
-
-                fut.onDone(firstEx);
-
-                stopRoutine(msg.routineId());
-            }
+            fut.onAllRemoteRegistered(
+                topVer,
+                msg.errs(),
+                msg.updateCountersPerNode(),
+                msg.updateCounters());
         }
     }
 
@@ -1280,96 +1335,201 @@ public class GridContinuousProcessor extends 
GridProcessorAdapter {
     }
 
     /**
+     * @param sndId ID of the node that sent the start result.
+     * @param msg Start result; ignored if no start future is registered for its routine ID.
+     */
+    private void processRoutineStartResultMessage(UUID sndId, 
ContinuousRoutineStartResultMessage msg) {
+        StartFuture fut = startFuts.get(msg.routineId());
+
+        if (fut != null)
+            fut.onResult(sndId, msg);
+    }
+
+    /**
+     * @param topVer Current topology version.
      * @param snd Sender.
      * @param msg Start request.
      */
-    private void processStartRequestV2(ClusterNode snd, 
StartRoutineDiscoveryMessageV2 msg) {
-        StartRequestDataV2 reqData = msg.startRequestData();
+    private void processStartRequestV2(final AffinityTopologyVersion topVer,
+        final ClusterNode snd,
+        final StartRoutineDiscoveryMessageV2 msg) {
+        try {
+            StartRequestDataV2 reqData = msg.startRequestData();
 
-        ContinuousRoutineInfo routineInfo = new ContinuousRoutineInfo(snd.id(),
-            msg.routineId(),
-            reqData.handlerBytes(),
-            reqData.nodeFilterBytes(),
-            reqData.bufferSize(),
-            reqData.interval(),
-            reqData.autoUnsubscribe());
+            ContinuousRoutineInfo routineInfo = new 
ContinuousRoutineInfo(snd.id(),
+                msg.routineId(),
+                reqData.handlerBytes(),
+                reqData.nodeFilterBytes(),
+                reqData.bufferSize(),
+                reqData.interval(),
+                reqData.autoUnsubscribe());
 
-        routinesInfo.addRoutineInfo(routineInfo);
+            routinesInfo.addRoutineInfo(routineInfo);
 
-        Exception err = null;
+            final Collection<ClusterNode> nodes = 
ctx.discovery().nodes(topVer);
 
-        IgnitePredicate<ClusterNode> nodeFilter = null;
+            // Should not use marshaller and send messages from discovery 
thread.
+            ctx.pools().poolForPolicy(GridIoPolicy.SYSTEM_POOL).execute(new 
Runnable() {
+                @Override public void run() {
+                    if (snd.id().equals(ctx.localNodeId())) {
+                        StartFuture fut = startFuts.get(msg.routineId());
 
-        if (reqData.nodeFilterBytes() != null) {
-            try {
-                if (ctx.config().isPeerClassLoadingEnabled() && 
reqData.className() != null) {
-                    String clsName = reqData.className();
-                    GridDeploymentInfo depInfo = reqData.deploymentInfo();
-
-                    GridDeployment dep = 
ctx.deploy().getGlobalDeployment(depInfo.deployMode(),
-                        clsName,
-                        clsName,
-                        depInfo.userVersion(),
-                        snd.id(),
-                        depInfo.classLoaderId(),
-                        depInfo.participants(),
-                        null);
+                        if (fut != null)
+                            fut.initRemoteNodes(topVer, nodes);
 
-                    if (dep == null)
-                        throw new IgniteDeploymentCheckedException("Failed to 
obtain deployment for class: " + clsName);
+                        return;
+                    }
 
-                    nodeFilter = U.unmarshal(marsh, reqData.nodeFilterBytes(), 
U.resolveClassLoader(dep.classLoader(), ctx.config()));
-                }
-                else
-                    nodeFilter = U.unmarshal(marsh, reqData.nodeFilterBytes(), 
U.resolveClassLoader(ctx.config()));
+                    StartRequestDataV2 reqData = msg.startRequestData();
 
-                if (nodeFilter != null)
-                    ctx.resource().injectGeneric(nodeFilter);
-            }
-            catch (Exception e) {
-                err = e;
+                    Exception err = null;
 
-                U.error(log, "Failed to unmarshal continuous routine filter [" 
+
-                    "routineId=" + routineInfo.routineId +
-                    ", srcNodeId=" + routineInfo.srcNodeId + ']', e);
-            }
-        }
+                    IgnitePredicate<ClusterNode> nodeFilter = null;
 
-        boolean register = err == null && (nodeFilter == null || 
nodeFilter.apply(ctx.discovery().localNode()));
+                    byte[] cntrs = null;
 
-        if (register) {
-            try {
-                GridContinuousHandler hnd = U.unmarshal(marsh, 
reqData.handlerBytes(), U.resolveClassLoader(ctx.config()));
+                    if (reqData.nodeFilterBytes() != null) {
+                        try {
+                            if (ctx.config().isPeerClassLoadingEnabled() && 
reqData.className() != null) {
+                                String clsName = reqData.className();
+                                GridDeploymentInfo depInfo = 
reqData.deploymentInfo();
+
+                                GridDeployment dep = 
ctx.deploy().getGlobalDeployment(depInfo.deployMode(),
+                                    clsName,
+                                    clsName,
+                                    depInfo.userVersion(),
+                                    snd.id(),
+                                    depInfo.classLoaderId(),
+                                    depInfo.participants(),
+                                    null);
+
+                                if (dep == null) {
+                                    throw new 
IgniteDeploymentCheckedException("Failed to obtain deployment " +
+                                        "for class: " + clsName);
+                                }
+
+                                nodeFilter = U.unmarshal(marsh,
+                                    reqData.nodeFilterBytes(),
+                                    U.resolveClassLoader(dep.classLoader(), 
ctx.config()));
+                            }
+                            else {
+                                nodeFilter = U.unmarshal(marsh,
+                                    reqData.nodeFilterBytes(),
+                                    U.resolveClassLoader(ctx.config()));
+                            }
+
+                            if (nodeFilter != null)
+                                ctx.resource().injectGeneric(nodeFilter);
+                        }
+                        catch (Exception e) {
+                            err = e;
 
-                if (ctx.config().isPeerClassLoadingEnabled())
-                    hnd.p2pUnmarshal(snd.id(), ctx);
+                            U.error(log, "Failed to unmarshal continuous 
routine filter [" +
+                                "routineId=" + msg.routineId +
+                                ", srcNodeId=" + snd.id() + ']', e);
+                        }
+                    }
 
-                if (msg.keepBinary()) {
-                    assert hnd instanceof CacheContinuousQueryHandler : hnd;
+                    boolean register = err == null &&
+                        (nodeFilter == null || 
nodeFilter.apply(ctx.discovery().localNode()));
+
+                    if (register) {
+                        try {
+                            GridContinuousHandler hnd = U.unmarshal(marsh,
+                                reqData.handlerBytes(),
+                                U.resolveClassLoader(ctx.config()));
 
-                    ((CacheContinuousQueryHandler)hnd).keepBinary(true);
+                            if (ctx.config().isPeerClassLoadingEnabled())
+                                hnd.p2pUnmarshal(snd.id(), ctx);
+
+                            if (msg.keepBinary()) {
+                                assert hnd instanceof 
CacheContinuousQueryHandler : hnd;
+
+                                
((CacheContinuousQueryHandler)hnd).keepBinary(true);
+                            }
+
+                            GridContinuousHandler hnd0 = hnd instanceof 
GridMessageListenHandler ?
+                                new 
GridMessageListenHandler((GridMessageListenHandler)hnd) :
+                                hnd;
+
+                            registerHandler(snd.id(),
+                                msg.routineId,
+                                hnd0,
+                                reqData.bufferSize(),
+                                reqData.interval(),
+                                reqData.autoUnsubscribe(),
+                                false);
+
+                            if (hnd0.isQuery()) {
+                                GridCacheProcessor proc = ctx.cache();
+
+                                if (proc != null) {
+                                    GridCacheAdapter cache = 
ctx.cache().internalCache(hnd0.cacheName());
+
+                                    if (cache != null && !cache.isLocal() && 
cache.context().userCache()) {
+                                        CachePartitionPartialCountersMap 
cntrsMap =
+                                            
cache.context().topology().localUpdateCounters(false);
+
+                                        cntrs = U.marshal(marsh, cntrsMap);
+                                    }
+                                }
+                            }
+                        }
+                        catch (Exception e) {
+                            err = e;
+
+                            U.error(log, "Failed to register continuous 
routine handler [" +
+                                "routineId=" + msg.routineId +
+                                ", srcNodeId=" + snd.id() + ']', e);
+                        }
+                    }
+
+                    sendMessageStartResult(snd, msg.routineId(), cntrs, err);
                 }
+            });
+        }
+        catch (IgniteCheckedException e) {
+            U.error(log, "Failed to submit continuous routine started result 
closure: " + e, e);
+        }
+    }
 
-                GridContinuousHandler hnd0 = hnd instanceof 
GridMessageListenHandler ?
-                    new 
GridMessageListenHandler((GridMessageListenHandler)hnd) :
-                    hnd;
+    /**
+     * @param node Target node.
+     * @param routineId Routine ID.
+     * @param cntrsMapBytes Marshalled {@link 
CachePartitionPartialCountersMap}.
+     * @param err Start error if any.
+     */
+    private void sendMessageStartResult(final ClusterNode node,
+        final UUID routineId,
+        byte[] cntrsMapBytes,
+        final @Nullable Exception err)
+    {
+        byte[] errBytes = null;
 
-                registerHandler(snd.id(),
-                    msg.routineId,
-                    hnd0,
-                    reqData.bufferSize(),
-                    reqData.interval(),
-                    reqData.autoUnsubscribe(),
-                    false);
+        if (err != null) {
+            try {
+                errBytes = U.marshal(marsh, err);
             }
             catch (Exception e) {
-                err = e;
-
-                U.error(log, "Failed to register continuous routine handler [" 
+
-                    "routineId=" + routineInfo.routineId +
-                    ", srcNodeId=" + routineInfo.srcNodeId + ']', e);
+                U.error(log, "Failed to marshal routine start error: " + e, e);
             }
         }
+
+        ContinuousRoutineStartResultMessage msg = new 
ContinuousRoutineStartResultMessage(routineId,
+            cntrsMapBytes,
+            errBytes,
+            err != null);
+
+        try {
+            ctx.io().sendToGridTopic(node, TOPIC_CONTINUOUS, msg, SYSTEM_POOL, 
null);
+        }
+        catch (ClusterTopologyCheckedException e) {
+            if (log.isDebugEnabled())
+                log.debug("Failed to send routine start result, node failed: " 
+ e);
+        }
+        catch (IgniteCheckedException e) {
+            U.error(log, "Failed to send routine start result: " + e, e);
+        }
     }
 
     /**
@@ -1690,9 +1850,13 @@ public class GridContinuousProcessor extends 
GridProcessorAdapter {
 
             UUID nodeId = ((DiscoveryEvent)evt).eventNode().id();
 
-            if (discoProtoVer == 2)
+            if (discoProtoVer == 2) {
                 routinesInfo.onNodeFail(nodeId);
 
+                for (StartFuture fut : startFuts.values())
+                    fut.onNodeFail(nodeId);
+            }
+
             clientInfos.remove(nodeId);
 
             // Unregister handlers created by left node.
@@ -2132,10 +2296,7 @@ public class GridContinuousProcessor extends 
GridProcessorAdapter {
     /**
      * Future for start routine.
      */
-    private static class StartFuture extends GridFutureAdapter<UUID> {
-        /** */
-        private GridKernalContext ctx;
-
+    private class StartFuture extends GridFutureAdapter<UUID> {
         /** Consume ID. */
         private UUID routineId;
 
@@ -2145,23 +2306,123 @@ public class GridContinuousProcessor extends 
GridProcessorAdapter {
         /** All remote listeners are registered. */
         private volatile boolean rmt;
 
-        /** Timeout object. */
-        private volatile GridTimeoutObject timeoutObj;
+        /** Topology version; {@code null} until set by {@code initRemoteNodes}. */
+        private AffinityTopologyVersion topVer;
+
+        /** Number of remote node results expected; computed in {@code initRemoteNodes}. */
+        private int expRes;
+
+        /** Collected start results by node ID; also the monitor under which {@code topVer} and {@code expRes} are accessed. */
+        private final Map<UUID, ContinuousRoutineStartResultMessage> res = new 
HashMap<>();
 
         /**
-         * @param ctx Kernal context.
          * @param routineId Consume ID.
          */
-        StartFuture(GridKernalContext ctx, UUID routineId) {
-            this.ctx = ctx;
-
+        StartFuture(UUID routineId) {
             this.routineId = routineId;
         }
 
         /**
+         * Handles the collected outcome of remote registration. With no errors, updates
+         * the counters of the local query handler (if the routine is a query) and marks
+         * remote registration as done. Otherwise completes this future with the first
+         * error and stops the routine. In both cases this future is removed from
+         * {@code startFuts} afterwards.
+         *
+         * @param topVer Topology version.
+         * @param errs Errors.
+         * @param cntrsPerNode Update counters.
+         * @param cntrs Update counters.
+         */
+        private void onAllRemoteRegistered(
+            AffinityTopologyVersion topVer,
+            @Nullable Map<UUID, ? extends Exception> errs,
+            Map<UUID, Map<Integer, T2<Long, Long>>> cntrsPerNode,
+            Map<Integer, T2<Long, Long>> cntrs) {
+            try {
+                if (errs == null || errs.isEmpty()) {
+                    LocalRoutineInfo routine = locInfos.get(routineId);
+
+                    // Update partition counters.
+                    if (routine != null && routine.handler().isQuery()) {
+                        GridCacheAdapter<Object, Object> interCache =
+                            
ctx.cache().internalCache(routine.handler().cacheName());
+
+                        GridCacheContext cctx = interCache != null ? 
interCache.context() : null;
+
+                        // Add this node's own counters when the cache is not local
+                        // and the local node is an affinity node for it.
+                        if (cctx != null && cntrsPerNode != null && 
!cctx.isLocal() && cctx.affinityNode())
+                            cntrsPerNode.put(ctx.localNodeId(),
+                                
toCountersMap(cctx.topology().localUpdateCounters(false)));
+
+                        routine.handler().updateCounters(topVer, cntrsPerNode, 
cntrs);
+                    }
+
+                    onRemoteRegistered();
+                }
+                else {
+                    Exception firstEx = F.first(errs.values());
+
+                    onDone(firstEx);
+
+                    stopRoutine(routineId);
+                }
+            }
+            finally {
+                // Two-argument remove: presumably drops the entry only if it still
+                // maps to this future (java.util.Map semantics) - confirm startFuts type.
+                startFuts.remove(routineId, this);
+            }
+        }
+
+        /**
+         * Fixes the topology version for this start and counts how many remote
+         * results are expected: one per node in {@code nodes} that is not the
+         * local node and is alive according to discovery. If that many results
+         * are already stored, remote registration is finished right away.
+         *
+         * NOTE(review): results stored before this call (including the placeholder
+         * added by {@code onNodeFail}) count towards {@code res.size()}, while
+         * {@code expRes} counts only currently alive nodes - confirm the two cannot
+         * match before every alive node has actually answered.
+         *
+         * @param topVer Topology version.
+         * @param nodes Nodes.
+         */
+        void initRemoteNodes(AffinityTopologyVersion topVer, 
Collection<ClusterNode> nodes) {
+            RoutineRegisterResults res0 = null;
+
+            synchronized (res) {
+                assert this.topVer == null && expRes == 0;
+
+                this.topVer = topVer;
+
+                for (ClusterNode node : nodes) {
+                    if (!ctx.localNodeId().equals(node.id()) && 
ctx.discovery().alive(node.id()))
+                        expRes++;
+                }
+
+                if (expRes == res.size())
+                    res0 = createRegisterResults();
+            }
+
+            // Completion runs after the synchronized block has been left.
+            if (res0 != null)
+                onAllRemoteRegistered(res0.topVer, res0.errs, 
res0.cntrsPerNode, null);
+        }
+
+        /**
+         * Stores the start result received for a node. A second result for the same
+         * node is ignored, as is any result arriving once the expected number has been
+         * collected. When this result is the last expected one (and
+         * {@code initRemoteNodes} has already run), remote registration is finished.
+         *
+         * @param nodeId Node ID.
+         * @param msg Message.
+         */
+        void onResult(UUID nodeId, ContinuousRoutineStartResultMessage msg) {
+            RoutineRegisterResults res0 = null;
+
+            synchronized (res) {
+                // topVer == null means initRemoteNodes has not run yet, so expRes is not known.
+                if (res.containsKey(nodeId) || (topVer != null && res.size() 
== expRes))
+                    return;
+
+                res.put(nodeId, msg);
+
+                if (topVer != null && expRes == res.size())
+                    res0 = createRegisterResults();
+            }
+
+            // Completion runs after the synchronized block has been left.
+            if (res0 != null)
+                onAllRemoteRegistered(res0.topVer, res0.errs, 
res0.cntrsPerNode, null);
+        }
+
+        /**
+         * Treats a failed node as having answered: records for it a placeholder
+         * result built with {@code null} counters, {@code null} error bytes and
+         * {@code false} as the last constructor argument.
+         *
+         * @param nodeId Failed node ID.
+         */
+        void onNodeFail(UUID nodeId) {
+            onResult(nodeId, new 
ContinuousRoutineStartResultMessage(routineId, null, null, false));
+        }
+
+        /**
          * Called when local listener is registered.
          */
-        public void onLocalRegistered() {
+        void onLocalRegistered() {
             loc = true;
 
             if (rmt && !isDone())
@@ -2171,7 +2432,7 @@ public class GridContinuousProcessor extends 
GridProcessorAdapter {
         /**
          * Called when all remote listeners are registered.
          */
-        public void onRemoteRegistered() {
+        void onRemoteRegistered() {
             rmt = true;
 
             if (loc && !isDone())
@@ -2179,22 +2440,62 @@ public class GridContinuousProcessor extends 
GridProcessorAdapter {
         }
 
         /**
-         * @param timeoutObj Timeout object.
+         * Converts the collected per-node messages into {@link RoutineRegisterResults}:
+         * errors are unmarshalled (falling back to a generic
+         * {@code IgniteCheckedException}) and update counters are unmarshalled per node.
+         * Both callers visible in this class invoke it inside {@code synchronized (res)}.
+         *
+         * @return Results.
          */
-        public void addTimeoutObject(GridTimeoutObject timeoutObj) {
-            assert timeoutObj != null;
+        private RoutineRegisterResults createRegisterResults() {
+            Map<UUID, Exception> errs = null;
+            Map<UUID, Map<Integer, T2<Long, Long>>> cntrsPerNode = null;
 
-            this.timeoutObj = timeoutObj;
+            for (Map.Entry<UUID, ContinuousRoutineStartResultMessage> entry : 
res.entrySet()) {
+                ContinuousRoutineStartResultMessage msg = entry.getValue();
 
-            ctx.timeout().addTimeoutObject(timeoutObj);
-        }
+                if (msg.error()) {
+                    byte[] errBytes = msg.errorBytes();
 
-        /** {@inheritDoc} */
-        @Override public boolean onDone(@Nullable UUID res, @Nullable 
Throwable err) {
-            if (timeoutObj != null)
-                ctx.timeout().removeTimeoutObject(timeoutObj);
+                    Exception err = null;
 
-            return super.onDone(res, err);
+                    if (errBytes != null) {
+                        try {
+                            err = U.unmarshal(marsh, errBytes, 
U.resolveClassLoader(ctx.config()));
+                        }
+                        catch (Exception e) {
+                            U.warn(log, "Failed to unmarhal continuous routine 
start error: " + e);
+                        }
+                    }
+
+                    // No error bytes, or unmarshalling failed: use a generic error instead.
+                    if (err == null) {
+                        err = new IgniteCheckedException("Failed to start 
continuous " +
+                            "routine on node: " + entry.getKey());
+                    }
+
+                    if (errs == null)
+                        errs = new HashMap<>();
+
+                    errs.put(entry.getKey(), err);
+                }
+                else {
+                    byte[] cntrsMapBytes = msg.countersMapBytes();
+
+                    if (cntrsMapBytes != null) {
+                        try {
+                            CachePartitionPartialCountersMap cntrsMap = 
U.unmarshal(
+                                marsh,
+                                cntrsMapBytes,
+                                U.resolveClassLoader(ctx.config()));
+
+                            if (cntrsPerNode == null)
+                                cntrsPerNode = new HashMap<>();
+
+                            cntrsPerNode.put(entry.getKey(), 
CachePartitionPartialCountersMap.toCountersMap(cntrsMap));
+                        }
+                        catch (Exception e) {
+                            // Only warned: this node's counters are then left out of the result.
+                            U.warn(log, "Failed to unmarhal continuous query 
update counters: " + e);
+                        }
+                    }
+                }
+            }
+
+            return new RoutineRegisterResults(topVer, errs, cntrsPerNode);
         }
 
         /** {@inheritDoc} */
@@ -2204,6 +2505,33 @@ public class GridContinuousProcessor extends 
GridProcessorAdapter {
     }
 
     /**
+     * Immutable holder for the aggregated outcome of registering a continuous routine on remote nodes.
+     */
+    private static class RoutineRegisterResults {
+        /** Topology version. */
+        private final AffinityTopologyVersion topVer;
+
+        /** Start errors keyed by node ID; {@code null} when no node reported an error. */
+        private final Map<UUID, ? extends Exception> errs;
+
+        /** Update counters keyed by node ID; {@code null} when no counters were received. */
+        private final Map<UUID, Map<Integer, T2<Long, Long>>> cntrsPerNode;
+
+        /**
+         * @param topVer Topology version.
+         * @param errs Errors.
+         * @param cntrsPerNode Update counters.
+         */
+        RoutineRegisterResults(AffinityTopologyVersion topVer,
+            Map<UUID, ? extends Exception> errs,
+            Map<UUID, Map<Integer, T2<Long, Long>>> cntrsPerNode) {
+            this.topVer = topVer;
+            this.errs = errs;
+            this.cntrsPerNode = cntrsPerNode;
+        }
+    }
+
+    /**
      * Future for stop routine.
      */
     private static class StopFuture extends GridFutureAdapter<Object> {

http://git-wip-us.apache.org/repos/asf/ignite/blob/d8c52dc5/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAtomicTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAtomicTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAtomicTest.java
index d7d97a4..cddb446 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAtomicTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAtomicTest.java
@@ -30,7 +30,7 @@ import static org.apache.ignite.cache.CacheMode.PARTITIONED;
 public class IgniteCacheEntryListenerAtomicTest extends 
IgniteCacheEntryListenerAbstractTest {
     /** {@inheritDoc} */
     @Override protected int gridCount() {
-        return 1;
+        return 3;
     }
 
     /** {@inheritDoc} */

Reply via email to