Repository: ignite
Updated Branches:
  refs/heads/ignite-zk ade6986c3 -> e447de174


zk


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e447de17
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e447de17
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e447de17

Branch: refs/heads/ignite-zk
Commit: e447de174f938626e4f2d5caf90e64b62f38349e
Parents: ade6986
Author: sboikov <[email protected]>
Authored: Fri Nov 24 15:38:47 2017 +0300
Committer: sboikov <[email protected]>
Committed: Fri Nov 24 17:05:54 2017 +0300

----------------------------------------------------------------------
 .../org/apache/ignite/internal/GridTopic.java   |   5 +-
 .../communication/GridIoMessageFactory.java     |   6 +
 .../internal/managers/discovery/DiscoCache.java |   4 +
 .../discovery/GridDiscoveryManager.java         |  18 +-
 .../cluster/ClusterMetricsUpdateMessage.java    | 157 ++++++++++++
 .../processors/cluster/ClusterNodeMetrics.java  |  58 +++++
 .../processors/cluster/ClusterProcessor.java    | 236 +++++++++++++++++--
 .../spi/discovery/zk/ZookeeperDiscoverySpi.java |   8 +-
 .../zk/internal/ZookeeperClusterNode.java       |  28 +--
 .../zk/internal/ZookeeperDiscoveryImpl.java     | 121 +++++-----
 .../internal/ClusterNodeMetricsUpdateTest.java  | 100 ++++++++
 11 files changed, 635 insertions(+), 106 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/e447de17/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java 
