Repository: ignite Updated Branches: refs/heads/ignite-zk ade6986c3 -> e447de174
zk Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e447de17 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e447de17 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e447de17 Branch: refs/heads/ignite-zk Commit: e447de174f938626e4f2d5caf90e64b62f38349e Parents: ade6986 Author: sboikov <[email protected]> Authored: Fri Nov 24 15:38:47 2017 +0300 Committer: sboikov <[email protected]> Committed: Fri Nov 24 17:05:54 2017 +0300 ---------------------------------------------------------------------- .../org/apache/ignite/internal/GridTopic.java | 5 +- .../communication/GridIoMessageFactory.java | 6 + .../internal/managers/discovery/DiscoCache.java | 4 + .../discovery/GridDiscoveryManager.java | 18 +- .../cluster/ClusterMetricsUpdateMessage.java | 157 ++++++++++++ .../processors/cluster/ClusterNodeMetrics.java | 58 +++++ .../processors/cluster/ClusterProcessor.java | 236 +++++++++++++++++-- .../spi/discovery/zk/ZookeeperDiscoverySpi.java | 8 +- .../zk/internal/ZookeeperClusterNode.java | 28 +-- .../zk/internal/ZookeeperDiscoveryImpl.java | 121 +++++----- .../internal/ClusterNodeMetricsUpdateTest.java | 100 ++++++++ 11 files changed, 635 insertions(+), 106 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/e447de17/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java b/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java index abdbf95..e848c37 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java @@ -115,7 +115,10 @@ public enum GridTopic { TOPIC_SCHEMA, /** */ - TOPIC_INTERNAL_DIAGNOSTIC; + TOPIC_INTERNAL_DIAGNOSTIC, + + /** */ + TOPIC_METRICS; /** Enum values. */ private static final GridTopic[] VALS = values(); http://git-wip-us.apache.org/repos/asf/ignite/blob/e447de17/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java index 97e06bf..2f8ba6d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java @@ -117,6 +117,7 @@ import org.apache.ignite.internal.processors.cache.transactions.TxLocksResponse; import org.apache.ignite.internal.processors.cache.version.GridCacheRawVersionedEntry; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.processors.cache.version.GridCacheVersionEx; +import org.apache.ignite.internal.processors.cluster.ClusterMetricsUpdateMessage; import org.apache.ignite.internal.processors.continuous.GridContinuousMessage; import org.apache.ignite.internal.processors.datastreamer.DataStreamerEntry; import org.apache.ignite.internal.processors.datastreamer.DataStreamerRequest; @@ -875,6 +876,11 @@ public class GridIoMessageFactory implements MessageFactory { break; + case 129: + msg = new ClusterMetricsUpdateMessage(); + + break; + // [-3..119] [124..128] [-23..-27] [-36..-55]- this // [120..123] - DR http://git-wip-us.apache.org/repos/asf/ignite/blob/e447de17/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java index 9ed70aa..0e35c7b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java @@ -211,6 +211,10 @@ public class DiscoCache { return null; } + public boolean alive(UUID nodeId) { + return alives.contains(nodeId); + } + /** * Gets all nodes that have cache with given name. * http://git-wip-us.apache.org/repos/asf/ignite/blob/e447de17/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java index 9396fe4..1a0712f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java @@ -1035,7 +1035,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { /** * @return Metrics provider. */ - private DiscoveryMetricsProvider createMetricsProvider() { + public DiscoveryMetricsProvider createMetricsProvider() { return new DiscoveryMetricsProvider() { /** */ private final long startTime = U.currentTimeMillis(); @@ -2123,6 +2123,19 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { } /** + * @param discoCache + * @param node + */ + public void metricsUpdateEvent(DiscoCache discoCache, ClusterNode node) { + discoWrk.addEvent(EVT_NODE_METRICS_UPDATED, + discoCache.version(), + node, + discoCache, + discoCache.nodeMap.values(), + null); + } + + /** * Gets first grid node start time, see {@link DiscoverySpi#getGridStartTime()}. * * @return Start time of the first grid node. @@ -2512,6 +2525,9 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { AffinityTopologyVersion topVer = evt.get2(); + if (type == EVT_NODE_METRICS_UPDATED && topVer.compareTo(discoCache.version()) < 0) + return; + ClusterNode node = evt.get3(); boolean isDaemon = node.isDaemon(); http://git-wip-us.apache.org/repos/asf/ignite/blob/e447de17/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterMetricsUpdateMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterMetricsUpdateMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterMetricsUpdateMessage.java new file mode 100644 index 0000000..f6db706 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterMetricsUpdateMessage.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cluster; + +import java.nio.ByteBuffer; +import java.util.Map; +import java.util.UUID; +import org.apache.ignite.internal.GridDirectMap; +import org.apache.ignite.internal.managers.communication.GridIoMessageFactory; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType; +import org.apache.ignite.plugin.extensions.communication.MessageReader; +import org.apache.ignite.plugin.extensions.communication.MessageWriter; +import org.jetbrains.annotations.Nullable; + +/** + * + */ +public class ClusterMetricsUpdateMessage implements Message { + /** */ + private byte[] nodeMetrics; + + /** */ + @GridDirectMap(keyType = UUID.class, valueType = byte[].class) + private Map<UUID, byte[]> allNodesMetrics; + + /** + * Required by {@link GridIoMessageFactory}. + */ + public ClusterMetricsUpdateMessage() { + // No-op. + } + + /** + * @param nodeMetrics Node metrics. + */ + ClusterMetricsUpdateMessage(byte[] nodeMetrics) { + this.nodeMetrics = nodeMetrics; + this.allNodesMetrics = allNodesMetrics; + } + + /** + * @param allNodesMetrics All nodes metrcis. + */ + ClusterMetricsUpdateMessage(Map<UUID, byte[]> allNodesMetrics) { + this.nodeMetrics = nodeMetrics; + this.allNodesMetrics = allNodesMetrics; + } + + /** + * @return Node metrics. + */ + @Nullable byte[] nodeMetrics() { + return nodeMetrics; + } + + /** + * @return All nodes metrics. + */ + @Nullable Map<UUID, byte[]> allNodesMetrics() { + return allNodesMetrics; + } + + /** {@inheritDoc} */ + @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { + writer.setBuffer(buf); + + if (!writer.isHeaderWritten()) { + if (!writer.writeHeader(directType(), fieldsCount())) + return false; + + writer.onHeaderWritten(); + } + + switch (writer.state()) { + case 0: + if (!writer.writeMap("allNodesMetrics", allNodesMetrics, MessageCollectionItemType.UUID, MessageCollectionItemType.BYTE_ARR)) + return false; + + writer.incrementState(); + + case 1: + if (!writer.writeByteArray("nodeMetrics", nodeMetrics)) + return false; + + writer.incrementState(); + + } + + return true; + } + + /** {@inheritDoc} */ + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { + reader.setBuffer(buf); + + if (!reader.beforeMessageRead()) + return false; + + switch (reader.state()) { + case 0: + allNodesMetrics = reader.readMap("allNodesMetrics", MessageCollectionItemType.UUID, MessageCollectionItemType.BYTE_ARR, false); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 1: + nodeMetrics = reader.readByteArray("nodeMetrics"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + } + + return reader.afterMessageRead(ClusterMetricsUpdateMessage.class); + } + + /** {@inheritDoc} */ + @Override public short directType() { + return 129; + } + + /** {@inheritDoc} */ + @Override public byte fieldsCount() { + return 2; + } + + /** {@inheritDoc} */ + @Override public void onAckReceived() { + // No-op. + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(ClusterMetricsUpdateMessage.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/e447de17/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterNodeMetrics.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterNodeMetrics.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterNodeMetrics.java new file mode 100644 index 0000000..4a7dd77 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterNodeMetrics.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cluster; + +import java.io.Serializable; +import java.util.Collections; +import java.util.Map; +import org.apache.ignite.cache.CacheMetrics; +import org.apache.ignite.cluster.ClusterMetrics; + +/** + * + */ +class ClusterNodeMetrics implements Serializable { + /** */ + private final ClusterMetrics metrics; + + /** */ + private final Map<Integer, CacheMetrics> cacheMetrics; + + /** + * @param metrics Metrics. + * @param cacheMetrics Cache metrics. + */ + ClusterNodeMetrics(ClusterMetrics metrics, Map<Integer, CacheMetrics> cacheMetrics) { + this.metrics = metrics; + this.cacheMetrics = cacheMetrics; + } + + /** + * @return Metrics. + */ + ClusterMetrics metrics() { + return metrics; + } + + /** + * @return Cache metrics. + */ + Map<Integer, CacheMetrics> cacheMetrics() { + return cacheMetrics != null ? cacheMetrics : Collections.<Integer, CacheMetrics>emptyMap(); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/e447de17/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java index 5f2c66c..8812161 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java @@ -33,6 +33,7 @@ import org.apache.ignite.IgniteSystemProperties; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.events.DiscoveryEvent; import org.apache.ignite.events.Event; +import org.apache.ignite.internal.GridDirectMap; import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.IgniteDiagnosticInfo; import org.apache.ignite.internal.IgniteDiagnosticMessage; @@ -42,8 +43,12 @@ import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.cluster.IgniteClusterImpl; import org.apache.ignite.internal.managers.communication.GridIoPolicy; import org.apache.ignite.internal.managers.communication.GridMessageListener; +import org.apache.ignite.internal.managers.discovery.DiscoCache; +import org.apache.ignite.internal.managers.discovery.IgniteClusterNode; import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; import org.apache.ignite.internal.processors.GridProcessorAdapter; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.timeout.GridTimeoutObject; import org.apache.ignite.internal.util.GridTimerTask; import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; @@ -54,9 +59,12 @@ import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteClosure; import org.apache.ignite.lang.IgniteFuture; +import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.marshaller.jdk.JdkMarshaller; import org.apache.ignite.spi.discovery.DiscoveryDataBag; import org.apache.ignite.spi.discovery.DiscoveryDataBag.GridDiscoveryData; +import org.apache.ignite.spi.discovery.DiscoveryMetricsProvider; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.jetbrains.annotations.Nullable; import static org.apache.ignite.IgniteSystemProperties.IGNITE_DIAGNOSTIC_ENABLED; @@ -66,6 +74,7 @@ import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; import static org.apache.ignite.internal.GridComponent.DiscoveryDataExchangeType.CLUSTER_PROC; import static org.apache.ignite.internal.GridTopic.TOPIC_INTERNAL_DIAGNOSTIC; +import static org.apache.ignite.internal.GridTopic.TOPIC_METRICS; import static org.apache.ignite.internal.IgniteVersionUtils.VER_STR; /** @@ -102,6 +111,10 @@ public class ClusterProcessor extends GridProcessorAdapter { /** */ private final AtomicLong diagFutId = new AtomicLong(); + /** */ + @GridDirectMap(keyType = UUID.class, valueType = byte[].class) + private final Map<UUID, byte[]> allNodesMetrics = new ConcurrentHashMap<>(); + /** * @param ctx Kernal context. */ @@ -123,30 +136,34 @@ public class ClusterProcessor extends GridProcessorAdapter { /** */ private final JdkMarshaller marsh = new JdkMarshaller(); + /** */ + private DiscoveryMetricsProvider metricsProvider; + /** * @throws IgniteCheckedException If failed. */ public void initDiagnosticListeners() throws IgniteCheckedException { ctx.event().addLocalEventListener(new GridLocalEventListener() { - @Override public void onEvent(Event evt) { - assert evt instanceof DiscoveryEvent; - assert evt.type() == EVT_NODE_FAILED || evt.type() == EVT_NODE_LEFT; + @Override public void onEvent(Event evt) { + assert evt instanceof DiscoveryEvent; + assert evt.type() == EVT_NODE_FAILED || evt.type() == EVT_NODE_LEFT; - DiscoveryEvent discoEvt = (DiscoveryEvent)evt; + DiscoveryEvent discoEvt = (DiscoveryEvent)evt; - UUID nodeId = discoEvt.eventNode().id(); + UUID nodeId = discoEvt.eventNode().id(); - ConcurrentHashMap<Long, InternalDiagnosticFuture> futs = diagnosticFutMap.get(); + ConcurrentHashMap<Long, InternalDiagnosticFuture> futs = diagnosticFutMap.get(); - if (futs != null) { - for (InternalDiagnosticFuture fut : futs.values()) { - if (fut.nodeId.equals(nodeId)) - fut.onDone(new IgniteDiagnosticInfo("Target node failed: " + nodeId)); - } + if (futs != null) { + for (InternalDiagnosticFuture fut : futs.values()) { + if (fut.nodeId.equals(nodeId)) + fut.onDone(new IgniteDiagnosticInfo("Target node failed: " + nodeId)); } } - }, - EVT_NODE_FAILED, EVT_NODE_LEFT); + + allNodesMetrics.remove(nodeId); + } + }, EVT_NODE_FAILED, EVT_NODE_LEFT); ctx.io().addMessageListener(TOPIC_INTERNAL_DIAGNOSTIC, new GridMessageListener() { @Override public void onMessage(UUID nodeId, Object msg, byte plc) { @@ -233,6 +250,17 @@ public class ClusterProcessor extends GridProcessorAdapter { U.warn(diagnosticLog, "Received unexpected message: " + msg); } }); + + if (!(ctx.config().getDiscoverySpi() instanceof TcpDiscoverySpi)) { + ctx.io().addMessageListener(TOPIC_METRICS, new GridMessageListener() { + @Override public void onMessage(UUID nodeId, Object msg, byte plc) { + if (msg instanceof ClusterMetricsUpdateMessage) + processMetricsUpdateMessage(nodeId, (ClusterMetricsUpdateMessage)msg); + else + U.warn(log, "Received unexpected message for TOPIC_METRICS: " + msg); + } + }); + } } /** @@ -296,7 +324,6 @@ public class ClusterProcessor extends GridProcessorAdapter { } } - /** * @param vals collection to seek through. */ @@ -334,6 +361,14 @@ public class ClusterProcessor extends GridProcessorAdapter { log.debug("Failed to create GridUpdateNotifier: " + e); } } + + if (!(ctx.config().getDiscoverySpi() instanceof TcpDiscoverySpi)) { + metricsProvider = ctx.discovery().createMetricsProvider(); + + long updateFreq = ctx.config().getMetricsUpdateFrequency(); + + ctx.timeout().addTimeoutObject(new MetricsUpdateTimeoutObject(updateFreq)); + } } /** {@inheritDoc} */ @@ -352,6 +387,127 @@ public class ClusterProcessor extends GridProcessorAdapter { } /** + * @param msg Message. + */ + private void processMetricsUpdateMessage(UUID sndNodeId, ClusterMetricsUpdateMessage msg) { + byte[] nodeMetrics = msg.nodeMetrics(); + + if (nodeMetrics != null) { + assert msg.allNodesMetrics() == null; + + allNodesMetrics.put(sndNodeId, nodeMetrics); + + updateNodeMetrics(ctx.discovery().discoCache(), sndNodeId, nodeMetrics); + } + else { + Map<UUID, byte[]> allNodesMetrics = msg.allNodesMetrics(); + + assert allNodesMetrics != null; + + DiscoCache discoCache = ctx.discovery().discoCache(); + + for (Map.Entry<UUID, byte[]> e : allNodesMetrics.entrySet()) { + if (!ctx.localNodeId().equals(e.getKey())) + updateNodeMetrics(discoCache, e.getKey(), e.getValue()); + } + } + } + + private void updateNodeMetrics(DiscoCache discoCache, UUID nodeId, byte[] metricsBytes) { + ClusterNode node = discoCache.node(nodeId); + + if (node == null || !discoCache.alive(nodeId)) + return; + + try { + ClusterNodeMetrics metrics = U.unmarshal(ctx.config().getMarshaller(), metricsBytes, null); + + assert node instanceof IgniteClusterNode : node; + + IgniteClusterNode node0 = (IgniteClusterNode)node; + + node0.setMetrics(metrics.metrics()); + node0.setCacheMetrics(metrics.cacheMetrics()); + + ctx.discovery().metricsUpdateEvent(discoCache, node0); + } + catch (IgniteCheckedException e) { + U.warn(log, "Failed to unmarshal node metrics: "); + } + } + + /** + * + */ + private void updateMetrics() { + if (ctx.isStopping() || ctx.clientDisconnected()) + return; + + ClusterNode oldest = ctx.discovery().oldestAliveServerNode(AffinityTopologyVersion.NONE); + + if (oldest == null) + return; + + if (ctx.localNodeId().equals(oldest.id())) { + IgniteClusterNode locNode = (IgniteClusterNode)ctx.discovery().localNode(); + + locNode.setMetrics(metricsProvider.metrics()); + locNode.setCacheMetrics(metricsProvider.cacheMetrics()); + + ClusterNodeMetrics metrics = new ClusterNodeMetrics(locNode.metrics(), locNode.cacheMetrics()); + + try { + byte[] metricsBytes = U.marshal(ctx.config().getMarshaller(), metrics); + + allNodesMetrics.put(ctx.localNodeId(), metricsBytes); + } + catch (IgniteCheckedException e) { + U.warn(log, "Failed to marshal local node metrics: " + e, e); + } + + ctx.discovery().metricsUpdateEvent(ctx.discovery().discoCache(), locNode); + + Collection<ClusterNode> allNodes = ctx.discovery().allNodes(); + + ClusterMetricsUpdateMessage msg = new ClusterMetricsUpdateMessage(new HashMap<>(allNodesMetrics)); + + for (ClusterNode node : allNodes) { + if (ctx.localNodeId().equals(node.id()) || !ctx.discovery().alive(node.id())) + continue; + + try { + ctx.io().sendToGridTopic(node, TOPIC_METRICS, msg, GridIoPolicy.SYSTEM_POOL); + } + catch (ClusterTopologyCheckedException e) { + if (log.isDebugEnabled()) + log.debug("Failed to send metrics update, node failed: " + e); + } + catch (IgniteCheckedException e) { + U.warn(log, "Failed to send metrics update: " + e, e); + } + } + } + else { + ClusterNodeMetrics metrics = new ClusterNodeMetrics(metricsProvider.metrics(), metricsProvider.cacheMetrics()); + + try { + byte[] metricsBytes = U.marshal(ctx.config().getMarshaller(), metrics); + + ClusterMetricsUpdateMessage msg = new ClusterMetricsUpdateMessage(metricsBytes); + + ctx.io().sendToGridTopic(oldest, TOPIC_METRICS, msg, GridIoPolicy.SYSTEM_POOL); + } + catch (ClusterTopologyCheckedException e) { + if (log.isDebugEnabled()) + log.debug("Failed to send metrics update to oldest, node failed: " + e); + } + catch (IgniteCheckedException e) { + U.warn(log, "Failed to send metrics update to oldest: " + e, e); + } + } + } + + /** * Disables update notifier. */ public void disableUpdateNotifier() { @@ -571,4 +727,56 @@ public class ClusterProcessor extends GridProcessorAdapter { return S.toString(InternalDiagnosticFuture.class, this); } } + + /** + * + */ + private class MetricsUpdateTimeoutObject implements GridTimeoutObject, Runnable { + /** */ + private final IgniteUuid id = IgniteUuid.randomUuid(); + + /** */ + private long endTime; + + /** */ + private final long timeout; + + /** + * @param timeout Timeout. + */ + MetricsUpdateTimeoutObject(long timeout) { + this.timeout = timeout; + + endTime = U.currentTimeMillis() + timeout; + } + + /** {@inheritDoc} */ + @Override public IgniteUuid timeoutId() { + return id; + } + + /** {@inheritDoc} */ + @Override public long endTime() { + return endTime; + } + + /** {@inheritDoc} */ + @Override public void run() { + updateMetrics(); + + endTime = U.currentTimeMillis() + timeout; + + ctx.timeout().addTimeoutObject(this); + } + + /** {@inheritDoc} */ + @Override public void onTimeout() { + try { + ctx.pools().poolForPolicy(GridIoPolicy.SYSTEM_POOL).execute(this); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to submit metrics update task: " + e, e); + } + } + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/e447de17/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java index 45c7953..8f365c3 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java @@ -324,8 +324,7 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery locNodeVer, locNodeAttrs, consistentId, - ignite.configuration().isClientMode(), - metricsProvider); + ignite.configuration().isClientMode()); locNode.local(true); @@ -337,6 +336,11 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery if (log.isDebugEnabled()) log.debug("Local node initialized: " + locNode); + if (metricsProvider != null) { + locNode.setMetrics(metricsProvider.metrics()); + locNode.setCacheMetrics(metricsProvider.cacheMetrics()); + } + return locNode; } http://git-wip-us.apache.org/repos/asf/ignite/blob/e447de17/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClusterNode.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClusterNode.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClusterNode.java index f2f0362..b51a556 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClusterNode.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClusterNode.java @@ -68,10 +68,6 @@ public class ZookeeperClusterNode implements IgniteClusterNode, Serializable { @GridToStringExclude private Map<String, Object> attrs; - /** Metrics provider (transient). */ - @GridToStringExclude - private transient DiscoveryMetricsProvider metricsProvider; - /** */ private transient boolean loc; @@ -105,8 +101,7 @@ public class ZookeeperClusterNode implements IgniteClusterNode, Serializable { IgniteProductVersion ver, Map<String, Object> attrs, Serializable consistentId, - boolean client, - DiscoveryMetricsProvider metricsProvider + boolean client ) { assert id != null; assert consistentId != null; @@ -115,7 +110,6 @@ public class ZookeeperClusterNode implements IgniteClusterNode, Serializable { this.ver = ver; this.attrs = U.sealMap(attrs); this.consistentId = consistentId; - this.metricsProvider = metricsProvider; if (client) flags |= CLIENT_NODE_MASK; @@ -158,18 +152,6 @@ public class ZookeeperClusterNode implements IgniteClusterNode, Serializable { /** {@inheritDoc} */ @Override public ClusterMetrics metrics() { - if (metricsProvider != null) { - ClusterMetrics metrics0 = metricsProvider.metrics(); - - metrics = metrics0; - - return metrics0; - } - - // TODO: ZK - if (metrics == null) - return new ClusterMetricsSnapshot(); - return metrics; } @@ -182,14 +164,6 @@ public class ZookeeperClusterNode implements IgniteClusterNode, Serializable { /** {@inheritDoc} */ @Override public Map<Integer, CacheMetrics> cacheMetrics() { - if (metricsProvider != null) { - Map<Integer, CacheMetrics> cacheMetrics0 = metricsProvider.cacheMetrics(); - - cacheMetrics = cacheMetrics0; - - return cacheMetrics0; - } - return cacheMetrics; } http://git-wip-us.apache.org/repos/asf/ignite/blob/e447de17/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java index e7e2846..74b8a5c 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java @@ -35,6 +35,7 @@ import org.apache.ignite.IgniteLogger; import org.apache.ignite.IgniteSystemProperties; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.events.EventType; +import org.apache.ignite.internal.ClusterMetricsSnapshot; import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException; import org.apache.ignite.internal.IgniteNodeAttributes; import org.apache.ignite.internal.events.DiscoveryCustomEvent; @@ -91,6 +92,9 @@ public class ZookeeperDiscoveryImpl { private final GridFutureAdapter<Void> joinFut = new GridFutureAdapter<>(); /** */ + private final AliveNodeDataWatcher aliveNodeDataWatcher = new AliveNodeDataWatcher(); + + /** */ private final ZkWatcher watcher; /** */ @@ -585,66 +589,7 @@ public class ZookeeperDiscoveryImpl { String path = zkPaths.aliveNodesDir + "/" + alivePath; if (!path.equals(locNodeZkPath)) - zkClient.getDataAsync(path, aliveNodeDataWatcher, aliveNodeDataUpdateCallback); - } - - /** */ - private final AliveNodeDataWatcher aliveNodeDataWatcher = new AliveNodeDataWatcher(); - - /** */ - private AliveNodeDataUpdateCallback aliveNodeDataUpdateCallback = new AliveNodeDataUpdateCallback(); - - /** - * - */ - private class AliveNodeDataWatcher implements Watcher { - @Override public void process(WatchedEvent evt) { - if (evt.getType() == Event.EventType.NodeDataChanged) - zkClient.getDataAsync(evt.getPath(), this, aliveNodeDataUpdateCallback); - } - } - - /** - * - */ - private class AliveNodeDataUpdateCallback implements AsyncCallback.DataCallback { - @Override public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) { - assert crd; - - if (rc == KeeperException.Code.NONODE.intValue()) { - if (log.isDebugEnabled()) - log.debug("Alive node callaback, no node: " + path); - - return; - } - - assert rc == 0 : KeeperException.Code.get(rc); - - try { - if (data.length > 0) { - ZkAliveNodeData nodeData = unmarshal(data); - - Integer nodeInternalId = ZkIgnitePaths.aliveInternalId(path); - - Iterator<ZkDiscoveryEventData> it = evtsData.evts.values().iterator(); - - boolean processed = false; - - while (it.hasNext()) { - ZkDiscoveryEventData evtData = it.next(); - - if (evtData.onAckReceived(nodeInternalId, nodeData.lastProcEvt)) - processed = true; - } - - if (processed) - handleProcessedEvents(); - } - } - catch (Throwable e) { - onFatalError(e); - } - } + zkClient.getDataAsync(path, aliveNodeDataWatcher, aliveNodeDataWatcher); } /** @@ -1133,8 +1078,14 @@ public class ZookeeperDiscoveryImpl { List<ZookeeperClusterNode> allNodes = dataForJoined.topology(); - for (ZookeeperClusterNode node : allNodes) + // TODO ZK + for (int i = 0; i < allNodes.size(); i++) { + ZookeeperClusterNode node = allNodes.get(i); + + node.setMetrics(new ClusterMetricsSnapshot()); + top.addNode(node); + } top.addNode(locNode); @@ -1492,4 +1443,52 @@ public class ZookeeperDiscoveryImpl { } } } + + /** + * + */ + private class AliveNodeDataWatcher implements Watcher, AsyncCallback.DataCallback { + @Override public void process(WatchedEvent evt) { + if (evt.getType() == Event.EventType.NodeDataChanged) + zkClient.getDataAsync(evt.getPath(), this, this); + } + + @Override public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) { + assert crd; + + if (rc == KeeperException.Code.NONODE.intValue()) { + if (log.isDebugEnabled()) + log.debug("Alive node callaback, no node: " + path); + + return; + } + + assert rc == 0 : KeeperException.Code.get(rc); + + try { + if (data.length > 0) { + ZkAliveNodeData nodeData = unmarshal(data); + + Integer nodeInternalId = ZkIgnitePaths.aliveInternalId(path); + + Iterator<ZkDiscoveryEventData> it = evtsData.evts.values().iterator(); + + boolean processed = false; + + while (it.hasNext()) { + ZkDiscoveryEventData evtData = it.next(); + + if (evtData.onAckReceived(nodeInternalId, nodeData.lastProcEvt)) + processed = true; + } + + if (processed) + handleProcessedEvents(); + } + } + catch (Throwable e) { + onFatalError(e); + } + } + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/e447de17/modules/core/src/test/java/org/apache/ignite/internal/ClusterNodeMetricsUpdateTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/ClusterNodeMetricsUpdateTest.java b/modules/core/src/test/java/org/apache/ignite/internal/ClusterNodeMetricsUpdateTest.java new file mode 100644 index 0000000..58b2102 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/ClusterNodeMetricsUpdateTest.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal; + +import java.util.UUID; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCompute; +import org.apache.ignite.cluster.ClusterMetrics; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.lang.IgniteCallable; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +/** + * + */ +public class ClusterNodeMetricsUpdateTest extends GridCommonAbstractTest { + /** */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER); + + cfg.setMetricsUpdateFrequency(500); + + return cfg; + } + + /** + * @throws Exception If failed. + */ + public void testMetrics() throws Exception { + //IgnitionEx.TEST_ZK = false; + + Ignite srv0 = startGrids(3); + + IgniteCompute c1 = srv0.compute(srv0.cluster().forNodeId(nodeId(1))); + IgniteCompute c2 = srv0.compute(srv0.cluster().forNodeId(nodeId(2))); + + c1.call(new DummyCallable(null)); + + Thread.sleep(3000); + + Ignite srv1 = ignite(0); + + System.out.println(srv1.cluster().forNodeId(nodeId(0)).metrics().getAverageCpuLoad()); + System.out.println(srv1.cluster().forNodeId(nodeId(1)).metrics().getAverageCpuLoad()); + System.out.println(srv1.cluster().forNodeId(nodeId(2)).metrics().getAverageCpuLoad()); + + Thread.sleep(3000); + + System.out.println(srv1.cluster().forNodeId(nodeId(0)).metrics().getTotalExecutedJobs()); + System.out.println(srv1.cluster().forNodeId(nodeId(1)).metrics().getTotalExecutedJobs()); + System.out.println(srv1.cluster().forNodeId(nodeId(2)).metrics().getTotalExecutedJobs()); + } + + private UUID nodeId(int nodeIdx) { + return ignite(nodeIdx).cluster().localNode().id(); + } + + /** + * + */ + private static class DummyCallable implements IgniteCallable<Object> { + /** */ + private byte[] data; + + /** + * @param data Data. + */ + DummyCallable(byte[] data) { + this.data = data; + } + + /** {@inheritDoc} */ + @Override public Object call() throws Exception { + return data; + } + } +}
