This is an automated email from the ASF dual-hosted git repository. ilyak pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push: new b2089a6 IGNITE-13050 Added nodes cache in ClusterGroupAdapter - Fixes #7831. b2089a6 is described below commit b2089a6be789fbc8e4fc86e4125872e6456bb3d0 Author: ibessonov <bessonov...@gmail.com> AuthorDate: Mon Jun 1 10:50:32 2020 +0300 IGNITE-13050 Added nodes cache in ClusterGroupAdapter - Fixes #7831. Signed-off-by: Ilya Kasnacheev <ilya.kasnach...@gmail.com> --- .../internal/cluster/ClusterGroupAdapter.java | 220 +++++++++++++-------- .../ignite/internal/ClusterGroupSelfTest.java | 118 +++++++---- 2 files changed, 215 insertions(+), 123 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cluster/ClusterGroupAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/cluster/ClusterGroupAdapter.java index 3d4b71f..4cea339 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/cluster/ClusterGroupAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/cluster/ClusterGroupAdapter.java @@ -61,6 +61,9 @@ import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.resources.IgniteInstanceResource; import org.jetbrains.annotations.Nullable; +import static java.util.Collections.emptySet; +import static java.util.Collections.singleton; +import static java.util.Collections.unmodifiableCollection; import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_MACS; /** @@ -97,6 +100,9 @@ public class ClusterGroupAdapter implements ClusterGroupEx, Externalizable { /** Node IDs. */ private Set<UUID> ids; + /** */ + private transient volatile ClusterGroupState state; + /** * Required by {@link Externalizable}. */ @@ -285,43 +291,45 @@ public class ClusterGroupAdapter implements ClusterGroupEx, Externalizable { /** {@inheritDoc} */ @Override public Collection<ClusterNode> nodes() { - guard(); + return unmodifiableCollection(ensureLastTopologyState().nodes); + } - try { - if (ids != null) { - if (ids.isEmpty()) - return Collections.emptyList(); - else if (ids.size() == 1) { - ClusterNode node = ctx.discovery().node(F.first(ids)); + /** */ + protected Collection<ClusterNode> resolveCurrentNodes() { + assert Thread.holdsLock(this); - return node != null ? Collections.singleton(node) : Collections.<ClusterNode>emptyList(); - } - else { - Collection<ClusterNode> nodes = new ArrayList<>(ids.size()); + if (ids != null) { + if (ids.isEmpty()) + return Collections.emptyList(); + else if (ids.size() == 1) { + ClusterNode node = ctx.discovery().node(F.first(ids)); - for (UUID id : ids) { - ClusterNode node = ctx.discovery().node(id); + return node != null ? singleton(node) : emptySet(); + } + else { + ArrayList<ClusterNode> nodes = new ArrayList<>(ids.size()); - if (node != null) - nodes.add(node); - } + for (UUID id : ids) { + ClusterNode node = ctx.discovery().node(id); - return nodes; + if (node != null) + nodes.add(node); } - } - else { - Collection<ClusterNode> all; - if (p instanceof DaemonFilter) - all = F.concat(false, ctx.discovery().daemonNodes(), ctx.discovery().allNodes()); - else - all = ctx.discovery().allNodes(); + nodes.trimToSize(); - return p != null ? F.view(all, p) : all; + return nodes; } } - finally { - unguard(); + else { + Collection<ClusterNode> all; + + if (p instanceof DaemonFilter) + all = F.concat(false, ctx.discovery().daemonNodes(), ctx.discovery().allNodes()); + else + all = ctx.discovery().allNodes(); + + return p != null ? F.view(all, p) : all; } } @@ -412,7 +420,7 @@ public class ClusterGroupAdapter implements ClusterGroupEx, Externalizable { Set<UUID> nodeIds; if (F.isEmpty(nodes)) - nodeIds = contains(node) ? Collections.singleton(node.id()) : Collections.<UUID>emptySet(); + nodeIds = contains(node) ? singleton(node.id()) : emptySet(); else { nodeIds = U.newHashSet(nodes.length + 1); @@ -461,7 +469,7 @@ public class ClusterGroupAdapter implements ClusterGroupEx, Externalizable { Set<UUID> nodeIds; if (F.isEmpty(ids)) - nodeIds = contains(id) ? Collections.singleton(id) : Collections.<UUID>emptySet(); + nodeIds = contains(id) ? singleton(id) : emptySet(); else { nodeIds = U.newHashSet(ids.length + 1); @@ -538,7 +546,7 @@ public class ClusterGroupAdapter implements ClusterGroupEx, Externalizable { /** {@inheritDoc} */ @Override public final ClusterGroup forRemotes() { - return forOthers(Collections.singleton(ctx.localNodeId())); + return forOthers(singleton(ctx.localNodeId())); } /** @@ -709,6 +717,46 @@ public class ClusterGroupAdapter implements ClusterGroupEx, Externalizable { } } + /** */ + protected final ClusterGroupState ensureLastTopologyState() { + ClusterGroupState state = this.state; + + GridDiscoveryManager discoMgr = ctx.discovery(); + + long lastTopVer = discoMgr.topologyVersion(); + long startTime = discoMgr.gridStartTime(); + + if (state == null || state.lastTopVer < lastTopVer || state.startTime != startTime) + return resetState(); + + return state; + } + + /** */ + protected synchronized ClusterGroupState resetState() { + guard(); + + try { + ClusterGroupState state = this.state; + + GridDiscoveryManager discoMgr = ctx.discovery(); + + long lastTopVer = discoMgr.topologyVersion(); + long startTime = discoMgr.gridStartTime(); + + // Double check in synchronized context. + if (state != null && state.lastTopVer == lastTopVer && state.startTime == startTime) + return state; + + Collection<ClusterNode> nodes = resolveCurrentNodes(); + + return this.state = new ClusterGroupState(nodes, lastTopVer, startTime); + } + finally { + unguard(); + } + } + /** {@inheritDoc} */ @Override public void writeExternal(ObjectOutput out) throws IOException { U.writeString(out, igniteInstanceName); @@ -752,6 +800,37 @@ public class ClusterGroupAdapter implements ClusterGroupEx, Externalizable { } /** + * Container for cluster group state. + */ + private static class ClusterGroupState { + /** Calculated nodes. */ + public final Collection<ClusterNode> nodes; + + /** Last topology version. */ + public final long lastTopVer; + + /** + * Start time of first node in grid. Required for cases like in + * {@code GridServiceProxyClientReconnectSelfTest#testClientReconnect()} test. In that scenario we have one + * server and one client. Topology version is {@code 2} and after server restart and client reconnect we have + * basically new server but with the same topology version. This situation can be caught if we have additional + * counter. + */ + public final long startTime; + + /** + * @param nodes Calculated nodes. + * @param lastTopVer Last topology version. + * @param startTime Start time of first node in grid. + */ + public ClusterGroupState(Collection<ClusterNode> nodes, long lastTopVer, long startTime) { + this.nodes = nodes; + this.lastTopVer = lastTopVer; + this.startTime = startTime; + } + } + + /** */ private static class CachesFilter implements IgnitePredicate<ClusterNode> { /** */ @@ -907,7 +986,7 @@ public class ClusterGroupAdapter implements ClusterGroupEx, Externalizable { private boolean isOldest; /** State. */ - private volatile AgeClusterGroupState state; + private volatile IgnitePredicate<ClusterNode> ageP; /** * Required for {@link Externalizable}. @@ -924,54 +1003,48 @@ public class ClusterGroupAdapter implements ClusterGroupEx, Externalizable { super(parent.ctx, parent.subjId, parent.p, parent.ids); this.isOldest = isOldest; - - reset(); } - /** - * Resets node. - */ - private synchronized void reset() { - guard(); - - try { - long lastTopVer = ctx.discovery().topologyVersion(); + /** {@inheritDoc} */ + @Override protected Set<ClusterNode> resolveCurrentNodes() { + Collection<ClusterNode> nodes = super.resolveCurrentNodes(); - ClusterNode node = isOldest ? U.oldest(super.nodes(), null) : U.youngest(super.nodes(), null); + ClusterNode node = isOldest ? U.oldest(nodes, null) : U.youngest(nodes, null); - IgnitePredicate<ClusterNode> p = F.nodeForNodes(node); + if (node == null) { + ageP = F.alwaysFalse(); - state = new AgeClusterGroupState(node, p, lastTopVer); - } - finally { - unguard(); + return emptySet(); } - } - - /** {@inheritDoc} */ - @Override public ClusterNode node() { - if (ctx.discovery().topologyVersion() != state.lastTopVer) - reset(); + else { + ageP = F.nodeForNodes(node); - return state.node; + return singleton(node); + } } /** {@inheritDoc} */ @Override public Collection<ClusterNode> nodes() { - if (ctx.discovery().topologyVersion() != state.lastTopVer) - reset(); + guard(); + + try { + ClusterNode node = F.first(ensureLastTopologyState().nodes); - ClusterNode node = state.node; + if (node != null) + node = ctx.discovery().node(node.id()); - return node == null ? Collections.<ClusterNode>emptyList() : Collections.singletonList(node); + return node == null ? emptySet() : singleton(node); + } + finally { + unguard(); + } } /** {@inheritDoc} */ @Override public IgnitePredicate<ClusterNode> predicate() { - if (ctx.discovery().topologyVersion() != state.lastTopVer) - reset(); + ensureLastTopologyState(); - return state.p; + return ageP; } /** {@inheritDoc} */ @@ -1022,31 +1095,6 @@ public class ClusterGroupAdapter implements ClusterGroupEx, Externalizable { } /** - * Container for age-based cluster group state. - */ - private static class AgeClusterGroupState { - /** Selected node. */ - private final ClusterNode node; - - /** Node predicate. */ - private final IgnitePredicate<ClusterNode> p; - - /** Last topology version. */ - private final long lastTopVer; - - /** - * @param node Node. - * @param p Predicate. - * @param lastTopVer Last topology version. - */ - public AgeClusterGroupState(ClusterNode node, IgnitePredicate<ClusterNode> p, long lastTopVer) { - this.node = node; - this.p = p; - this.lastTopVer = lastTopVer; - } - } - - /** * Dynamic cluster group based predicate. */ private static class GroupPredicate implements IgnitePredicate<ClusterNode> { diff --git a/modules/core/src/test/java/org/apache/ignite/internal/ClusterGroupSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/ClusterGroupSelfTest.java index 534cc3a..bfb6453 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/ClusterGroupSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/ClusterGroupSelfTest.java @@ -17,19 +17,21 @@ package org.apache.ignite.internal; -import java.util.Collection; -import java.util.LinkedList; +import java.util.List; import java.util.Random; import java.util.Set; import java.util.UUID; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCluster; import org.apache.ignite.Ignition; import org.apache.ignite.cluster.ClusterGroup; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.G; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.marshaller.Marshaller; @@ -37,6 +39,8 @@ import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.common.GridCommonTest; import org.junit.Test; +import static java.util.Collections.singleton; + /** * Test for {@link ClusterGroup}. */ @@ -46,7 +50,7 @@ public class ClusterGroupSelfTest extends ClusterGroupAbstractTest { private static final int NODES_CNT = 4; /** Projection node IDs. */ - private static Collection<UUID> ids; + private static List<UUID> ids; /** */ private static Ignite ignite; @@ -55,20 +59,11 @@ public class ClusterGroupSelfTest extends ClusterGroupAbstractTest { @Override protected void beforeTestsStarted() throws Exception { assert NODES_CNT > 2; - ids = new LinkedList<>(); - for (int i = 0; i < NODES_CNT; i++) { - Ignite g; - if (i > 1) - g = startClientGrid(i); + startClientGrid(i); else - g = startGrid(i); - - ids.add(g.cluster().localNode().id()); - - if (i == 0) - ignite = g; + startGrid(i); } waitForTopology(NODES_CNT); @@ -82,6 +77,19 @@ public class ClusterGroupSelfTest extends ClusterGroupAbstractTest { } /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + super.beforeTest(); + + ignite = grid(0); + + ids = G.allGrids().stream() + .map(Ignite::cluster) + .map(IgniteCluster::localNode) + .map(ClusterNode::id) + .collect(Collectors.toList()); + } + + /** {@inheritDoc} */ @Override protected ClusterGroup projection() { return grid(0).cluster().forPredicate(F.nodeForNodeIds(ids)); } @@ -100,27 +108,39 @@ public class ClusterGroupSelfTest extends ClusterGroupAbstractTest { } /** - * @throws Exception If failed. */ @Test public void testOldest() throws Exception { - ClusterGroup oldest = ignite.cluster().forOldest(); + IgniteCluster cluster = grid(1).cluster(); - ClusterNode node = null; + ClusterGroup oldest = cluster.forOldest(); - long minOrder = Long.MAX_VALUE; + ClusterNode oldestNode = grid(0).localNode(); - for (ClusterNode n : ignite.cluster().nodes()) { - if (n.order() < minOrder) { - node = n; + assertEquals(cluster.forNode(oldestNode).node(), oldest.node()); - minOrder = n.order(); - } - } + assertEqualsCollections( + singleton(cluster.forNode(oldestNode).node()), + cluster.nodes().stream().filter(oldest.predicate()::apply).collect(Collectors.toSet()) + ); - assertEquals(oldest.node(), ignite.cluster().forNode(node).node()); + stopGrid(0); - ClusterGroup emptyGrp = ignite.cluster().forAttribute("nonExistent", "val"); + try { + ClusterNode newOldestNode = grid(1).localNode(); + + assertEquals(cluster.forNode(newOldestNode).node(), oldest.node()); + + assertEqualsCollections( + singleton(cluster.forNode(newOldestNode).node()), + cluster.nodes().stream().filter(oldest.predicate()::apply).collect(Collectors.toSet()) + ); + } + finally { + startGrid(0); + } + + ClusterGroup emptyGrp = cluster.forAttribute("nonExistent", "val"); assertEquals(0, emptyGrp.forOldest().nodes().size()); } @@ -130,25 +150,49 @@ public class ClusterGroupSelfTest extends ClusterGroupAbstractTest { */ @Test public void testYoungest() throws Exception { - ClusterGroup youngest = ignite.cluster().forYoungest(); + IgniteCluster cluster = ignite.cluster(); - ClusterNode node = null; + ClusterGroup youngest = cluster.forYoungest(); - long maxOrder = Long.MIN_VALUE; + ClusterNode youngestNode = grid(NODES_CNT - 1).localNode(); - for (ClusterNode n : ignite.cluster().nodes()) { - if (n.order() > maxOrder) { - node = n; + assertEquals(cluster.forNode(youngestNode).node(), youngest.node()); - maxOrder = n.order(); - } - } + assertEqualsCollections( + singleton(cluster.forNode(youngestNode).node()), + cluster.nodes().stream().filter(youngest.predicate()::apply).collect(Collectors.toSet()) + ); - assertEquals(youngest.node(), ignite.cluster().forNode(node).node()); + stopGrid(NODES_CNT - 1); - ClusterGroup emptyGrp = ignite.cluster().forAttribute("nonExistent", "val"); + try { + ClusterNode newYoungestNode = grid(NODES_CNT - 2).localNode(); + + assertEquals(cluster.forNode(newYoungestNode).node(), youngest.node()); + + assertEqualsCollections( + singleton(cluster.forNode(newYoungestNode).node()), + cluster.nodes().stream().filter(youngest.predicate()::apply).collect(Collectors.toSet()) + ); + } + finally { + startClientGrid(NODES_CNT - 1); + } + + ClusterGroup emptyGrp = cluster.forAttribute("nonExistent", "val"); assertEquals(0, emptyGrp.forYoungest().nodes().size()); + + try (Ignite ignore = startGrid(NODES_CNT)) { + ClusterNode newYoungestNode = grid(NODES_CNT).localNode(); + + assertEquals(cluster.forNode(newYoungestNode).node(), youngest.node()); + + assertEqualsCollections( + singleton(cluster.forNode(newYoungestNode).node()), + cluster.nodes().stream().filter(youngest.predicate()::apply).collect(Collectors.toSet()) + ); + } } /**