http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataTransport.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataTransport.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataTransport.java
index 9402a32..38450df 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataTransport.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataTransport.java
@@ -155,22 +155,30 @@ final class BinaryMetadataTransport {
      * @param metadata Metadata proposed for update.
      * @return Future to wait for update result on.
      */
-    GridFutureAdapter<MetadataUpdateResult> 
requestMetadataUpdate(BinaryMetadata metadata) throws IgniteCheckedException {
+    GridFutureAdapter<MetadataUpdateResult> 
requestMetadataUpdate(BinaryMetadata metadata) {
         MetadataUpdateResultFuture resFut = new MetadataUpdateResultFuture();
 
         if (log.isDebugEnabled())
             log.debug("Requesting metadata update for " + metadata.typeId() + 
"; caller thread is blocked on future "
                 + resFut);
 
-        synchronized (this) {
-            unlabeledFutures.add(resFut);
+        try {
+            synchronized (this) {
+                unlabeledFutures.add(resFut);
 
-            if (!stopping)
-                discoMgr.sendCustomEvent(new 
MetadataUpdateProposedMessage(metadata, ctx.localNodeId()));
-            else
-                
resFut.onDone(MetadataUpdateResult.createUpdateDisabledResult());
+                if (!stopping)
+                    discoMgr.sendCustomEvent(new 
MetadataUpdateProposedMessage(metadata, ctx.localNodeId()));
+                else
+                    
resFut.onDone(MetadataUpdateResult.createUpdateDisabledResult());
+            }
+        }
+        catch (Exception e) {
+            resFut.onDone(MetadataUpdateResult.createUpdateDisabledResult(), 
e);
         }
 
+        if (ctx.clientDisconnected())
+            onDisconnected();
+
         return resFut;
     }
 
@@ -237,6 +245,8 @@ final class BinaryMetadataTransport {
         for (MetadataUpdateResultFuture fut : unlabeledFutures)
             fut.onDone(res);
 
+        unlabeledFutures.clear();
+
         for (MetadataUpdateResultFuture fut : syncMap.values())
             fut.onDone(res);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/MetadataUpdateAcceptedMessage.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/MetadataUpdateAcceptedMessage.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/MetadataUpdateAcceptedMessage.java
index 0416746..df64613 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/MetadataUpdateAcceptedMessage.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/MetadataUpdateAcceptedMessage.java
@@ -71,6 +71,11 @@ public class MetadataUpdateAcceptedMessage implements 
DiscoveryCustomMessage {
     }
 
     /** {@inheritDoc} */
+    @Override public boolean stopProcess() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
     @Nullable @Override public DiscoCache 
createDiscoCache(GridDiscoveryManager mgr,
         AffinityTopologyVersion topVer, DiscoCache discoCache) {
         throw new UnsupportedOperationException();

http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/MetadataUpdateProposedMessage.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/MetadataUpdateProposedMessage.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/MetadataUpdateProposedMessage.java
index f9bd660..84e32e1 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/MetadataUpdateProposedMessage.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/MetadataUpdateProposedMessage.java
@@ -134,6 +134,11 @@ public final class MetadataUpdateProposedMessage 
implements DiscoveryCustomMessa
     }
 
     /** {@inheritDoc} */
+    @Override public boolean stopProcess() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
     @Nullable @Override public DiscoCache 
createDiscoCache(GridDiscoveryManager mgr,
         AffinityTopologyVersion topVer, DiscoCache discoCache) {
         throw new UnsupportedOperationException();

http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
index 9b3c1ec..5bbbb31 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
@@ -598,6 +598,45 @@ public class GridClientPartitionTopology implements 
GridDhtPartitionTopology {
     }
 
     /** {@inheritDoc} */
+    @Override public List<List<ClusterNode>> allOwners() {
+        lock.readLock().lock();
+        // Everything below runs under the read lock, which the finally block releases.
+        try {
+            int parts = partitions();
+
+            List<List<ClusterNode>> res = new ArrayList<>(parts);
+            // One initially empty owner list per partition index in [0, parts).
+            for (int i = 0; i < parts; i++)
+                res.add(new ArrayList<>());
+
+            List<ClusterNode> allNodes = 
discoCache.cacheGroupAffinityNodes(grpId);
+            // Add each of these nodes to the owner list of every partition its map marks as OWNING.
+            for (int i = 0; i < allNodes.size(); i++) {
+                ClusterNode node = allNodes.get(i);
+
+                GridDhtPartitionMap nodeParts = node2part.get(node.id());
+                // A node with no entry in node2part is skipped.
+                if (nodeParts != null) {
+                    for (Map.Entry<Integer, GridDhtPartitionState> e : 
nodeParts.map().entrySet()) {
+                        if (e.getValue() == OWNING) {
+                            int part = e.getKey();
+
+                            List<ClusterNode> owners = res.get(part);
+
+                            owners.add(node);
+                        }
+                    }
+                }
+            }
+
+            return res;
+        }
+        finally {
+            lock.readLock().unlock();
+        }
+    }
+
+    /** {@inheritDoc} */
     @Override public List<ClusterNode> moving(int p) {
         return nodes(p, AffinityTopologyVersion.NONE, MOVING, null);
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
index ba55543..ea99f5d 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
@@ -970,6 +970,10 @@ public abstract class GridDhtCacheAdapter<K, V> extends 
GridDistributedCacheAdap
                 try {
                     ctx.io().send(nodeId, res, ctx.ioPolicy());
                 }
+                catch (ClusterTopologyCheckedException e) {
+                    if (log.isDebugEnabled())
+                        log.debug("Failed to send get response to node, node 
failed: " + nodeId);
+                }
                 catch (IgniteCheckedException e) {
                     U.error(log, "Failed to send get response to node (is node 
still alive?) [nodeId=" + nodeId +
                         ",req=" + req + ", res=" + res + ']', e);

http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
index 13564c2..7f900cb 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
@@ -237,6 +237,12 @@ public interface GridDhtPartitionTopology {
     public List<ClusterNode> owners(int p);
 
     /**
+     * @return List indexed by partition number; each list element is the 
collection of all nodes that
+     *      own the corresponding partition.
+     */
+    public List<List<ClusterNode>> allOwners();
+
+    /**
      * @param p Partition ID.
      * @param topVer Topology version.
      * @return Collection of all nodes who {@code own} this partition.

http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index 528f0a6..538c57e 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -1217,6 +1217,45 @@ public class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
     }
 
     /** {@inheritDoc} */
+    @Override public List<List<ClusterNode>> allOwners() {
+        lock.readLock().lock();
+        // Everything below runs under the read lock, which the finally block releases.
+        try {
+            int parts = partitions();
+
+            List<List<ClusterNode>> res = new ArrayList<>(parts);
+            // One initially empty owner list per partition index in [0, parts).
+            for (int i = 0; i < parts; i++)
+                res.add(new ArrayList<>());
+
+            List<ClusterNode> allNodes = 
discoCache.cacheGroupAffinityNodes(grp.groupId());
+            // Add each of these nodes to the owner list of every partition its map marks as OWNING.
+            for (int i = 0; i < allNodes.size(); i++) {
+                ClusterNode node = allNodes.get(i);
+
+                GridDhtPartitionMap nodeParts = node2part.get(node.id());
+                // A node with no entry in node2part is skipped.
+                if (nodeParts != null) {
+                    for (Map.Entry<Integer, GridDhtPartitionState> e : 
nodeParts.map().entrySet()) {
+                        if (e.getValue() == OWNING) {
+                            int part = e.getKey();
+
+                            List<ClusterNode> owners = res.get(part);
+
+                            owners.add(node);
+                        }
+                    }
+                }
+            }
+
+            return res;
+        }
+        finally {
+            lock.readLock().unlock();
+        }
+    }
+
+    /** {@inheritDoc} */
     @Override public List<ClusterNode> moving(int p) {
         if (!grp.rebalanceEnabled())
             return ownersAndMoving(p, AffinityTopologyVersion.NONE);

http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 8da91a8..cbb4985 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -1506,12 +1506,16 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
         }
         catch (ClusterTopologyCheckedException ignore) {
             if (log.isDebugEnabled())
-                log.debug("Oldest node left during partition exchange 
[nodeId=" + oldestNode.id() +
+                log.debug("Coordinator left during partition exchange 
[nodeId=" + oldestNode.id() +
                     ", exchId=" + exchId + ']');
         }
         catch (IgniteCheckedException e) {
-            U.error(log, "Failed to send local partitions to oldest node (will 
retry after timeout) [oldestNodeId=" +
-                oldestNode.id() + ", exchId=" + exchId + ']', e);
+            if (reconnectOnError(e))
+                onDone(new IgniteNeedReconnectException(cctx.localNode(), e));
+            else {
+                U.error(log, "Failed to send local partitions to coordinator 
[crd=" + oldestNode.id() +
+                    ", exchId=" + exchId + ']', e);
+            }
         }
     }
 
@@ -3369,9 +3373,13 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
                             }
 
                             if (allReceived) {
-                                awaitSingleMapUpdates();
+                                
cctx.kernalContext().getSystemExecutorService().submit(new Runnable() {
+                                    @Override public void run() {
+                                        awaitSingleMapUpdates();
 
-                                onAllReceived(null);
+                                        onAllReceived(null);
+                                    }
+                                });
                             }
                         }
                         else {
@@ -3399,7 +3407,13 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
                                         ", newCrd=" + crd0.id() + ']');
                                 }
 
-                                sendPartitions(crd0);
+                                final ClusterNode newCrd = crd0;
+
+                                
cctx.kernalContext().getSystemExecutorService().submit(new Runnable() {
+                                    @Override public void run() {
+                                        sendPartitions(newCrd);
+                                    }
+                                });
                             }
                         }
                     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ChangeGlobalStateFinishMessage.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ChangeGlobalStateFinishMessage.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ChangeGlobalStateFinishMessage.java
index d7dfa16..bbbd999 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ChangeGlobalStateFinishMessage.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ChangeGlobalStateFinishMessage.java
@@ -94,6 +94,11 @@ public class ChangeGlobalStateFinishMessage implements 
DiscoveryCustomMessage {
     }
 
     /** {@inheritDoc} */
+    @Override public boolean stopProcess() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
     @Nullable @Override public DiscoCache 
createDiscoCache(GridDiscoveryManager mgr,
         AffinityTopologyVersion topVer, DiscoCache discoCache) {
         throw new UnsupportedOperationException();

http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ChangeGlobalStateMessage.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ChangeGlobalStateMessage.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ChangeGlobalStateMessage.java
index 50fc022..81855fc 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ChangeGlobalStateMessage.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ChangeGlobalStateMessage.java
@@ -131,6 +131,11 @@ public class ChangeGlobalStateMessage implements 
DiscoveryCustomMessage {
     }
 
     /** {@inheritDoc} */
+    @Override public boolean stopProcess() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
     @Override public DiscoCache createDiscoCache(GridDiscoveryManager mgr, 
AffinityTopologyVersion topVer,
         DiscoCache discoCache) {
         return mgr.createDiscoCacheOnCacheChange(topVer, discoCache);

http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterMetricsUpdateMessage.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterMetricsUpdateMessage.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterMetricsUpdateMessage.java
new file mode 100644
index 0000000..5c3044b
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterMetricsUpdateMessage.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cluster;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.ignite.internal.GridDirectMap;
+import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import 
org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Communication message that carries metrics as byte arrays: either the metrics of a single node
+ * ({@link #nodeMetrics()}) or a map of metrics keyed by {@link UUID} ({@link #allNodesMetrics()}).
+ * Each package-private constructor sets exactly one of the two fields; the other one stays {@code null}.
+ */
+public class ClusterMetricsUpdateMessage implements Message {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Metrics of a single node; left {@code null} by the all-nodes constructor. */
+    private byte[] nodeMetrics;
+
+    /** Metrics keyed by UUID (presumably node ID); left {@code null} by the single-node constructor. */
+    @GridDirectMap(keyType = UUID.class, valueType = byte[].class)
+    private Map<UUID, byte[]> allNodesMetrics;
+
+    /**
+     * Required by {@link GridIoMessageFactory}.
+     */
+    public ClusterMetricsUpdateMessage() {
+        // No-op.
+    }
+
+    /**
+     * @param nodeMetrics Node metrics.
+     */
+    ClusterMetricsUpdateMessage(byte[] nodeMetrics) {
+        this.nodeMetrics = nodeMetrics;
+    }
+
+    /**
+     * @param allNodesMetrics All nodes metrics.
+     */
+    ClusterMetricsUpdateMessage(Map<UUID, byte[]> allNodesMetrics) {
+        this.allNodesMetrics = allNodesMetrics;
+    }
+
+    /**
+     * @return Node metrics.
+     */
+    @Nullable byte[] nodeMetrics() {
+        return nodeMetrics;
+    }
+
+    /**
+     * @return All nodes metrics.
+     */
+    @Nullable Map<UUID, byte[]> allNodesMetrics() {
+        return allNodesMetrics;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        writer.setBuffer(buf);
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+        // Start at the field index held by the writer; cases have no break, so later fields follow.
+        switch (writer.state()) {
+            case 0:
+                if (!writer.writeMap("allNodesMetrics", allNodesMetrics, 
MessageCollectionItemType.UUID, MessageCollectionItemType.BYTE_ARR))
+                    return false;
+
+                writer.incrementState();
+
+            case 1:
+                if (!writer.writeByteArray("nodeMetrics", nodeMetrics))
+                    return false;
+
+                writer.incrementState();
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+        // Start at the field index held by the reader; cases have no break, so later fields follow.
+        switch (reader.state()) {
+            case 0:
+                allNodesMetrics = reader.readMap("allNodesMetrics", 
MessageCollectionItemType.UUID, MessageCollectionItemType.BYTE_ARR, false);
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 1:
+                nodeMetrics = reader.readByteArray("nodeMetrics");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+        }
+
+        return reader.afterMessageRead(ClusterMetricsUpdateMessage.class);
+    }
+
+    /** {@inheritDoc} */
+    @Override public short directType() {
+        return 133;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte fieldsCount() {
+        return 2;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onAckReceived() {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(ClusterMetricsUpdateMessage.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterNodeMetrics.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterNodeMetrics.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterNodeMetrics.java
new file mode 100644
index 0000000..22a385f
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterNodeMetrics.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cluster;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.Map;
+import org.apache.ignite.cache.CacheMetrics;
+import org.apache.ignite.cluster.ClusterMetrics;
+import org.apache.ignite.internal.ClusterMetricsSnapshot;
+
+/**
+ * Serializable holder for one set of metrics: the {@link ClusterMetrics} converted to a byte array
+ * at construction time, plus a map of {@link CacheMetrics} keyed by integer (presumably cache ID).
+ */
+class ClusterNodeMetrics implements Serializable {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Result of {@code ClusterMetricsSnapshot.serialize(...)} for the metrics given to the constructor. */
+    private final byte[] metrics;
+
+    /** Cache metrics exactly as passed to the constructor; may be {@code null}. */
+    private final Map<Integer, CacheMetrics> cacheMetrics;
+
+    /**
+     * @param metrics Metrics; serialized immediately, only the resulting bytes are kept.
+     * @param cacheMetrics Cache metrics; stored by reference, not copied.
+     */
+    ClusterNodeMetrics(ClusterMetrics metrics, Map<Integer, CacheMetrics> 
cacheMetrics) {
+        this.metrics = ClusterMetricsSnapshot.serialize(metrics);
+        this.cacheMetrics = cacheMetrics;
+    }
+
+    /**
+     * @return Metrics in the serialized form produced by the constructor.
+     */
+    byte[] metrics() {
+        return metrics;
+    }
+
+    /**
+     * @return Cache metrics, or an empty map if {@code null} was passed to the constructor.
+     */
+    Map<Integer, CacheMetrics> cacheMetrics() {
+        return cacheMetrics != null ? cacheMetrics : Collections.<Integer, 
CacheMetrics>emptyMap();
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java
index 5f2c66c..8796302 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java
@@ -33,6 +33,8 @@ import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.events.DiscoveryEvent;
 import org.apache.ignite.events.Event;
+import org.apache.ignite.internal.ClusterMetricsSnapshot;
+import org.apache.ignite.internal.GridDirectMap;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.IgniteDiagnosticInfo;
 import org.apache.ignite.internal.IgniteDiagnosticMessage;
@@ -42,21 +44,29 @@ import 
org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.cluster.IgniteClusterImpl;
 import org.apache.ignite.internal.managers.communication.GridIoPolicy;
 import org.apache.ignite.internal.managers.communication.GridMessageListener;
+import org.apache.ignite.internal.managers.discovery.DiscoCache;
+import org.apache.ignite.internal.managers.discovery.IgniteClusterNode;
 import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
 import org.apache.ignite.internal.processors.GridProcessorAdapter;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
 import org.apache.ignite.internal.util.GridTimerTask;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.future.IgniteFinishedFutureImpl;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.typedef.CI1;
+import org.apache.ignite.internal.util.typedef.internal.LT;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteClosure;
 import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.marshaller.jdk.JdkMarshaller;
 import org.apache.ignite.spi.discovery.DiscoveryDataBag;
 import org.apache.ignite.spi.discovery.DiscoveryDataBag.GridDiscoveryData;
+import org.apache.ignite.spi.discovery.DiscoveryMetricsProvider;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.jetbrains.annotations.Nullable;
 
 import static 
org.apache.ignite.IgniteSystemProperties.IGNITE_DIAGNOSTIC_ENABLED;
@@ -66,6 +76,7 @@ import static 
org.apache.ignite.events.EventType.EVT_NODE_FAILED;
 import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
 import static 
org.apache.ignite.internal.GridComponent.DiscoveryDataExchangeType.CLUSTER_PROC;
 import static org.apache.ignite.internal.GridTopic.TOPIC_INTERNAL_DIAGNOSTIC;
+import static org.apache.ignite.internal.GridTopic.TOPIC_METRICS;
 import static org.apache.ignite.internal.IgniteVersionUtils.VER_STR;
 
 /**
@@ -102,6 +113,18 @@ public class ClusterProcessor extends GridProcessorAdapter 
{
     /** */
     private final AtomicLong diagFutId = new AtomicLong();
 
+    /** */
+    private final Map<UUID, byte[]> allNodesMetrics = new 
ConcurrentHashMap<>();
+
+    /** */
+    private final JdkMarshaller marsh = new JdkMarshaller();
+
+    /** */
+    private DiscoveryMetricsProvider metricsProvider;
+
+    /** */
+    private boolean sndMetrics;
+
     /**
      * @param ctx Kernal context.
      */
@@ -111,6 +134,8 @@ public class ClusterProcessor extends GridProcessorAdapter {
         
notifyEnabled.set(IgniteSystemProperties.getBoolean(IGNITE_UPDATE_NOTIFIER, 
true));
 
         cluster = new IgniteClusterImpl(ctx);
+
+        sndMetrics = !(ctx.config().getDiscoverySpi() instanceof 
TcpDiscoverySpi);
     }
 
     /**
@@ -120,33 +145,31 @@ public class ClusterProcessor extends 
GridProcessorAdapter {
         return getBoolean(IGNITE_DIAGNOSTIC_ENABLED, true);
     }
 
-    /** */
-    private final JdkMarshaller marsh = new JdkMarshaller();
-
     /**
      * @throws IgniteCheckedException If failed.
      */
     public void initDiagnosticListeners() throws IgniteCheckedException {
         ctx.event().addLocalEventListener(new GridLocalEventListener() {
-                @Override public void onEvent(Event evt) {
-                    assert evt instanceof DiscoveryEvent;
-                    assert evt.type() == EVT_NODE_FAILED || evt.type() == 
EVT_NODE_LEFT;
+            @Override public void onEvent(Event evt) {
+                assert evt instanceof DiscoveryEvent;
+                assert evt.type() == EVT_NODE_FAILED || evt.type() == 
EVT_NODE_LEFT;
 
-                    DiscoveryEvent discoEvt = (DiscoveryEvent)evt;
+                DiscoveryEvent discoEvt = (DiscoveryEvent)evt;
 
-                    UUID nodeId = discoEvt.eventNode().id();
+                UUID nodeId = discoEvt.eventNode().id();
 
-                    ConcurrentHashMap<Long, InternalDiagnosticFuture> futs = 
diagnosticFutMap.get();
+                ConcurrentHashMap<Long, InternalDiagnosticFuture> futs = 
diagnosticFutMap.get();
 
-                    if (futs != null) {
-                        for (InternalDiagnosticFuture fut : futs.values()) {
-                            if (fut.nodeId.equals(nodeId))
-                                fut.onDone(new IgniteDiagnosticInfo("Target 
node failed: " + nodeId));
-                        }
+                if (futs != null) {
+                    for (InternalDiagnosticFuture fut : futs.values()) {
+                        if (fut.nodeId.equals(nodeId))
+                            fut.onDone(new IgniteDiagnosticInfo("Target node 
failed: " + nodeId));
                     }
                 }
-            },
-            EVT_NODE_FAILED, EVT_NODE_LEFT);
+
+                allNodesMetrics.remove(nodeId);
+            }
+        }, EVT_NODE_FAILED, EVT_NODE_LEFT);
 
         ctx.io().addMessageListener(TOPIC_INTERNAL_DIAGNOSTIC, new 
GridMessageListener() {
             @Override public void onMessage(UUID nodeId, Object msg, byte plc) 
{
@@ -233,6 +256,17 @@ public class ClusterProcessor extends GridProcessorAdapter 
{
                     U.warn(diagnosticLog, "Received unexpected message: " + 
msg);
             }
         });
+
+        if (sndMetrics) {
+            ctx.io().addMessageListener(TOPIC_METRICS, new 
GridMessageListener() {
+                @Override public void onMessage(UUID nodeId, Object msg, byte 
plc) {
+                    if (msg instanceof ClusterMetricsUpdateMessage)
+                        processMetricsUpdateMessage(nodeId, 
(ClusterMetricsUpdateMessage)msg);
+                    else
+                        U.warn(log, "Received unexpected message for 
TOPIC_METRICS: " + msg);
+                }
+            });
+        }
     }
 
     /**
@@ -296,7 +330,6 @@ public class ClusterProcessor extends GridProcessorAdapter {
         }
     }
 
-
     /**
      * @param vals collection to seek through.
      */
@@ -334,6 +367,14 @@ public class ClusterProcessor extends GridProcessorAdapter 
{
                     log.debug("Failed to create GridUpdateNotifier: " + e);
             }
         }
+
+        if (sndMetrics) {
+            metricsProvider = ctx.discovery().createMetricsProvider();
+
+            long updateFreq = ctx.config().getMetricsUpdateFrequency();
+
+            ctx.timeout().addTimeoutObject(new 
MetricsUpdateTimeoutObject(updateFreq));
+        }
     }
 
     /** {@inheritDoc} */
@@ -352,6 +393,133 @@ public class ClusterProcessor extends 
GridProcessorAdapter {
     }
 
     /**
+     * Processes a metrics update message. If the message carries a single node's metrics,
+     * they are stored in the {@code allNodesMetrics} field under the sender ID and applied
+     * to the sender node. Otherwise the message must carry a map with metrics of all nodes,
+     * which are applied to every node except the local one. Note that the local variable in
+     * the second branch shadows the {@code allNodesMetrics} field.
+     *
+     * @param sndNodeId Sender node ID.
+     * @param msg Message.
+     */
+    private void processMetricsUpdateMessage(UUID sndNodeId, 
ClusterMetricsUpdateMessage msg) {
+        byte[] nodeMetrics = msg.nodeMetrics();
+
+        if (nodeMetrics != null) {
+            assert msg.allNodesMetrics() == null;
+
+            allNodesMetrics.put(sndNodeId, nodeMetrics);
+
+            updateNodeMetrics(ctx.discovery().discoCache(), sndNodeId, 
nodeMetrics);
+        }
+        else {
+            Map<UUID, byte[]> allNodesMetrics = msg.allNodesMetrics();
+
+            assert allNodesMetrics != null;
+
+            DiscoCache discoCache = ctx.discovery().discoCache();
+
+            for (Map.Entry<UUID, byte[]> e : allNodesMetrics.entrySet()) {
+                if (!ctx.localNodeId().equals(e.getKey()))
+                    updateNodeMetrics(discoCache, e.getKey(), e.getValue());
+            }
+        }
+    }
+
+    /**
+     * Unmarshals the given metrics bytes (via {@code U.unmarshalZip}), sets the resulting
+     * node metrics and cache metrics on the node and fires a metrics update event through
+     * the discovery manager. Returns without doing anything if the node is not found in the
+     * discovery cache or is not alive there. An unmarshalling failure is only logged as a
+     * warning.
+     *
+     * @param discoCache Discovery data cache.
+     * @param nodeId Node ID.
+     * @param metricsBytes Marshalled metrics.
+     */
+    private void updateNodeMetrics(DiscoCache discoCache, UUID nodeId, byte[] 
metricsBytes) {
+        ClusterNode node = discoCache.node(nodeId);
+
+        if (node == null || !discoCache.alive(nodeId))
+            return;
+
+        try {
+            ClusterNodeMetrics metrics = 
U.unmarshalZip(ctx.config().getMarshaller(), metricsBytes, null);
+
+            assert node instanceof IgniteClusterNode : node;
+
+            IgniteClusterNode node0 = (IgniteClusterNode)node;
+
+            
node0.setMetrics(ClusterMetricsSnapshot.deserialize(metrics.metrics(), 0));
+            node0.setCacheMetrics(metrics.cacheMetrics());
+
+            ctx.discovery().metricsUpdateEvent(discoCache, node0);
+        }
+        catch (IgniteCheckedException e) {
+            U.warn(log, "Failed to unmarshal node metrics: " + e);
+        }
+    }
+
+    /**
+     * Performs one round of metrics exchange. Does nothing if the node is stopping, the
+     * client is disconnected, or no oldest alive server node can be found.
+     * <p>
+     * If the local node is the oldest alive server node, it refreshes its own metrics from
+     * the metrics provider, stores their marshalled form in {@code allNodesMetrics}, fires a
+     * local metrics update event and sends a copy of the whole {@code allNodesMetrics} map to
+     * every other alive node. Otherwise the local node marshals only its own metrics and
+     * sends them to the oldest node.
+     * <p>
+     * Send failures are logged and not rethrown; topology-related failures are logged at
+     * debug level only.
+     */
+    private void updateMetrics() {
+        if (ctx.isStopping() || ctx.clientDisconnected())
+            return;
+
+        ClusterNode oldest = 
ctx.discovery().oldestAliveServerNode(AffinityTopologyVersion.NONE);
+
+        if (oldest == null)
+            return;
+
+        if (ctx.localNodeId().equals(oldest.id())) {
+            IgniteClusterNode locNode = 
(IgniteClusterNode)ctx.discovery().localNode();
+
+            locNode.setMetrics(metricsProvider.metrics());
+            locNode.setCacheMetrics(metricsProvider.cacheMetrics());
+
+            ClusterNodeMetrics metrics = new 
ClusterNodeMetrics(locNode.metrics(), locNode.cacheMetrics());
+
+            try {
+                byte[] metricsBytes = 
U.zip(U.marshal(ctx.config().getMarshaller(), metrics));
+
+                allNodesMetrics.put(ctx.localNodeId(), metricsBytes);
+            }
+            catch (IgniteCheckedException e) {
+                U.warn(log, "Failed to marshal local node metrics: " + e, e);
+            }
+
+            ctx.discovery().metricsUpdateEvent(ctx.discovery().discoCache(), 
locNode);
+
+            Collection<ClusterNode> allNodes = ctx.discovery().allNodes();
+
+            ClusterMetricsUpdateMessage msg = new 
ClusterMetricsUpdateMessage(new HashMap<>(allNodesMetrics));
+
+            for (ClusterNode node : allNodes) {
+                if (ctx.localNodeId().equals(node.id()) || 
!ctx.discovery().alive(node.id()))
+                    continue;
+
+                try {
+                    ctx.io().sendToGridTopic(node, TOPIC_METRICS, msg, 
GridIoPolicy.SYSTEM_POOL);
+                }
+                catch (ClusterTopologyCheckedException e) {
+                    if (log.isDebugEnabled())
+                        log.debug("Failed to send metrics update, node failed: 
" + e);
+                }
+                catch (IgniteCheckedException e) {
+                    U.warn(log, "Failed to send metrics update: " + e, e);
+                }
+            }
+        }
+        else {
+            ClusterNodeMetrics metrics = new 
ClusterNodeMetrics(metricsProvider.metrics(), metricsProvider.cacheMetrics());
+
+            try {
+                byte[] metricsBytes = 
U.zip(U.marshal(ctx.config().getMarshaller(), metrics));
+
+                ClusterMetricsUpdateMessage msg = new 
ClusterMetricsUpdateMessage(metricsBytes);
+
+                ctx.io().sendToGridTopic(oldest, TOPIC_METRICS, msg, 
GridIoPolicy.SYSTEM_POOL);
+            }
+            catch (ClusterTopologyCheckedException e) {
+                if (log.isDebugEnabled())
+                    log.debug("Failed to send metrics update to oldest, node 
failed: " + e);
+            }
+            catch (IgniteCheckedException e) {
+                LT.warn(log, e, "Failed to send metrics update to oldest: " + 
e, false, false);
+            }
+        }
+    }
+
+    /**
      * Disables update notifier.
      */
     public void disableUpdateNotifier() {
@@ -571,4 +739,51 @@ public class ClusterProcessor extends GridProcessorAdapter 
{
             return S.toString(InternalDiagnosticFuture.class, this);
         }
     }
+
+    /**
+     * Self-rescheduling timeout object. On timeout it submits itself to the system executor;
+     * when run it calls {@code updateMetrics()}, moves its end time forward by the configured
+     * timeout and registers itself with the timeout processor again.
+     */
+    private class MetricsUpdateTimeoutObject implements GridTimeoutObject, 
Runnable {
+        /** Timeout object ID, generated once per instance. */
+        private final IgniteUuid id = IgniteUuid.randomUuid();
+
+        /** Time at which this object times out next; recomputed after every run. */
+        private long endTime;
+
+        /** Delay added to the current time to compute the next end time. */
+        private final long timeout;
+
+        /**
+         * @param timeout Timeout.
+         */
+        MetricsUpdateTimeoutObject(long timeout) {
+            this.timeout = timeout;
+
+            endTime = U.currentTimeMillis() + timeout;
+        }
+
+        /** {@inheritDoc} */
+        @Override public IgniteUuid timeoutId() {
+            return id;
+        }
+
+        /** {@inheritDoc} */
+        @Override public long endTime() {
+            return endTime;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void run() {
+            updateMetrics();
+
+            endTime = U.currentTimeMillis() + timeout;
+
+            ctx.timeout().addTimeoutObject(this);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onTimeout() {
+            ctx.getSystemExecutorService().execute(this);
+        }
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/AbstractContinuousMessage.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/AbstractContinuousMessage.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/AbstractContinuousMessage.java
index e9754d1..928c619 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/AbstractContinuousMessage.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/AbstractContinuousMessage.java
@@ -63,6 +63,11 @@ public abstract class AbstractContinuousMessage implements 
DiscoveryCustomMessag
     }
 
     /** {@inheritDoc} */
+    @Override public boolean stopProcess() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
     @Nullable @Override public DiscoCache 
createDiscoCache(GridDiscoveryManager mgr,
         AffinityTopologyVersion topVer, DiscoCache discoCache) {
         throw new UnsupportedOperationException();

http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/ContinuousRoutineInfo.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/ContinuousRoutineInfo.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/ContinuousRoutineInfo.java
new file mode 100644
index 0000000..fc0f181
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/ContinuousRoutineInfo.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.continuous;
+
+import java.io.Serializable;
+import java.util.UUID;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/**
+ * Serializable description of a continuous routine: its source node, routine ID, marshalled
+ * handler and node filter, and the routine's settings. Also carries a transient
+ * {@code disconnected} marker that is not serialized.
+ */
+class ContinuousRoutineInfo implements Serializable {
+    /** Serial version UID. */
+    private static final long serialVersionUID = 0L;
+
+    /** Source node ID; can be replaced via {@link #sourceNodeId(UUID)}. */
+    UUID srcNodeId;
+
+    /** Routine ID. */
+    final UUID routineId;
+
+    /** Marshalled handler. */
+    final byte[] hnd;
+
+    /** Marshalled node filter. */
+    final byte[] nodeFilter;
+
+    /** Handler buffer size. */
+    final int bufSize;
+
+    /** Time interval. */
+    final long interval;
+
+    /** Auto unsubscribe flag. */
+    final boolean autoUnsubscribe;
+
+    /** Set to {@code true} by {@link #onDisconnected()}; transient, so not serialized. */
+    transient boolean disconnected;
+
+    /**
+     * @param srcNodeId Source node ID.
+     * @param routineId Routine ID.
+     * @param hnd Marshalled handler.
+     * @param nodeFilter Marshalled node filter.
+     * @param bufSize Handler buffer size.
+     * @param interval Time interval.
+     * @param autoUnsubscribe Auto unsubscribe flag.
+     */
+    ContinuousRoutineInfo(
+        UUID srcNodeId,
+        UUID routineId,
+        byte[] hnd,
+        byte[] nodeFilter,
+        int bufSize,
+        long interval,
+        boolean autoUnsubscribe)
+    {
+        this.srcNodeId = srcNodeId;
+        this.routineId = routineId;
+        this.hnd = hnd;
+        this.nodeFilter = nodeFilter;
+        this.bufSize = bufSize;
+        this.interval = interval;
+        this.autoUnsubscribe = autoUnsubscribe;
+    }
+
+    /**
+     * Replaces the source node ID.
+     *
+     * @param srcNodeId Source node ID.
+     */
+    void sourceNodeId(UUID srcNodeId) {
+        this.srcNodeId = srcNodeId;
+    }
+
+    /**
+     * Marks this routine info as disconnected.
+     */
+    void onDisconnected() {
+        disconnected = true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(ContinuousRoutineInfo.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/ContinuousRoutineStartResultMessage.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/ContinuousRoutineStartResultMessage.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/ContinuousRoutineStartResultMessage.java
new file mode 100644
index 0000000..581ac60
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/ContinuousRoutineStartResultMessage.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.continuous;
+
+import java.nio.ByteBuffer;
+import java.util.UUID;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionPartialCountersMap;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Message with the result of starting a continuous routine. Carries the routine ID, optional
+ * marshalled counters map bytes, optional error bytes and a flags field whose
+ * {@code ERROR_FLAG} bit tells whether the routine failed to start.
+ */
+public class ContinuousRoutineStartResultMessage implements Message {
+    /** Serial version UID. */
+    private static final long serialVersionUID = 0L;
+
+    /** Bit in {@code flags} set when the routine failed to start. */
+    private static final int ERROR_FLAG = 0x01;
+
+    /** Routine ID. */
+    private UUID routineId;
+
+    /** Error bytes, may be {@code null}. */
+    private byte[] errBytes;
+
+    /** Marshalled {@link CachePartitionPartialCountersMap}, may be {@code null}. */
+    private byte[] cntrsMapBytes;
+
+    /** Bit flags; only {@code ERROR_FLAG} is used in this class. */
+    private int flags;
+
+    /**
+     * No-arg constructor; leaves all fields at their default values.
+     */
+    public ContinuousRoutineStartResultMessage() {
+        // No-op.
+    }
+
+    /**
+     * @param routineId Routine ID.
+     * @param cntrsMapBytes Marshalled {@link CachePartitionPartialCountersMap}.
+     * @param errBytes Error bytes.
+     * @param err {@code True} if failed to start routine.
+     */
+    ContinuousRoutineStartResultMessage(UUID routineId, byte[] cntrsMapBytes, 
byte[] errBytes, boolean err) {
+        this.routineId = routineId;
+        this.cntrsMapBytes = cntrsMapBytes;
+        this.errBytes = errBytes;
+
+        if (err)
+            flags |= ERROR_FLAG;
+    }
+
+    /**
+     * @return Marshalled {@link CachePartitionPartialCountersMap}.
+     */
+    @Nullable byte[] countersMapBytes() {
+        return cntrsMapBytes;
+    }
+
+    /**
+     * @return {@code True} if failed to start routine.
+     */
+    boolean error() {
+        return (flags & ERROR_FLAG) != 0;
+    }
+
+    /**
+     * @return Routine ID.
+     */
+    UUID routineId() {
+        return routineId;
+    }
+
+    /**
+     * @return Error bytes.
+     */
+    @Nullable byte[] errorBytes() {
+        return errBytes;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        writer.setBuffer(buf);
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+
+        switch (writer.state()) {
+            case 0:
+                if (!writer.writeByteArray("cntrsMapBytes", cntrsMapBytes))
+                    return false;
+
+                writer.incrementState();
+
+            case 1:
+                if (!writer.writeByteArray("errBytes", errBytes))
+                    return false;
+
+                writer.incrementState();
+
+            case 2:
+                if (!writer.writeInt("flags", flags))
+                    return false;
+
+                writer.incrementState();
+
+            case 3:
+                if (!writer.writeUuid("routineId", routineId))
+                    return false;
+
+                writer.incrementState();
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+
+        switch (reader.state()) {
+            case 0:
+                cntrsMapBytes = reader.readByteArray("cntrsMapBytes");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 1:
+                errBytes = reader.readByteArray("errBytes");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 2:
+                flags = reader.readInt("flags");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 3:
+                routineId = reader.readUuid("routineId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+        }
+
+        return 
reader.afterMessageRead(ContinuousRoutineStartResultMessage.class);
+    }
+
+    /** {@inheritDoc} */
+    @Override public short directType() {
+        return 134;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte fieldsCount() {
+        return 4;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onAckReceived() {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(ContinuousRoutineStartResultMessage.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/ContinuousRoutinesCommonDiscoveryData.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/ContinuousRoutinesCommonDiscoveryData.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/ContinuousRoutinesCommonDiscoveryData.java
new file mode 100644
index 0000000..d29de89
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/ContinuousRoutinesCommonDiscoveryData.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.continuous;
+
+import java.io.Serializable;
+import java.util.List;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/**
+ * Serializable holder for the list of continuous routines started in the cluster.
+ */
+public class ContinuousRoutinesCommonDiscoveryData implements Serializable {
+    /** Serial version UID. */
+    private static final long serialVersionUID = 0L;
+
+    /** Routines started in cluster. */
+    final List<ContinuousRoutineInfo> startedRoutines;
+
+    /**
+     * @param startedRoutines Routines started in cluster.
+     */
+    ContinuousRoutinesCommonDiscoveryData(List<ContinuousRoutineInfo> 
startedRoutines) {
+        this.startedRoutines = startedRoutines;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(ContinuousRoutinesCommonDiscoveryData.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/ContinuousRoutinesInfo.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/ContinuousRoutinesInfo.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/ContinuousRoutinesInfo.java
new file mode 100644
index 0000000..ad24ff1
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/ContinuousRoutinesInfo.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.continuous;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.spi.discovery.DiscoveryDataBag;
+
+import static 
org.apache.ignite.internal.GridComponent.DiscoveryDataExchangeType.CONTINUOUS_PROC;
+
+/**
+ * Registry of started continuous routines keyed by routine ID. Every method that touches
+ * the registry synchronizes on the internal map.
+ */
+class ContinuousRoutinesInfo {
+    /** Started routines by routine ID; also used as the lock for all accesses. */
+    private final Map<UUID, ContinuousRoutineInfo> startedRoutines = new 
HashMap<>();
+
+    /**
+     * Adds a snapshot of all started routines to the bag as grid common data, unless common
+     * data for this component has already been collected.
+     *
+     * @param dataBag Discovery data bag.
+     */
+    void collectGridNodeData(DiscoveryDataBag dataBag) {
+        synchronized (startedRoutines) {
+            if (!dataBag.commonDataCollectedFor(CONTINUOUS_PROC.ordinal()))
+                dataBag.addGridCommonData(CONTINUOUS_PROC.ordinal(),
+                    new ContinuousRoutinesCommonDiscoveryData(new 
ArrayList<>(startedRoutines.values())));
+        }
+    }
+
+    /**
+     * Sets the source node ID of every routine marked as disconnected to the joining node
+     * ID, then adds a snapshot of all started routines to the bag as joining node data.
+     *
+     * @param dataBag Discovery data bag.
+     */
+    void collectJoiningNodeData(DiscoveryDataBag dataBag) {
+        synchronized (startedRoutines) {
+            for (ContinuousRoutineInfo info : startedRoutines.values()) {
+                if (info.disconnected)
+                    info.sourceNodeId(dataBag.joiningNodeId());
+            }
+
+            dataBag.addJoiningNodeData(CONTINUOUS_PROC.ordinal(),
+                new ContinuousRoutinesJoiningNodeDiscoveryData(new 
ArrayList<>(startedRoutines.values())));
+        }
+    }
+
+    /**
+     * Registers the routine, replacing any existing entry with the same routine ID.
+     *
+     * @param info Routine info.
+     */
+    void addRoutineInfo(ContinuousRoutineInfo info) {
+        synchronized (startedRoutines) {
+            startedRoutines.put(info.routineId, info);
+        }
+    }
+
+    /**
+     * @param routineId Routine ID.
+     * @return {@code True} if routine exists.
+     */
+    boolean routineExists(UUID routineId) {
+        synchronized (startedRoutines) {
+            return startedRoutines.containsKey(routineId);
+        }
+    }
+
+    /**
+     * Removes the routine with the given ID, if present.
+     *
+     * @param routineId Routine ID.
+     */
+    void removeRoutine(UUID routineId) {
+        synchronized (startedRoutines) {
+            startedRoutines.remove(routineId);
+        }
+    }
+
+    /**
+     * Removes every routine whose ID is not in {@code locRoutines} and marks the remaining
+     * ones as disconnected.
+     *
+     * @param locRoutines Routine IDs which can survive reconnect.
+     */
+    void onClientDisconnected(Collection<UUID> locRoutines) {
+        synchronized (startedRoutines) {
+            for (Iterator<Map.Entry<UUID, ContinuousRoutineInfo>> it = 
startedRoutines.entrySet().iterator(); it.hasNext();) {
+                Map.Entry<UUID, ContinuousRoutineInfo> e = it.next();
+
+                ContinuousRoutineInfo info = e.getValue();
+
+                if (!locRoutines.contains(info.routineId))
+                    it.remove();
+                else
+                    info.onDisconnected();
+            }
+        }
+    }
+
+    /**
+     * Removes all routines with autoUnsubscribe=true started by given node.
+     *
+     * @param nodeId Node ID.
+     */
+    void onNodeFail(UUID nodeId) {
+        synchronized (startedRoutines) {
+            for (Iterator<Map.Entry<UUID, ContinuousRoutineInfo>> it = 
startedRoutines.entrySet().iterator(); it.hasNext();) {
+                Map.Entry<UUID, ContinuousRoutineInfo> e = it.next();
+
+                ContinuousRoutineInfo info = e.getValue();
+
+                if (info.autoUnsubscribe && info.srcNodeId.equals(nodeId))
+                    it.remove();
+            }
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(ContinuousRoutinesInfo.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/ContinuousRoutinesJoiningNodeDiscoveryData.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/ContinuousRoutinesJoiningNodeDiscoveryData.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/ContinuousRoutinesJoiningNodeDiscoveryData.java
new file mode 100644
index 0000000..9be6ef8
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/ContinuousRoutinesJoiningNodeDiscoveryData.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.continuous;
+
+import java.io.Serializable;
+import java.util.List;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/**
+ * Serializable holder for the list of continuous routines registered on a joining node.
+ */
+public class ContinuousRoutinesJoiningNodeDiscoveryData implements 
Serializable {
+    /** Serial version UID. */
+    private static final long serialVersionUID = 0L;
+
+    /** Routines registered on nodes, to be started in cluster. */
+    final List<ContinuousRoutineInfo> startedRoutines;
+
+    /**
+     * @param startedRoutines Routines registered on nodes, to be started in cluster.
+     */
+    ContinuousRoutinesJoiningNodeDiscoveryData(List<ContinuousRoutineInfo> 
startedRoutines) {
+        this.startedRoutines = startedRoutines;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(ContinuousRoutinesJoiningNodeDiscoveryData.class, 
this);
+    }
+}

Reply via email to