b/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java
index abdbf95..e848c37 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java
@@ -115,7 +115,10 @@ public enum GridTopic {
     TOPIC_SCHEMA,
 
     /** */
-    TOPIC_INTERNAL_DIAGNOSTIC;
+    TOPIC_INTERNAL_DIAGNOSTIC,
+
+    /** */
+    TOPIC_METRICS;
 
     /** Enum values. */
     private static final GridTopic[] VALS = values();

http://git-wip-us.apache.org/repos/asf/ignite/blob/e447de17/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index 97e06bf..2f8ba6d 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -117,6 +117,7 @@ import 
org.apache.ignite.internal.processors.cache.transactions.TxLocksResponse;
 import 
org.apache.ignite.internal.processors.cache.version.GridCacheRawVersionedEntry;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersionEx;
+import 
org.apache.ignite.internal.processors.cluster.ClusterMetricsUpdateMessage;
 import org.apache.ignite.internal.processors.continuous.GridContinuousMessage;
 import org.apache.ignite.internal.processors.datastreamer.DataStreamerEntry;
 import org.apache.ignite.internal.processors.datastreamer.DataStreamerRequest;
@@ -875,6 +876,11 @@ public class GridIoMessageFactory implements 
MessageFactory {
 
                 break;
 
+            case 129:
+                msg = new ClusterMetricsUpdateMessage();
+
+                break;
+
 
             // [-3..119] [124..128] [-23..-27] [-36..-55]- this
             // [120..123] - DR

http://git-wip-us.apache.org/repos/asf/ignite/blob/e447de17/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java
index 9ed70aa..0e35c7b 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java
@@ -211,6 +211,10 @@ public class DiscoCache {
         return null;
     }
 
+    public boolean alive(UUID nodeId) {
+        return alives.contains(nodeId);
+    }
+
     /**
      * Gets all nodes that have cache with given name.
      *

http://git-wip-us.apache.org/repos/asf/ignite/blob/e447de17/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index 9396fe4..1a0712f 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -1035,7 +1035,7 @@ public class GridDiscoveryManager extends 
GridManagerAdapter<DiscoverySpi> {
     /**
      * @return Metrics provider.
      */
-    private DiscoveryMetricsProvider createMetricsProvider() {
+    public DiscoveryMetricsProvider createMetricsProvider() {
         return new DiscoveryMetricsProvider() {
             /** */
             private final long startTime = U.currentTimeMillis();
@@ -2123,6 +2123,19 @@ public class GridDiscoveryManager extends 
GridManagerAdapter<DiscoverySpi> {
     }
 
     /**
+     * @param discoCache
+     * @param node
+     */
+    public void metricsUpdateEvent(DiscoCache discoCache, ClusterNode node) {
+        discoWrk.addEvent(EVT_NODE_METRICS_UPDATED,
+            discoCache.version(),
+            node,
+            discoCache,
+            discoCache.nodeMap.values(),
+            null);
+    }
+
+    /**
      * Gets first grid node start time, see {@link 
DiscoverySpi#getGridStartTime()}.
      *
      * @return Start time of the first grid node.
@@ -2512,6 +2525,9 @@ public class GridDiscoveryManager extends 
GridManagerAdapter<DiscoverySpi> {
 
             AffinityTopologyVersion topVer = evt.get2();
 
+            if (type == EVT_NODE_METRICS_UPDATED && 
topVer.compareTo(discoCache.version()) < 0)
+                return;
+
             ClusterNode node = evt.get3();
 
             boolean isDaemon = node.isDaemon();

http://git-wip-us.apache.org/repos/asf/ignite/blob/e447de17/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterMetricsUpdateMessage.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterMetricsUpdateMessage.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterMetricsUpdateMessage.java
new file mode 100644
index 0000000..f6db706
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterMetricsUpdateMessage.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cluster;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.ignite.internal.GridDirectMap;
+import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import 
org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ *
+ */
+public class ClusterMetricsUpdateMessage implements Message {
+    /** */
+    private byte[] nodeMetrics;
+
+    /** */
+    @GridDirectMap(keyType = UUID.class, valueType = byte[].class)
+    private Map<UUID, byte[]> allNodesMetrics;
+
+    /**
+     * Required by {@link GridIoMessageFactory}.
+     */
+    public ClusterMetricsUpdateMessage() {
+        // No-op.
+    }
+
+    /**
+     * @param nodeMetrics Marshalled metrics of the sending node.
+     */
+    ClusterMetricsUpdateMessage(byte[] nodeMetrics) {
+        this.nodeMetrics = nodeMetrics;
+        // allNodesMetrics stays null: the receiver asserts it is absent when nodeMetrics is set.
+    }
+
+    /**
+     * @param allNodesMetrics All nodes metrics, keyed by node ID.
+     */
+    ClusterMetricsUpdateMessage(Map<UUID, byte[]> allNodesMetrics) {
+        // nodeMetrics stays null: the receiver treats a null nodeMetrics as the all-nodes form.
+        this.allNodesMetrics = allNodesMetrics;
+    }
+
+    /**
+     * @return Node metrics.
+     */
+    @Nullable byte[] nodeMetrics() {
+        return nodeMetrics;
+    }
+
+    /**
+     * @return All nodes metrics.
+     */
+    @Nullable Map<UUID, byte[]> allNodesMetrics() {
+        return allNodesMetrics;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        writer.setBuffer(buf);
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+
+        switch (writer.state()) {
+            case 0:
+                if (!writer.writeMap("allNodesMetrics", allNodesMetrics, 
MessageCollectionItemType.UUID, MessageCollectionItemType.BYTE_ARR))
+                    return false;
+
+                writer.incrementState();
+
+            case 1:
+                if (!writer.writeByteArray("nodeMetrics", nodeMetrics))
+                    return false;
+
+                writer.incrementState();
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+
+        switch (reader.state()) {
+            case 0:
+                allNodesMetrics = reader.readMap("allNodesMetrics", 
MessageCollectionItemType.UUID, MessageCollectionItemType.BYTE_ARR, false);
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 1:
+                nodeMetrics = reader.readByteArray("nodeMetrics");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+        }
+
+        return reader.afterMessageRead(ClusterMetricsUpdateMessage.class);
+    }
+
+    /** {@inheritDoc} */
+    @Override public short directType() {
+        return 129;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte fieldsCount() {
+        return 2;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onAckReceived() {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(ClusterMetricsUpdateMessage.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/e447de17/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterNodeMetrics.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterNodeMetrics.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterNodeMetrics.java
new file mode 100644
index 0000000..4a7dd77
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterNodeMetrics.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cluster;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.Map;
+import org.apache.ignite.cache.CacheMetrics;
+import org.apache.ignite.cluster.ClusterMetrics;
+
+/**
+ *
+ */
+class ClusterNodeMetrics implements Serializable {
+    /** */
+    private final ClusterMetrics metrics;
+
+    /** */
+    private final Map<Integer, CacheMetrics> cacheMetrics;
+
+    /**
+     * @param metrics Metrics.
+     * @param cacheMetrics Cache metrics.
+     */
+    ClusterNodeMetrics(ClusterMetrics metrics, Map<Integer, CacheMetrics> 
cacheMetrics) {
+        this.metrics = metrics;
+        this.cacheMetrics = cacheMetrics;
+    }
+
+    /**
+     * @return Metrics.
+     */
+    ClusterMetrics metrics() {
+        return metrics;
+    }
+
+    /**
+     * @return Cache metrics.
+     */
+    Map<Integer, CacheMetrics> cacheMetrics() {
+        return cacheMetrics != null ? cacheMetrics : Collections.<Integer, 
CacheMetrics>emptyMap();
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/e447de17/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java
index 5f2c66c..8812161 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java
@@ -33,6 +33,7 @@ import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.events.DiscoveryEvent;
 import org.apache.ignite.events.Event;
+import org.apache.ignite.internal.GridDirectMap;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.IgniteDiagnosticInfo;
 import org.apache.ignite.internal.IgniteDiagnosticMessage;
@@ -42,8 +43,12 @@ import 
org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.cluster.IgniteClusterImpl;
 import org.apache.ignite.internal.managers.communication.GridIoPolicy;
 import org.apache.ignite.internal.managers.communication.GridMessageListener;
+import org.apache.ignite.internal.managers.discovery.DiscoCache;
+import org.apache.ignite.internal.managers.discovery.IgniteClusterNode;
 import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
 import org.apache.ignite.internal.processors.GridProcessorAdapter;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
 import org.apache.ignite.internal.util.GridTimerTask;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
@@ -54,9 +59,12 @@ import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteClosure;
 import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.marshaller.jdk.JdkMarshaller;
 import org.apache.ignite.spi.discovery.DiscoveryDataBag;
 import org.apache.ignite.spi.discovery.DiscoveryDataBag.GridDiscoveryData;
+import org.apache.ignite.spi.discovery.DiscoveryMetricsProvider;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.jetbrains.annotations.Nullable;
 
 import static 
org.apache.ignite.IgniteSystemProperties.IGNITE_DIAGNOSTIC_ENABLED;
@@ -66,6 +74,7 @@ import static 
org.apache.ignite.events.EventType.EVT_NODE_FAILED;
 import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
 import static 
org.apache.ignite.internal.GridComponent.DiscoveryDataExchangeType.CLUSTER_PROC;
 import static org.apache.ignite.internal.GridTopic.TOPIC_INTERNAL_DIAGNOSTIC;
+import static org.apache.ignite.internal.GridTopic.TOPIC_METRICS;
 import static org.apache.ignite.internal.IgniteVersionUtils.VER_STR;
 
 /**
@@ -102,6 +111,10 @@ public class ClusterProcessor extends GridProcessorAdapter 
{
     /** */
     private final AtomicLong diagFutId = new AtomicLong();
 
+    /** */
+    @GridDirectMap(keyType = UUID.class, valueType = byte[].class)
+    private final Map<UUID, byte[]> allNodesMetrics = new 
ConcurrentHashMap<>();
+
     /**
      * @param ctx Kernal context.
      */
@@ -123,30 +136,34 @@ public class ClusterProcessor extends 
GridProcessorAdapter {
     /** */
     private final JdkMarshaller marsh = new JdkMarshaller();
 
+    /** */
+    private DiscoveryMetricsProvider metricsProvider;
+
     /**
      * @throws IgniteCheckedException If failed.
      */
     public void initDiagnosticListeners() throws IgniteCheckedException {
         ctx.event().addLocalEventListener(new GridLocalEventListener() {
-                @Override public void onEvent(Event evt) {
-                    assert evt instanceof DiscoveryEvent;
-                    assert evt.type() == EVT_NODE_FAILED || evt.type() == 
EVT_NODE_LEFT;
+            @Override public void onEvent(Event evt) {
+                assert evt instanceof DiscoveryEvent;
+                assert evt.type() == EVT_NODE_FAILED || evt.type() == 
EVT_NODE_LEFT;
 
-                    DiscoveryEvent discoEvt = (DiscoveryEvent)evt;
+                DiscoveryEvent discoEvt = (DiscoveryEvent)evt;
 
-                    UUID nodeId = discoEvt.eventNode().id();
+                UUID nodeId = discoEvt.eventNode().id();
 
-                    ConcurrentHashMap<Long, InternalDiagnosticFuture> futs = 
diagnosticFutMap.get();
+                ConcurrentHashMap<Long, InternalDiagnosticFuture> futs = 
diagnosticFutMap.get();
 
-                    if (futs != null) {
-                        for (InternalDiagnosticFuture fut : futs.values()) {
-                            if (fut.nodeId.equals(nodeId))
-                                fut.onDone(new IgniteDiagnosticInfo("Target 
node failed: " + nodeId));
-                        }
+                if (futs != null) {
+                    for (InternalDiagnosticFuture fut : futs.values()) {
+                        if (fut.nodeId.equals(nodeId))
+                            fut.onDone(new IgniteDiagnosticInfo("Target node 
failed: " + nodeId));
                     }
                 }
-            },
-            EVT_NODE_FAILED, EVT_NODE_LEFT);
+
+                allNodesMetrics.remove(nodeId);
+            }
+        }, EVT_NODE_FAILED, EVT_NODE_LEFT);
 
         ctx.io().addMessageListener(TOPIC_INTERNAL_DIAGNOSTIC, new 
GridMessageListener() {
             @Override public void onMessage(UUID nodeId, Object msg, byte plc) 
{
@@ -233,6 +250,17 @@ public class ClusterProcessor extends GridProcessorAdapter 
{
                     U.warn(diagnosticLog, "Received unexpected message: " + 
msg);
             }
         });
+
+        if (!(ctx.config().getDiscoverySpi() instanceof TcpDiscoverySpi)) {
+            ctx.io().addMessageListener(TOPIC_METRICS, new 
GridMessageListener() {
+                @Override public void onMessage(UUID nodeId, Object msg, byte 
plc) {
+                    if (msg instanceof ClusterMetricsUpdateMessage)
+                        processMetricsUpdateMessage(nodeId, 
(ClusterMetricsUpdateMessage)msg);
+                    else
+                        U.warn(log, "Received unexpected message for 
TOPIC_METRICS: " + msg);
+                }
+            });
+        }
     }
 
     /**
@@ -296,7 +324,6 @@ public class ClusterProcessor extends GridProcessorAdapter {
         }
     }
 
-
     /**
      * @param vals collection to seek through.
      */
@@ -334,6 +361,14 @@ public class ClusterProcessor extends GridProcessorAdapter 
{
                     log.debug("Failed to create GridUpdateNotifier: " + e);
             }
         }
+
+        if (!(ctx.config().getDiscoverySpi() instanceof TcpDiscoverySpi)) {
+            metricsProvider = ctx.discovery().createMetricsProvider();
+
+            long updateFreq = ctx.config().getMetricsUpdateFrequency();
+
+            ctx.timeout().addTimeoutObject(new 
MetricsUpdateTimeoutObject(updateFreq));
+        }
     }
 
     /** {@inheritDoc} */
@@ -352,6 +387,127 @@ public class ClusterProcessor extends 
GridProcessorAdapter {
     }
 
     /**
+     * @param msg Message.
+     */
+    private void processMetricsUpdateMessage(UUID sndNodeId, 
ClusterMetricsUpdateMessage msg) {
+        byte[] nodeMetrics = msg.nodeMetrics();
+
+        if (nodeMetrics != null) {
+            assert msg.allNodesMetrics() == null;
+
+            allNodesMetrics.put(sndNodeId, nodeMetrics);
+
+            updateNodeMetrics(ctx.discovery().discoCache(), sndNodeId, 
nodeMetrics);
+        }
+        else {
+            Map<UUID, byte[]> allNodesMetrics = msg.allNodesMetrics();
+
+            assert allNodesMetrics != null;
+
+            DiscoCache discoCache = ctx.discovery().discoCache();
+
+            for (Map.Entry<UUID, byte[]> e : allNodesMetrics.entrySet()) {
+                if (!ctx.localNodeId().equals(e.getKey()))
+                    updateNodeMetrics(discoCache, e.getKey(), e.getValue());
+            }
+        }
+    }
+
+    /**
+     * @param discoCache Discovery cache used to look up the node.
+     * @param nodeId ID of the node the metrics belong to.
+     * @param metricsBytes Marshalled {@link ClusterNodeMetrics} of that node.
+     */
+    private void updateNodeMetrics(DiscoCache discoCache, UUID nodeId, byte[] metricsBytes) {
+        ClusterNode node = discoCache.node(nodeId);
+        if (node == null || !discoCache.alive(nodeId))
+            return;
+
+        try {
+            ClusterNodeMetrics metrics = U.unmarshal(ctx.config().getMarshaller(), metricsBytes, null);
+            assert node instanceof IgniteClusterNode : node;
+            IgniteClusterNode node0 = (IgniteClusterNode)node;
+            node0.setMetrics(metrics.metrics());
+            node0.setCacheMetrics(metrics.cacheMetrics());
+            ctx.discovery().metricsUpdateEvent(discoCache, node0);
+        }
+        catch (IgniteCheckedException e) {
+            U.warn(log, "Failed to unmarshal node metrics: " + e, e);
+        }
+    }
+
+    /**
+     *
+     */
+    private void updateMetrics() {
+        if (ctx.isStopping() || ctx.clientDisconnected())
+            return;
+
+        ClusterNode oldest = 
ctx.discovery().oldestAliveServerNode(AffinityTopologyVersion.NONE);
+
+        if (oldest == null)
+            return;
+
+        if (ctx.localNodeId().equals(oldest.id())) {
+            IgniteClusterNode locNode = 
(IgniteClusterNode)ctx.discovery().localNode();
+
+            locNode.setMetrics(metricsProvider.metrics());
+            locNode.setCacheMetrics(metricsProvider.cacheMetrics());
+
+            ClusterNodeMetrics metrics = new 
ClusterNodeMetrics(locNode.metrics(), locNode.cacheMetrics());
+
+            try {
+                byte[] metricsBytes = U.marshal(ctx.config().getMarshaller(), 
metrics);
+
+                allNodesMetrics.put(ctx.localNodeId(), metricsBytes);
+            }
+            catch (IgniteCheckedException e) {
+                U.warn(log, "Failed to marshal local node metrics: " + e, e);
+            }
+
+            ctx.discovery().metricsUpdateEvent(ctx.discovery().discoCache(), 
locNode);
+
+            Collection<ClusterNode> allNodes = ctx.discovery().allNodes();
+
+            ClusterMetricsUpdateMessage msg = new 
ClusterMetricsUpdateMessage(new HashMap<>(allNodesMetrics));
+
+            for (ClusterNode node : allNodes) {
+                if (ctx.localNodeId().equals(node.id()) || 
!ctx.discovery().alive(node.id()))
+                    continue;
+
+                try {
+                    ctx.io().sendToGridTopic(node, TOPIC_METRICS, msg, 
GridIoPolicy.SYSTEM_POOL);
+                }
+                catch (ClusterTopologyCheckedException e) {
+                    if (log.isDebugEnabled())
+                        log.debug("Failed to send metrics update, node failed: 
" + e);
+                }
+                catch (IgniteCheckedException e) {
+                    U.warn(log, "Failed to send metrics update: " + e, e);
+                }
+            }
+        }
+        else {
+            ClusterNodeMetrics metrics = new 
ClusterNodeMetrics(metricsProvider.metrics(), metricsProvider.cacheMetrics());
+
+            try {
+                byte[] metricsBytes = U.marshal(ctx.config().getMarshaller(), 
metrics);
+
+                ClusterMetricsUpdateMessage msg = new 
ClusterMetricsUpdateMessage(metricsBytes);
+
+                ctx.io().sendToGridTopic(oldest, TOPIC_METRICS, msg, 
GridIoPolicy.SYSTEM_POOL);
+            }
+            catch (ClusterTopologyCheckedException e) {
+                if (log.isDebugEnabled())
+                    log.debug("Failed to send metrics update to oldest, node 
failed: " + e);
+            }
+            catch (IgniteCheckedException e) {
+                U.warn(log, "Failed to send metrics update to oldest: " + e, 
e);
+            }
+        }
+    }
+
+    /**
      * Disables update notifier.
      */
     public void disableUpdateNotifier() {
@@ -571,4 +727,56 @@ public class ClusterProcessor extends GridProcessorAdapter 
{
             return S.toString(InternalDiagnosticFuture.class, this);
         }
     }
+
+    /**
+     *
+     */
+    private class MetricsUpdateTimeoutObject implements GridTimeoutObject, 
Runnable {
+        /** */
+        private final IgniteUuid id = IgniteUuid.randomUuid();
+
+        /** */
+        private long endTime;
+
+        /** */
+        private final long timeout;
+
+        /**
+         * @param timeout Timeout.
+         */
+        MetricsUpdateTimeoutObject(long timeout) {
+            this.timeout = timeout;
+
+            endTime = U.currentTimeMillis() + timeout;
+        }
+
+        /** {@inheritDoc} */
+        @Override public IgniteUuid timeoutId() {
+            return id;
+        }
+
+        /** {@inheritDoc} */
+        @Override public long endTime() {
+            return endTime;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void run() {
+            updateMetrics();
+
+            endTime = U.currentTimeMillis() + timeout;
+
+            ctx.timeout().addTimeoutObject(this);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onTimeout() {
+            try {
+                
ctx.pools().poolForPolicy(GridIoPolicy.SYSTEM_POOL).execute(this);
+            }
+            catch (IgniteCheckedException e) {
+                U.error(log, "Failed to submit metrics update task: " + e, e);
+            }
+        }
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/e447de17/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
index 45c7953..8f365c3 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
@@ -324,8 +324,7 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter 
implements Discovery
             locNodeVer,
             locNodeAttrs,
             consistentId,
-            ignite.configuration().isClientMode(),
-            metricsProvider);
+            ignite.configuration().isClientMode());
 
         locNode.local(true);
 
@@ -337,6 +336,11 @@ public class ZookeeperDiscoverySpi extends 
IgniteSpiAdapter implements Discovery
         if (log.isDebugEnabled())
             log.debug("Local node initialized: " + locNode);
 
+        if (metricsProvider != null) {
+            locNode.setMetrics(metricsProvider.metrics());
+            locNode.setCacheMetrics(metricsProvider.cacheMetrics());
+        }
+
         return locNode;
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/e447de17/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClusterNode.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClusterNode.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClusterNode.java
index f2f0362..b51a556 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClusterNode.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClusterNode.java
@@ -68,10 +68,6 @@ public class ZookeeperClusterNode implements 
IgniteClusterNode, Serializable {
     @GridToStringExclude
     private Map<String, Object> attrs;
 
-    /** Metrics provider (transient). */
-    @GridToStringExclude
-    private transient DiscoveryMetricsProvider metricsProvider;
-
     /** */
     private transient boolean loc;
 
@@ -105,8 +101,7 @@ public class ZookeeperClusterNode implements 
IgniteClusterNode, Serializable {
         IgniteProductVersion ver,
         Map<String, Object> attrs,
         Serializable consistentId,
-        boolean client,
-        DiscoveryMetricsProvider metricsProvider
+        boolean client
     ) {
         assert id != null;
         assert consistentId != null;
@@ -115,7 +110,6 @@ public class ZookeeperClusterNode implements 
IgniteClusterNode, Serializable {
         this.ver = ver;
         this.attrs = U.sealMap(attrs);
         this.consistentId = consistentId;
-        this.metricsProvider = metricsProvider;
 
         if (client)
             flags |= CLIENT_NODE_MASK;
@@ -158,18 +152,6 @@ public class ZookeeperClusterNode implements 
IgniteClusterNode, Serializable {
 
     /** {@inheritDoc} */
     @Override public ClusterMetrics metrics() {
-        if (metricsProvider != null) {
-            ClusterMetrics metrics0 = metricsProvider.metrics();
-
-            metrics = metrics0;
-
-            return metrics0;
-        }
-
-        // TODO: ZK
-        if (metrics == null)
-            return new ClusterMetricsSnapshot();
-
         return metrics;
     }
 
@@ -182,14 +164,6 @@ public class ZookeeperClusterNode implements 
IgniteClusterNode, Serializable {
 
     /** {@inheritDoc} */
     @Override public Map<Integer, CacheMetrics> cacheMetrics() {
-        if (metricsProvider != null) {
-            Map<Integer, CacheMetrics> cacheMetrics0 = 
metricsProvider.cacheMetrics();
-
-            cacheMetrics = cacheMetrics0;
-
-            return cacheMetrics0;
-        }
-
         return cacheMetrics;
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/e447de17/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
index e7e2846..74b8a5c 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
@@ -35,6 +35,7 @@ import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.events.EventType;
+import org.apache.ignite.internal.ClusterMetricsSnapshot;
 import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
 import org.apache.ignite.internal.IgniteNodeAttributes;
 import org.apache.ignite.internal.events.DiscoveryCustomEvent;
@@ -91,6 +92,9 @@ public class ZookeeperDiscoveryImpl {
     private final GridFutureAdapter<Void> joinFut = new GridFutureAdapter<>();
 
     /** */
+    private final AliveNodeDataWatcher aliveNodeDataWatcher = new 
AliveNodeDataWatcher();
+
+    /** */
     private final ZkWatcher watcher;
 
     /** */
@@ -585,66 +589,7 @@ public class ZookeeperDiscoveryImpl {
         String path = zkPaths.aliveNodesDir + "/" + alivePath;
 
         if (!path.equals(locNodeZkPath))
-            zkClient.getDataAsync(path, aliveNodeDataWatcher, 
aliveNodeDataUpdateCallback);
-    }
-
-    /** */
-    private final AliveNodeDataWatcher aliveNodeDataWatcher = new 
AliveNodeDataWatcher();
-
-    /** */
-    private AliveNodeDataUpdateCallback aliveNodeDataUpdateCallback = new 
AliveNodeDataUpdateCallback();
-
-    /**
-     *
-     */
-    private class AliveNodeDataWatcher implements Watcher {
-        @Override public void process(WatchedEvent evt) {
-            if (evt.getType() == Event.EventType.NodeDataChanged)
-                zkClient.getDataAsync(evt.getPath(), this, 
aliveNodeDataUpdateCallback);
-        }
-    }
-
-    /**
-     *
-     */
-    private class AliveNodeDataUpdateCallback implements 
AsyncCallback.DataCallback {
-        @Override public void processResult(int rc, String path, Object ctx, 
byte[] data, Stat stat) {
-            assert crd;
-
-            if (rc == KeeperException.Code.NONODE.intValue()) {
-                if (log.isDebugEnabled())
-                    log.debug("Alive node callaback, no node: " + path);
-
-                return;
-            }
-
-            assert rc == 0 : KeeperException.Code.get(rc);
-
-            try {
-                if (data.length > 0) {
-                    ZkAliveNodeData nodeData = unmarshal(data);
-
-                    Integer nodeInternalId = 
ZkIgnitePaths.aliveInternalId(path);
-
-                    Iterator<ZkDiscoveryEventData> it = 
evtsData.evts.values().iterator();
-
-                    boolean processed = false;
-
-                    while (it.hasNext()) {
-                        ZkDiscoveryEventData evtData = it.next();
-
-                        if (evtData.onAckReceived(nodeInternalId, 
nodeData.lastProcEvt))
-                            processed = true;
-                    }
-
-                    if (processed)
-                        handleProcessedEvents();
-                }
-            }
-            catch (Throwable e) {
-                onFatalError(e);
-            }
-        }
+            zkClient.getDataAsync(path, aliveNodeDataWatcher, 
aliveNodeDataWatcher);
     }
 
     /**
@@ -1133,8 +1078,14 @@ public class ZookeeperDiscoveryImpl {
 
         List<ZookeeperClusterNode> allNodes = dataForJoined.topology();
 
-        for (ZookeeperClusterNode node : allNodes)
+        // TODO ZK
+        for (int i = 0; i < allNodes.size(); i++) {
+            ZookeeperClusterNode node = allNodes.get(i);
+
+            node.setMetrics(new ClusterMetricsSnapshot());
+
             top.addNode(node);
+        }
 
         top.addNode(locNode);
 
@@ -1492,4 +1443,52 @@ public class ZookeeperDiscoveryImpl {
             }
         }
     }
+
+    /**
+     * Re-reads alive node data on change and passes received acks to events.
+     */
+    private class AliveNodeDataWatcher implements Watcher, 
AsyncCallback.DataCallback {
+        @Override public void process(WatchedEvent evt) {
+            if (evt.getType() == Event.EventType.NodeDataChanged)
+                zkClient.getDataAsync(evt.getPath(), this, this);
+        }
+
+        @Override public void processResult(int rc, String path, Object ctx, 
byte[] data, Stat stat) {
+            assert crd;
+
+            if (rc == KeeperException.Code.NONODE.intValue()) {
+                if (log.isDebugEnabled())
+                    log.debug("Alive node callaback, no node: " + path);
+
+                return;
+            }
+
+            assert rc == 0 : KeeperException.Code.get(rc);
+
+            try {
+                if (data.length > 0) {
+                    ZkAliveNodeData nodeData = unmarshal(data);
+
+                    Integer nodeInternalId = 
ZkIgnitePaths.aliveInternalId(path);
+
+                    Iterator<ZkDiscoveryEventData> it = 
evtsData.evts.values().iterator();
+
+                    boolean processed = false;
+
+                    while (it.hasNext()) {
+                        ZkDiscoveryEventData evtData = it.next();
+
+                        if (evtData.onAckReceived(nodeInternalId, 
nodeData.lastProcEvt))
+                            processed = true;
+                    }
+
+                    if (processed)
+                        handleProcessedEvents();
+                }
+            }
+            catch (Throwable e) {
+                onFatalError(e);
+            }
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/e447de17/modules/core/src/test/java/org/apache/ignite/internal/ClusterNodeMetricsUpdateTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/ClusterNodeMetricsUpdateTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/ClusterNodeMetricsUpdateTest.java
new file mode 100644
index 0000000..58b2102
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/ClusterNodeMetricsUpdateTest.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import java.util.UUID;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCompute;
+import org.apache.ignite.cluster.ClusterMetrics;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.lang.IgniteCallable;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ * Starts three nodes and prints per-node metrics; makes no assertions.
+ */
+public class ClusterNodeMetricsUpdateTest extends GridCommonAbstractTest {
+    /** IP finder set on the discovery SPI in {@link #getConfiguration}. */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new 
TcpDiscoveryVmIpFinder(true);
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String 
igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER);
+
+        cfg.setMetricsUpdateFrequency(500);
+
+        return cfg;
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testMetrics() throws Exception {
+        //IgnitionEx.TEST_ZK = false;
+
+        Ignite srv0 = startGrids(3);
+
+        IgniteCompute c1 = srv0.compute(srv0.cluster().forNodeId(nodeId(1)));
+        IgniteCompute c2 = srv0.compute(srv0.cluster().forNodeId(nodeId(2)));
+
+        c1.call(new DummyCallable(null)); // Runs one job on node 1 only.
+
+        Thread.sleep(3000); // NOTE(review): presumably waits for metrics updates - confirm.
+
+        Ignite srv1 = ignite(0);
+
+        
System.out.println(srv1.cluster().forNodeId(nodeId(0)).metrics().getAverageCpuLoad());
+        
System.out.println(srv1.cluster().forNodeId(nodeId(1)).metrics().getAverageCpuLoad());
+        
System.out.println(srv1.cluster().forNodeId(nodeId(2)).metrics().getAverageCpuLoad());
+
+        Thread.sleep(3000);
+
+        
System.out.println(srv1.cluster().forNodeId(nodeId(0)).metrics().getTotalExecutedJobs());
+        
System.out.println(srv1.cluster().forNodeId(nodeId(1)).metrics().getTotalExecutedJobs());
+        
System.out.println(srv1.cluster().forNodeId(nodeId(2)).metrics().getTotalExecutedJobs());
+    }
+
+    private UUID nodeId(int nodeIdx) {
+        return ignite(nodeIdx).cluster().localNode().id();
+    }
+
+    /**
+     * Callable that returns the byte array passed to its constructor.
+     */
+    private static class DummyCallable implements IgniteCallable<Object> {
+        /** Value returned by {@link #call()}. */
+        private byte[] data;
+
+        /**
+         * @param data Data.
+         */
+        DummyCallable(byte[] data) {
+            this.data = data;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Object call() throws Exception {
+            return data;
+        }
+    }
+}

Reply via email to