http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataTransport.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataTransport.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataTransport.java index 9402a32..38450df 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataTransport.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataTransport.java @@ -155,22 +155,30 @@ final class BinaryMetadataTransport { * @param metadata Metadata proposed for update. * @return Future to wait for update result on. */ - GridFutureAdapter<MetadataUpdateResult> requestMetadataUpdate(BinaryMetadata metadata) throws IgniteCheckedException { + GridFutureAdapter<MetadataUpdateResult> requestMetadataUpdate(BinaryMetadata metadata) { MetadataUpdateResultFuture resFut = new MetadataUpdateResultFuture(); if (log.isDebugEnabled()) log.debug("Requesting metadata update for " + metadata.typeId() + "; caller thread is blocked on future " + resFut); - synchronized (this) { - unlabeledFutures.add(resFut); + try { + synchronized (this) { + unlabeledFutures.add(resFut); - if (!stopping) - discoMgr.sendCustomEvent(new MetadataUpdateProposedMessage(metadata, ctx.localNodeId())); - else - resFut.onDone(MetadataUpdateResult.createUpdateDisabledResult()); + if (!stopping) + discoMgr.sendCustomEvent(new MetadataUpdateProposedMessage(metadata, ctx.localNodeId())); + else + resFut.onDone(MetadataUpdateResult.createUpdateDisabledResult()); + } + } + catch (Exception e) { + resFut.onDone(MetadataUpdateResult.createUpdateDisabledResult(), e); } + if (ctx.clientDisconnected()) + onDisconnected(); + return resFut; } @@ -237,6 +245,8 @@ final class BinaryMetadataTransport { for (MetadataUpdateResultFuture fut : unlabeledFutures) fut.onDone(res); + unlabeledFutures.clear(); + for (MetadataUpdateResultFuture fut : syncMap.values()) fut.onDone(res);
http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/MetadataUpdateAcceptedMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/MetadataUpdateAcceptedMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/MetadataUpdateAcceptedMessage.java index 0416746..df64613 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/MetadataUpdateAcceptedMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/MetadataUpdateAcceptedMessage.java @@ -71,6 +71,11 @@ public class MetadataUpdateAcceptedMessage implements DiscoveryCustomMessage { } /** {@inheritDoc} */ + @Override public boolean stopProcess() { + return false; + } + + /** {@inheritDoc} */ @Nullable @Override public DiscoCache createDiscoCache(GridDiscoveryManager mgr, AffinityTopologyVersion topVer, DiscoCache discoCache) { throw new UnsupportedOperationException(); http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/MetadataUpdateProposedMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/MetadataUpdateProposedMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/MetadataUpdateProposedMessage.java index f9bd660..84e32e1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/MetadataUpdateProposedMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/MetadataUpdateProposedMessage.java @@ -134,6 +134,11 @@ public final class MetadataUpdateProposedMessage implements DiscoveryCustomMessa } /** {@inheritDoc} */ + @Override public boolean stopProcess() { + return false; + } + + /** {@inheritDoc} */ @Nullable @Override public DiscoCache createDiscoCache(GridDiscoveryManager mgr, AffinityTopologyVersion topVer, DiscoCache discoCache) { throw new UnsupportedOperationException(); http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java index 9b3c1ec..5bbbb31 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java @@ -598,6 +598,45 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { } /** {@inheritDoc} */ + @Override public List<List<ClusterNode>> allOwners() { + lock.readLock().lock(); + + try { + int parts = partitions(); + + List<List<ClusterNode>> res = new ArrayList<>(parts); + + for (int i = 0; i < parts; i++) + res.add(new ArrayList<>()); + + List<ClusterNode> allNodes = discoCache.cacheGroupAffinityNodes(grpId); + + for (int i = 0; i < allNodes.size(); i++) { + ClusterNode node = allNodes.get(i); + + GridDhtPartitionMap nodeParts = node2part.get(node.id()); + + if (nodeParts != null) { + for (Map.Entry<Integer, GridDhtPartitionState> e : nodeParts.map().entrySet()) { + if (e.getValue() == OWNING) { + int part = e.getKey(); + + List<ClusterNode> owners = res.get(part); + + owners.add(node); + } + } + } + } + + return res; + } + finally { + lock.readLock().unlock(); + } + } + + /** {@inheritDoc} */ @Override public List<ClusterNode> moving(int p) { return nodes(p, AffinityTopologyVersion.NONE, MOVING, null); } http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java index ba55543..ea99f5d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java @@ -970,6 +970,10 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap try { ctx.io().send(nodeId, res, ctx.ioPolicy()); } + catch (ClusterTopologyCheckedException e) { + if (log.isDebugEnabled()) + log.debug("Failed to send get response to node, node failed: " + nodeId); + } catch (IgniteCheckedException e) { U.error(log, "Failed to send get response to node (is node still alive?) [nodeId=" + nodeId + ",req=" + req + ", res=" + res + ']', e); http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java index 13564c2..7f900cb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java @@ -237,6 +237,12 @@ public interface GridDhtPartitionTopology { public List<ClusterNode> owners(int p); /** + * @return List indexed by partition number, each list element is collection of all nodes who + * owns corresponding partition. + */ + public List<List<ClusterNode>> allOwners(); + + /** * @param p Partition ID. * @param topVer Topology version. * @return Collection of all nodes who {@code own} this partition. http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java index 528f0a6..538c57e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java @@ -1217,6 +1217,45 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { } /** {@inheritDoc} */ + @Override public List<List<ClusterNode>> allOwners() { + lock.readLock().lock(); + + try { + int parts = partitions(); + + List<List<ClusterNode>> res = new ArrayList<>(parts); + + for (int i = 0; i < parts; i++) + res.add(new ArrayList<>()); + + List<ClusterNode> allNodes = discoCache.cacheGroupAffinityNodes(grp.groupId()); + + for (int i = 0; i < allNodes.size(); i++) { + ClusterNode node = allNodes.get(i); + + GridDhtPartitionMap nodeParts = node2part.get(node.id()); + + if (nodeParts != null) { + for (Map.Entry<Integer, GridDhtPartitionState> e : nodeParts.map().entrySet()) { + if (e.getValue() == OWNING) { + int part = e.getKey(); + + List<ClusterNode> owners = res.get(part); + + owners.add(node); + } + } + } + } + + return res; + } + finally { + lock.readLock().unlock(); + } + } + + /** {@inheritDoc} */ @Override public List<ClusterNode> moving(int p) { if (!grp.rebalanceEnabled()) return ownersAndMoving(p, AffinityTopologyVersion.NONE); http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index 8da91a8..cbb4985 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -1506,12 +1506,16 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte } catch (ClusterTopologyCheckedException ignore) { if (log.isDebugEnabled()) - log.debug("Oldest node left during partition exchange [nodeId=" + oldestNode.id() + + log.debug("Coordinator left during partition exchange [nodeId=" + oldestNode.id() + ", exchId=" + exchId + ']'); } catch (IgniteCheckedException e) { - U.error(log, "Failed to send local partitions to oldest node (will retry after timeout) [oldestNodeId=" + - oldestNode.id() + ", exchId=" + exchId + ']', e); + if (reconnectOnError(e)) + onDone(new IgniteNeedReconnectException(cctx.localNode(), e)); + else { + U.error(log, "Failed to send local partitions to coordinator [crd=" + oldestNode.id() + + ", exchId=" + exchId + ']', e); + } } } @@ -3369,9 +3373,13 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte } if (allReceived) { - awaitSingleMapUpdates(); + cctx.kernalContext().getSystemExecutorService().submit(new Runnable() { + @Override public void run() { + awaitSingleMapUpdates(); - onAllReceived(null); + onAllReceived(null); + } + }); } } else { @@ -3399,7 +3407,13 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte ", newCrd=" + crd0.id() + ']'); } - sendPartitions(crd0); + final ClusterNode newCrd = crd0; + + cctx.kernalContext().getSystemExecutorService().submit(new Runnable() { + @Override public void run() { + sendPartitions(newCrd); + } + }); } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ChangeGlobalStateFinishMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ChangeGlobalStateFinishMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ChangeGlobalStateFinishMessage.java index d7dfa16..bbbd999 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ChangeGlobalStateFinishMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ChangeGlobalStateFinishMessage.java @@ -94,6 +94,11 @@ public class ChangeGlobalStateFinishMessage implements DiscoveryCustomMessage { } /** {@inheritDoc} */ + @Override public boolean stopProcess() { + return false; + } + + /** {@inheritDoc} */ @Nullable @Override public DiscoCache createDiscoCache(GridDiscoveryManager mgr, AffinityTopologyVersion topVer, DiscoCache discoCache) { throw new UnsupportedOperationException(); http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ChangeGlobalStateMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ChangeGlobalStateMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ChangeGlobalStateMessage.java index 50fc022..81855fc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ChangeGlobalStateMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ChangeGlobalStateMessage.java @@ -131,6 +131,11 @@ public class ChangeGlobalStateMessage implements DiscoveryCustomMessage { } /** {@inheritDoc} */ + @Override public boolean stopProcess() { + return false; + } + + /** {@inheritDoc} */ @Override public DiscoCache createDiscoCache(GridDiscoveryManager mgr, AffinityTopologyVersion topVer, DiscoCache discoCache) { return mgr.createDiscoCacheOnCacheChange(topVer, discoCache); http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterMetricsUpdateMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterMetricsUpdateMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterMetricsUpdateMessage.java new file mode 100644 index 0000000..5c3044b --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterMetricsUpdateMessage.java @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cluster; + +import java.nio.ByteBuffer; +import java.util.Map; +import java.util.UUID; +import org.apache.ignite.internal.GridDirectMap; +import org.apache.ignite.internal.managers.communication.GridIoMessageFactory; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType; +import org.apache.ignite.plugin.extensions.communication.MessageReader; +import org.apache.ignite.plugin.extensions.communication.MessageWriter; +import org.jetbrains.annotations.Nullable; + +/** + * + */ +public class ClusterMetricsUpdateMessage implements Message { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private byte[] nodeMetrics; + + /** */ + @GridDirectMap(keyType = UUID.class, valueType = byte[].class) + private Map<UUID, byte[]> allNodesMetrics; + + /** + * Required by {@link GridIoMessageFactory}. + */ + public ClusterMetricsUpdateMessage() { + // No-op. + } + + /** + * @param nodeMetrics Node metrics. + */ + ClusterMetricsUpdateMessage(byte[] nodeMetrics) { + this.nodeMetrics = nodeMetrics; + } + + /** + * @param allNodesMetrics All nodes metrcis. + */ + ClusterMetricsUpdateMessage(Map<UUID, byte[]> allNodesMetrics) { + this.allNodesMetrics = allNodesMetrics; + } + + /** + * @return Node metrics. + */ + @Nullable byte[] nodeMetrics() { + return nodeMetrics; + } + + /** + * @return All nodes metrics. + */ + @Nullable Map<UUID, byte[]> allNodesMetrics() { + return allNodesMetrics; + } + + /** {@inheritDoc} */ + @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { + writer.setBuffer(buf); + + if (!writer.isHeaderWritten()) { + if (!writer.writeHeader(directType(), fieldsCount())) + return false; + + writer.onHeaderWritten(); + } + + switch (writer.state()) { + case 0: + if (!writer.writeMap("allNodesMetrics", allNodesMetrics, MessageCollectionItemType.UUID, MessageCollectionItemType.BYTE_ARR)) + return false; + + writer.incrementState(); + + case 1: + if (!writer.writeByteArray("nodeMetrics", nodeMetrics)) + return false; + + writer.incrementState(); + + } + + return true; + } + + /** {@inheritDoc} */ + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { + reader.setBuffer(buf); + + if (!reader.beforeMessageRead()) + return false; + + switch (reader.state()) { + case 0: + allNodesMetrics = reader.readMap("allNodesMetrics", MessageCollectionItemType.UUID, MessageCollectionItemType.BYTE_ARR, false); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 1: + nodeMetrics = reader.readByteArray("nodeMetrics"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + } + + return reader.afterMessageRead(ClusterMetricsUpdateMessage.class); + } + + /** {@inheritDoc} */ + @Override public short directType() { + return 133; + } + + /** {@inheritDoc} */ + @Override public byte fieldsCount() { + return 2; + } + + /** {@inheritDoc} */ + @Override public void onAckReceived() { + // No-op. + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(ClusterMetricsUpdateMessage.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterNodeMetrics.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterNodeMetrics.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterNodeMetrics.java new file mode 100644 index 0000000..22a385f --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterNodeMetrics.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cluster; + +import java.io.Serializable; +import java.util.Collections; +import java.util.Map; +import org.apache.ignite.cache.CacheMetrics; +import org.apache.ignite.cluster.ClusterMetrics; +import org.apache.ignite.internal.ClusterMetricsSnapshot; + +/** + * + */ +class ClusterNodeMetrics implements Serializable { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private final byte[] metrics; + + /** */ + private final Map<Integer, CacheMetrics> cacheMetrics; + + /** + * @param metrics Metrics. + * @param cacheMetrics Cache metrics. + */ + ClusterNodeMetrics(ClusterMetrics metrics, Map<Integer, CacheMetrics> cacheMetrics) { + this.metrics = ClusterMetricsSnapshot.serialize(metrics); + this.cacheMetrics = cacheMetrics; + } + + /** + * @return Metrics. + */ + byte[] metrics() { + return metrics; + } + + /** + * @return Cache metrics. + */ + Map<Integer, CacheMetrics> cacheMetrics() { + return cacheMetrics != null ? cacheMetrics : Collections.<Integer, CacheMetrics>emptyMap(); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java index 5f2c66c..8796302 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java @@ -33,6 +33,8 @@ import org.apache.ignite.IgniteSystemProperties; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.events.DiscoveryEvent; import org.apache.ignite.events.Event; +import org.apache.ignite.internal.ClusterMetricsSnapshot; +import org.apache.ignite.internal.GridDirectMap; import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.IgniteDiagnosticInfo; import org.apache.ignite.internal.IgniteDiagnosticMessage; @@ -42,21 +44,29 @@ import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.cluster.IgniteClusterImpl; import org.apache.ignite.internal.managers.communication.GridIoPolicy; import org.apache.ignite.internal.managers.communication.GridMessageListener; +import org.apache.ignite.internal.managers.discovery.DiscoCache; +import org.apache.ignite.internal.managers.discovery.IgniteClusterNode; import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; import org.apache.ignite.internal.processors.GridProcessorAdapter; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.timeout.GridTimeoutObject; import org.apache.ignite.internal.util.GridTimerTask; import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.future.IgniteFinishedFutureImpl; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.typedef.CI1; +import org.apache.ignite.internal.util.typedef.internal.LT; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteClosure; import org.apache.ignite.lang.IgniteFuture; +import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.marshaller.jdk.JdkMarshaller; import org.apache.ignite.spi.discovery.DiscoveryDataBag; import org.apache.ignite.spi.discovery.DiscoveryDataBag.GridDiscoveryData; +import org.apache.ignite.spi.discovery.DiscoveryMetricsProvider; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.jetbrains.annotations.Nullable; import static org.apache.ignite.IgniteSystemProperties.IGNITE_DIAGNOSTIC_ENABLED; @@ -66,6 +76,7 @@ import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; import static org.apache.ignite.internal.GridComponent.DiscoveryDataExchangeType.CLUSTER_PROC; import static org.apache.ignite.internal.GridTopic.TOPIC_INTERNAL_DIAGNOSTIC; +import static org.apache.ignite.internal.GridTopic.TOPIC_METRICS; import static org.apache.ignite.internal.IgniteVersionUtils.VER_STR; /** @@ -102,6 +113,18 @@ public class ClusterProcessor extends GridProcessorAdapter { /** */ private final AtomicLong diagFutId = new AtomicLong(); + /** */ + private final Map<UUID, byte[]> allNodesMetrics = new ConcurrentHashMap<>(); + + /** */ + private final JdkMarshaller marsh = new JdkMarshaller(); + + /** */ + private DiscoveryMetricsProvider metricsProvider; + + /** */ + private boolean sndMetrics; + /** * @param ctx Kernal context. */ @@ -111,6 +134,8 @@ public class ClusterProcessor extends GridProcessorAdapter { notifyEnabled.set(IgniteSystemProperties.getBoolean(IGNITE_UPDATE_NOTIFIER, true)); cluster = new IgniteClusterImpl(ctx); + + sndMetrics = !(ctx.config().getDiscoverySpi() instanceof TcpDiscoverySpi); } /** @@ -120,33 +145,31 @@ public class ClusterProcessor extends GridProcessorAdapter { return getBoolean(IGNITE_DIAGNOSTIC_ENABLED, true); } - /** */ - private final JdkMarshaller marsh = new JdkMarshaller(); - /** * @throws IgniteCheckedException If failed. */ public void initDiagnosticListeners() throws IgniteCheckedException { ctx.event().addLocalEventListener(new GridLocalEventListener() { - @Override public void onEvent(Event evt) { - assert evt instanceof DiscoveryEvent; - assert evt.type() == EVT_NODE_FAILED || evt.type() == EVT_NODE_LEFT; + @Override public void onEvent(Event evt) { + assert evt instanceof DiscoveryEvent; + assert evt.type() == EVT_NODE_FAILED || evt.type() == EVT_NODE_LEFT; - DiscoveryEvent discoEvt = (DiscoveryEvent)evt; + DiscoveryEvent discoEvt = (DiscoveryEvent)evt; - UUID nodeId = discoEvt.eventNode().id(); + UUID nodeId = discoEvt.eventNode().id(); - ConcurrentHashMap<Long, InternalDiagnosticFuture> futs = diagnosticFutMap.get(); + ConcurrentHashMap<Long, InternalDiagnosticFuture> futs = diagnosticFutMap.get(); - if (futs != null) { - for (InternalDiagnosticFuture fut : futs.values()) { - if (fut.nodeId.equals(nodeId)) - fut.onDone(new IgniteDiagnosticInfo("Target node failed: " + nodeId)); - } + if (futs != null) { + for (InternalDiagnosticFuture fut : futs.values()) { + if (fut.nodeId.equals(nodeId)) + fut.onDone(new IgniteDiagnosticInfo("Target node failed: " + nodeId)); } } - }, - EVT_NODE_FAILED, EVT_NODE_LEFT); + + allNodesMetrics.remove(nodeId); + } + }, EVT_NODE_FAILED, EVT_NODE_LEFT); ctx.io().addMessageListener(TOPIC_INTERNAL_DIAGNOSTIC, new GridMessageListener() { @Override public void onMessage(UUID nodeId, Object msg, byte plc) { @@ -233,6 +256,17 @@ public class ClusterProcessor extends GridProcessorAdapter { U.warn(diagnosticLog, "Received unexpected message: " + msg); } }); + + if (sndMetrics) { + ctx.io().addMessageListener(TOPIC_METRICS, new GridMessageListener() { + @Override public void onMessage(UUID nodeId, Object msg, byte plc) { + if (msg instanceof ClusterMetricsUpdateMessage) + processMetricsUpdateMessage(nodeId, (ClusterMetricsUpdateMessage)msg); + else + U.warn(log, "Received unexpected message for TOPIC_METRICS: " + msg); + } + }); + } } /** @@ -296,7 +330,6 @@ public class ClusterProcessor extends GridProcessorAdapter { } } - /** * @param vals collection to seek through. */ @@ -334,6 +367,14 @@ public class ClusterProcessor extends GridProcessorAdapter { log.debug("Failed to create GridUpdateNotifier: " + e); } } + + if (sndMetrics) { + metricsProvider = ctx.discovery().createMetricsProvider(); + + long updateFreq = ctx.config().getMetricsUpdateFrequency(); + + ctx.timeout().addTimeoutObject(new MetricsUpdateTimeoutObject(updateFreq)); + } } /** {@inheritDoc} */ @@ -352,6 +393,133 @@ public class ClusterProcessor extends GridProcessorAdapter { } /** + * @param sndNodeId Sender node ID. + * @param msg Message. + */ + private void processMetricsUpdateMessage(UUID sndNodeId, ClusterMetricsUpdateMessage msg) { + byte[] nodeMetrics = msg.nodeMetrics(); + + if (nodeMetrics != null) { + assert msg.allNodesMetrics() == null; + + allNodesMetrics.put(sndNodeId, nodeMetrics); + + updateNodeMetrics(ctx.discovery().discoCache(), sndNodeId, nodeMetrics); + } + else { + Map<UUID, byte[]> allNodesMetrics = msg.allNodesMetrics(); + + assert allNodesMetrics != null; + + DiscoCache discoCache = ctx.discovery().discoCache(); + + for (Map.Entry<UUID, byte[]> e : allNodesMetrics.entrySet()) { + if (!ctx.localNodeId().equals(e.getKey())) + updateNodeMetrics(discoCache, e.getKey(), e.getValue()); + } + } + } + + /** + * @param discoCache Discovery data cache. + * @param nodeId Node ID. + * @param metricsBytes Marshalled metrics. + */ + private void updateNodeMetrics(DiscoCache discoCache, UUID nodeId, byte[] metricsBytes) { + ClusterNode node = discoCache.node(nodeId); + + if (node == null || !discoCache.alive(nodeId)) + return; + + try { + ClusterNodeMetrics metrics = U.unmarshalZip(ctx.config().getMarshaller(), metricsBytes, null); + + assert node instanceof IgniteClusterNode : node; + + IgniteClusterNode node0 = (IgniteClusterNode)node; + + node0.setMetrics(ClusterMetricsSnapshot.deserialize(metrics.metrics(), 0)); + node0.setCacheMetrics(metrics.cacheMetrics()); + + ctx.discovery().metricsUpdateEvent(discoCache, node0); + } + catch (IgniteCheckedException e) { + U.warn(log, "Failed to unmarshal node metrics: " + e); + } + } + + /** + * + */ + private void updateMetrics() { + if (ctx.isStopping() || ctx.clientDisconnected()) + return; + + ClusterNode oldest = ctx.discovery().oldestAliveServerNode(AffinityTopologyVersion.NONE); + + if (oldest == null) + return; + + if (ctx.localNodeId().equals(oldest.id())) { + IgniteClusterNode locNode = (IgniteClusterNode)ctx.discovery().localNode(); + + locNode.setMetrics(metricsProvider.metrics()); + locNode.setCacheMetrics(metricsProvider.cacheMetrics()); + + ClusterNodeMetrics metrics = new ClusterNodeMetrics(locNode.metrics(), locNode.cacheMetrics()); + + try { + byte[] metricsBytes = U.zip(U.marshal(ctx.config().getMarshaller(), metrics)); + + allNodesMetrics.put(ctx.localNodeId(), metricsBytes); + } + catch (IgniteCheckedException e) { + U.warn(log, "Failed to marshal local node metrics: " + e, e); + } + + ctx.discovery().metricsUpdateEvent(ctx.discovery().discoCache(), locNode); + + Collection<ClusterNode> allNodes = ctx.discovery().allNodes(); + + ClusterMetricsUpdateMessage msg = new ClusterMetricsUpdateMessage(new HashMap<>(allNodesMetrics)); + + for (ClusterNode node : allNodes) { + if (ctx.localNodeId().equals(node.id()) || !ctx.discovery().alive(node.id())) + continue; + + try { + ctx.io().sendToGridTopic(node, TOPIC_METRICS, msg, GridIoPolicy.SYSTEM_POOL); + } + catch (ClusterTopologyCheckedException e) { + if (log.isDebugEnabled()) + log.debug("Failed to send metrics update, node failed: " + e); + } + catch (IgniteCheckedException e) { + U.warn(log, "Failed to send metrics update: " + e, e); + } + } + } + else { + ClusterNodeMetrics metrics = new ClusterNodeMetrics(metricsProvider.metrics(), metricsProvider.cacheMetrics()); + + try { + byte[] metricsBytes = U.zip(U.marshal(ctx.config().getMarshaller(), metrics)); + + ClusterMetricsUpdateMessage msg = new ClusterMetricsUpdateMessage(metricsBytes); + + ctx.io().sendToGridTopic(oldest, TOPIC_METRICS, msg, GridIoPolicy.SYSTEM_POOL); + } + catch (ClusterTopologyCheckedException e) { + if (log.isDebugEnabled()) + log.debug("Failed to send metrics update to oldest, node failed: " + e); + } + catch (IgniteCheckedException e) { + LT.warn(log, e, "Failed to send metrics update to oldest: " + e, false, false); + } + } + } + + /** * Disables update notifier. */ public void disableUpdateNotifier() { @@ -571,4 +739,51 @@ public class ClusterProcessor extends GridProcessorAdapter { return S.toString(InternalDiagnosticFuture.class, this); } } + + /** + * + */ + private class MetricsUpdateTimeoutObject implements GridTimeoutObject, Runnable { + /** */ + private final IgniteUuid id = IgniteUuid.randomUuid(); + + /** */ + private long endTime; + + /** */ + private final long timeout; + + /** + * @param timeout Timeout. + */ + MetricsUpdateTimeoutObject(long timeout) { + this.timeout = timeout; + + endTime = U.currentTimeMillis() + timeout; + } + + /** {@inheritDoc} */ + @Override public IgniteUuid timeoutId() { + return id; + } + + /** {@inheritDoc} */ + @Override public long endTime() { + return endTime; + } + + /** {@inheritDoc} */ + @Override public void run() { + updateMetrics(); + + endTime = U.currentTimeMillis() + timeout; + + ctx.timeout().addTimeoutObject(this); + } + + /** {@inheritDoc} */ + @Override public void onTimeout() { + ctx.getSystemExecutorService().execute(this); + } + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/AbstractContinuousMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/AbstractContinuousMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/AbstractContinuousMessage.java index e9754d1..928c619 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/AbstractContinuousMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/AbstractContinuousMessage.java @@ -63,6 +63,11 @@ public abstract class AbstractContinuousMessage implements DiscoveryCustomMessag } /** {@inheritDoc} */ + @Override public boolean stopProcess() { + return false; + } + + /** {@inheritDoc} */ @Nullable @Override public DiscoCache createDiscoCache(GridDiscoveryManager mgr, AffinityTopologyVersion topVer, DiscoCache discoCache) { throw new UnsupportedOperationException(); http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/ContinuousRoutineInfo.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/ContinuousRoutineInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/ContinuousRoutineInfo.java new file mode 100644 index 0000000..fc0f181 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/ContinuousRoutineInfo.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.continuous; + +import java.io.Serializable; +import java.util.UUID; +import org.apache.ignite.internal.util.typedef.internal.S; + +/** + * + */ +class ContinuousRoutineInfo implements Serializable { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + UUID srcNodeId; + + /** */ + final UUID routineId; + + /** */ + final byte[] hnd; + + /** */ + final byte[] nodeFilter; + + /** */ + final int bufSize; + + /** */ + final long interval; + + /** */ + final boolean autoUnsubscribe; + + /** */ + transient boolean disconnected; + + /** + * @param srcNodeId Source node ID. + * @param routineId Routine ID. + * @param hnd Marshalled handler. + * @param nodeFilter Marshalled node filter. + * @param bufSize Handler buffer size. + * @param interval Time interval. + * @param autoUnsubscribe Auto unsubscribe flag. + */ + ContinuousRoutineInfo( + UUID srcNodeId, + UUID routineId, + byte[] hnd, + byte[] nodeFilter, + int bufSize, + long interval, + boolean autoUnsubscribe) + { + this.srcNodeId = srcNodeId; + this.routineId = routineId; + this.hnd = hnd; + this.nodeFilter = nodeFilter; + this.bufSize = bufSize; + this.interval = interval; + this.autoUnsubscribe = autoUnsubscribe; + } + + /** + * @param srcNodeId Source node ID. + */ + void sourceNodeId(UUID srcNodeId) { + this.srcNodeId = srcNodeId; + } + + /** + * + */ + void onDisconnected() { + disconnected = true; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(ContinuousRoutineInfo.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/ContinuousRoutineStartResultMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/ContinuousRoutineStartResultMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/ContinuousRoutineStartResultMessage.java new file mode 100644 index 0000000..581ac60 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/ContinuousRoutineStartResultMessage.java @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.continuous; + +import java.nio.ByteBuffer; +import java.util.UUID; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionPartialCountersMap; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.plugin.extensions.communication.MessageReader; +import org.apache.ignite.plugin.extensions.communication.MessageWriter; +import org.jetbrains.annotations.Nullable; + +/** + * + */ +public class ContinuousRoutineStartResultMessage implements Message { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private static final int ERROR_FLAG = 0x01; + + /** */ + private UUID routineId; + + /** */ + private byte[] errBytes; + + /** */ + private byte[] cntrsMapBytes; + + /** */ + private int flags; + + /** + * + */ + public ContinuousRoutineStartResultMessage() { + // No-op. + } + + /** + * @param routineId Routine ID. + * @param cntrsMapBytes Marshalled {@link CachePartitionPartialCountersMap}. + * @param errBytes Error bytes. + * @param err {@code True} if failed to start routine. + */ + ContinuousRoutineStartResultMessage(UUID routineId, byte[] cntrsMapBytes, byte[] errBytes, boolean err) { + this.routineId = routineId; + this.cntrsMapBytes = cntrsMapBytes; + this.errBytes = errBytes; + + if (err) + flags |= ERROR_FLAG; + } + + /** + * @return Marshalled {@link CachePartitionPartialCountersMap}. + */ + @Nullable byte[] countersMapBytes() { + return cntrsMapBytes; + } + + /** + * @return {@code True} if failed to start routine. + */ + boolean error() { + return (flags & ERROR_FLAG) != 0; + } + + /** + * @return Routine ID. + */ + UUID routineId() { + return routineId; + } + + /** + * @return Error bytes. + */ + @Nullable byte[] errorBytes() { + return errBytes; + } + + /** {@inheritDoc} */ + @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { + writer.setBuffer(buf); + + if (!writer.isHeaderWritten()) { + if (!writer.writeHeader(directType(), fieldsCount())) + return false; + + writer.onHeaderWritten(); + } + + switch (writer.state()) { + case 0: + if (!writer.writeByteArray("cntrsMapBytes", cntrsMapBytes)) + return false; + + writer.incrementState(); + + case 1: + if (!writer.writeByteArray("errBytes", errBytes)) + return false; + + writer.incrementState(); + + case 2: + if (!writer.writeInt("flags", flags)) + return false; + + writer.incrementState(); + + case 3: + if (!writer.writeUuid("routineId", routineId)) + return false; + + writer.incrementState(); + + } + + return true; + } + + /** {@inheritDoc} */ + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { + reader.setBuffer(buf); + + if (!reader.beforeMessageRead()) + return false; + + switch (reader.state()) { + case 0: + cntrsMapBytes = reader.readByteArray("cntrsMapBytes"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 1: + errBytes = reader.readByteArray("errBytes"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 2: + flags = reader.readInt("flags"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 3: + routineId = reader.readUuid("routineId"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + } + + return reader.afterMessageRead(ContinuousRoutineStartResultMessage.class); + } + + /** {@inheritDoc} */ + @Override public short directType() { + return 134; + } + + /** {@inheritDoc} */ + @Override public byte fieldsCount() { + return 4; + } + + /** {@inheritDoc} */ + @Override public void onAckReceived() { + // No-op. + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(ContinuousRoutineStartResultMessage.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/ContinuousRoutinesCommonDiscoveryData.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/ContinuousRoutinesCommonDiscoveryData.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/ContinuousRoutinesCommonDiscoveryData.java new file mode 100644 index 0000000..d29de89 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/ContinuousRoutinesCommonDiscoveryData.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.continuous; + +import java.io.Serializable; +import java.util.List; +import org.apache.ignite.internal.util.typedef.internal.S; + +/** + * + */ +public class ContinuousRoutinesCommonDiscoveryData implements Serializable { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + final List<ContinuousRoutineInfo> startedRoutines; + + /** + * @param startedRoutines Routines started in cluster. + */ + ContinuousRoutinesCommonDiscoveryData(List<ContinuousRoutineInfo> startedRoutines) { + this.startedRoutines = startedRoutines; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(ContinuousRoutinesCommonDiscoveryData.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/ContinuousRoutinesInfo.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/ContinuousRoutinesInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/ContinuousRoutinesInfo.java new file mode 100644 index 0000000..ad24ff1 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/ContinuousRoutinesInfo.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.continuous; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.UUID; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.spi.discovery.DiscoveryDataBag; + +import static org.apache.ignite.internal.GridComponent.DiscoveryDataExchangeType.CONTINUOUS_PROC; + +/** + * + */ +class ContinuousRoutinesInfo { + /** */ + private final Map<UUID, ContinuousRoutineInfo> startedRoutines = new HashMap<>(); + + /** + * @param dataBag Discovery data bag. + */ + void collectGridNodeData(DiscoveryDataBag dataBag) { + synchronized (startedRoutines) { + if (!dataBag.commonDataCollectedFor(CONTINUOUS_PROC.ordinal())) + dataBag.addGridCommonData(CONTINUOUS_PROC.ordinal(), + new ContinuousRoutinesCommonDiscoveryData(new ArrayList<>(startedRoutines.values()))); + } + } + + /** + * @param dataBag Discovery data bag. + */ + void collectJoiningNodeData(DiscoveryDataBag dataBag) { + synchronized (startedRoutines) { + for (ContinuousRoutineInfo info : startedRoutines.values()) { + if (info.disconnected) + info.sourceNodeId(dataBag.joiningNodeId()); + } + + dataBag.addJoiningNodeData(CONTINUOUS_PROC.ordinal(), + new ContinuousRoutinesJoiningNodeDiscoveryData(new ArrayList<>(startedRoutines.values()))); + } + } + + /** + * @param info Routine info. + */ + void addRoutineInfo(ContinuousRoutineInfo info) { + synchronized (startedRoutines) { + startedRoutines.put(info.routineId, info); + } + } + + /** + * @param routineId Routine ID. + * @return {@code True} if routine exists. + */ + boolean routineExists(UUID routineId) { + synchronized (startedRoutines) { + return startedRoutines.containsKey(routineId); + } + } + + /** + * @param routineId Routine ID. + */ + void removeRoutine(UUID routineId) { + synchronized (startedRoutines) { + startedRoutines.remove(routineId); + } + } + + /** + * @param locRoutines Routines IDs which can survive reconnect. + */ + void onClientDisconnected(Collection<UUID> locRoutines) { + synchronized (startedRoutines) { + for (Iterator<Map.Entry<UUID, ContinuousRoutineInfo>> it = startedRoutines.entrySet().iterator(); it.hasNext();) { + Map.Entry<UUID, ContinuousRoutineInfo> e = it.next(); + + ContinuousRoutineInfo info = e.getValue(); + + if (!locRoutines.contains(info.routineId)) + it.remove(); + else + info.onDisconnected(); + } + } + } + + /** + * Removes all routines with autoUnsubscribe=false started by given node. + * + * @param nodeId Node ID. + */ + void onNodeFail(UUID nodeId) { + synchronized (startedRoutines) { + for (Iterator<Map.Entry<UUID, ContinuousRoutineInfo>> it = startedRoutines.entrySet().iterator(); it.hasNext();) { + Map.Entry<UUID, ContinuousRoutineInfo> e = it.next(); + + ContinuousRoutineInfo info = e.getValue(); + + if (info.autoUnsubscribe && info.srcNodeId.equals(nodeId)) + it.remove(); + } + } + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(ContinuousRoutinesInfo.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/ContinuousRoutinesJoiningNodeDiscoveryData.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/ContinuousRoutinesJoiningNodeDiscoveryData.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/ContinuousRoutinesJoiningNodeDiscoveryData.java new file mode 100644 index 0000000..9be6ef8 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/ContinuousRoutinesJoiningNodeDiscoveryData.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.continuous; + +import java.io.Serializable; +import java.util.List; +import org.apache.ignite.internal.util.typedef.internal.S; + +/** + * + */ +public class ContinuousRoutinesJoiningNodeDiscoveryData implements Serializable { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + final List<ContinuousRoutineInfo> startedRoutines; + + /** + * @param startedRoutines Routines registered on nodes, to be started in cluster. + */ + ContinuousRoutinesJoiningNodeDiscoveryData(List<ContinuousRoutineInfo> startedRoutines) { + this.startedRoutines = startedRoutines; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(ContinuousRoutinesJoiningNodeDiscoveryData.class, this); + } +}