This is an automated email from the ASF dual-hosted git repository. sergeychugunov pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push: new 68effd6 IGNITE-13766 API for network connectivity check - Fixes #8500. 68effd6 is described below commit 68effd6f4eaed9945a3781b1bb6f297e04b9bd84 Author: Semyon Danilov <samvi...@yandex.ru> AuthorDate: Tue Dec 22 15:49:25 2020 +0300 IGNITE-13766 API for network connectivity check - Fixes #8500. Signed-off-by: Sergey Chugunov <sergey.chugu...@gmail.com> --- .../diagnostic/ConnectivityCommand.java | 194 +++++++++++++++++++++ .../commandline/diagnostic/DiagnosticCommand.java | 3 + .../diagnostic/DiagnosticSubCommand.java | 5 +- .../apache/ignite/util/GridCommandHandlerTest.java | 146 ++++++++++++++++ .../availability/VisorConnectivityArgs.java | 68 ++++++++ .../availability/VisorConnectivityResult.java | 69 ++++++++ .../availability/VisorConnectivityTask.java | 125 +++++++++++++ .../main/resources/META-INF/classnames.properties | 5 + 8 files changed, 614 insertions(+), 1 deletion(-) diff --git a/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/diagnostic/ConnectivityCommand.java b/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/diagnostic/ConnectivityCommand.java new file mode 100644 index 0000000..99ca0cc --- /dev/null +++ b/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/diagnostic/ConnectivityCommand.java @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.commandline.diagnostic; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.UUID; +import java.util.logging.Logger; +import java.util.stream.Collectors; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.internal.client.GridClient; +import org.apache.ignite.internal.client.GridClientConfiguration; +import org.apache.ignite.internal.client.GridClientNode; +import org.apache.ignite.internal.commandline.Command; +import org.apache.ignite.internal.commandline.TaskExecutor; +import org.apache.ignite.internal.visor.diagnostic.availability.VisorConnectivityArgs; +import org.apache.ignite.internal.visor.diagnostic.availability.VisorConnectivityResult; +import org.apache.ignite.internal.visor.diagnostic.availability.VisorConnectivityTask; + +import static org.apache.ignite.internal.commandline.CommandHandler.UTILITY_NAME; +import static org.apache.ignite.internal.commandline.CommandList.DIAGNOSTIC; +import static org.apache.ignite.internal.commandline.CommandLogger.join; +import static org.apache.ignite.internal.commandline.diagnostic.DiagnosticSubCommand.CONNECTIVITY; + +/** + * Command to check connectivity between every node. + */ +public class ConnectivityCommand implements Command<Void> { + /** + * Header of output table. + */ + private final List<String> TABLE_HEADER = Arrays.asList( + "SOURCE-NODE-ID", + "SOURCE-CONSISTENT-ID", + "SOURCE-NODE-TYPE", + "DESTINATION-NODE-ID", + "DESTINATION_CONSISTENT_ID", + "DESTINATION-NODE-TYPE" + ); + + /** + * Client node type string. + */ + private final String NODE_TYPE_CLIENT = "CLIENT"; + + /** + * Server node type string. + */ + private final String NODE_TYPE_SERVER = "SERVER"; + + /** + * Logger + */ + private Logger logger; + + /** {@inheritDoc} */ + @Override public Object execute(GridClientConfiguration clientCfg, Logger logger) throws Exception { + this.logger = logger; + + Map<ClusterNode, VisorConnectivityResult> result; + + try (GridClient client = Command.startClient(clientCfg)) { + Set<UUID> nodeIds = client.compute().nodes().stream().map(GridClientNode::nodeId).collect(Collectors.toSet()); + + VisorConnectivityArgs taskArg = new VisorConnectivityArgs(nodeIds); + + result = TaskExecutor.executeTask( + client, + VisorConnectivityTask.class, + taskArg, + clientCfg + ); + } + + printResult(result); + + return result; + } + + /** + * @param res Result. + */ + private void printResult(Map<ClusterNode, VisorConnectivityResult> res) { + final boolean[] hasFailed = {false}; + + final List<List<String>> table = new ArrayList<>(); + + table.add(TABLE_HEADER); + + for (Map.Entry<ClusterNode, VisorConnectivityResult> entry : res.entrySet()) { + ClusterNode key = entry.getKey(); + + String id = key.id().toString(); + String consId = key.consistentId().toString(); + String isClient = key.isClient() ? NODE_TYPE_CLIENT : NODE_TYPE_SERVER; + + VisorConnectivityResult value = entry.getValue(); + + Map<ClusterNode, Boolean> statuses = value.getNodeIds(); + + List<List<String>> row = statuses.entrySet().stream().map(nodeStat -> { + ClusterNode remoteNode = nodeStat.getKey(); + + String remoteId = remoteNode.id().toString(); + String remoteConsId = remoteNode.consistentId().toString(); + String nodeType = remoteNode.isClient() ? NODE_TYPE_CLIENT : NODE_TYPE_SERVER; + + Boolean status = nodeStat.getValue(); + + if (!status) { + hasFailed[0] = true; + return Arrays.asList(id, consId, isClient, remoteId, remoteConsId, nodeType); + } + + return null; + }) + .filter(Objects::nonNull) + .collect(Collectors.toList()); + + table.addAll(row); + } + + if (hasFailed[0]) + logger.info("There is no connectivity between the following nodes:\n" + formatAsTable(table)); + else + logger.info("There are no connectivity problems."); + } + + /** + * Format output as a table + * @param rows table rows. + * @return formatted string. + */ + public static String formatAsTable(List<List<String>> rows) { + int[] maxLengths = new int[rows.get(0).size()]; + + for (List<String> row : rows) { + for (int i = 0; i < row.size(); i++) + maxLengths[i] = Math.max(maxLengths[i], row.get(i).length()); + } + + StringBuilder formatBuilder = new StringBuilder(); + + for (int maxLength : maxLengths) + formatBuilder.append("%-").append(maxLength + 2).append("s"); + + String format = formatBuilder.toString(); + + StringBuilder result = new StringBuilder(); + + for (List<String> row : rows) + result.append(String.format(format, row.toArray(new String[0]))).append("\n"); + + return result.toString(); + } + + /** {@inheritDoc} */ + @Override public Void arg() { + return null; + } + + /** {@inheritDoc} */ + @Override public void printUsage(Logger logger) { + logger.info("View connectvity state of all nodes in cluster"); + logger.info(join(" ", + UTILITY_NAME, DIAGNOSTIC, CONNECTIVITY, + "// Prints info about connectivity between nodes")); + logger.info(""); + } + + /** {@inheritDoc} */ + @Override public String name() { + return CONNECTIVITY.name(); + } +} diff --git a/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/diagnostic/DiagnosticCommand.java b/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/diagnostic/DiagnosticCommand.java index c0e59a3..60534c4 100644 --- a/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/diagnostic/DiagnosticCommand.java +++ b/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/diagnostic/DiagnosticCommand.java @@ -29,6 +29,7 @@ import static org.apache.ignite.internal.commandline.CommandHandler.UTILITY_NAME import static org.apache.ignite.internal.commandline.CommandList.DIAGNOSTIC; import static org.apache.ignite.internal.commandline.CommandLogger.INDENT; import static org.apache.ignite.internal.commandline.CommandLogger.join; +import static org.apache.ignite.internal.commandline.diagnostic.DiagnosticSubCommand.CONNECTIVITY; import static org.apache.ignite.internal.commandline.diagnostic.DiagnosticSubCommand.HELP; import static org.apache.ignite.internal.commandline.diagnostic.DiagnosticSubCommand.PAGE_LOCKS; @@ -77,6 +78,7 @@ public class DiagnosticCommand extends AbstractCommand<DiagnosticSubCommand> { switch (cmd) { case HELP: + case CONNECTIVITY: break; case PAGE_LOCKS: @@ -109,6 +111,7 @@ public class DiagnosticCommand extends AbstractCommand<DiagnosticSubCommand> { */ private void printDiagnosticHelp(Logger logger) { logger.info(INDENT + join(" ", UTILITY_NAME, DIAGNOSTIC, PAGE_LOCKS + " - dump page locks info.")); + logger.info(INDENT + join(" ", UTILITY_NAME, DIAGNOSTIC, CONNECTIVITY + " - show connectivity state.")); logger.info(INDENT + "Subcommands:"); diff --git a/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/diagnostic/DiagnosticSubCommand.java b/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/diagnostic/DiagnosticSubCommand.java index 2229fde..aac7ddb 100644 --- a/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/diagnostic/DiagnosticSubCommand.java +++ b/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/diagnostic/DiagnosticSubCommand.java @@ -27,7 +27,10 @@ public enum DiagnosticSubCommand { HELP("help", null), /** */ - PAGE_LOCKS("pageLocks", new PageLocksCommand()); + PAGE_LOCKS("pageLocks", new PageLocksCommand()), + + /** */ + CONNECTIVITY("connectivity", new ConnectivityCommand()); /** Diagnostic command name. */ private final String name; diff --git a/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java index ec1508d..4a2bfe1 100644 --- a/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java +++ b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java @@ -27,6 +27,7 @@ import java.nio.file.Path; import java.nio.file.Paths; import java.util.ArrayList; import java.util.Arrays; +import java.util.BitSet; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -101,6 +102,7 @@ import org.apache.ignite.internal.processors.cache.warmup.BlockedWarmUpConfigura import org.apache.ignite.internal.processors.cache.warmup.BlockedWarmUpStrategy; import org.apache.ignite.internal.processors.cache.warmup.WarmUpTestPluginProvider; import org.apache.ignite.internal.processors.cluster.GridClusterStateProcessor; +import org.apache.ignite.internal.util.future.IgniteFinishedFutureImpl; import org.apache.ignite.internal.util.lang.GridAbsPredicate; import org.apache.ignite.internal.util.lang.GridFunc; import org.apache.ignite.internal.util.typedef.G; @@ -111,10 +113,12 @@ import org.apache.ignite.internal.visor.cache.VisorFindAndDeleteGarbageInPersist import org.apache.ignite.internal.visor.tx.VisorTxInfo; import org.apache.ignite.internal.visor.tx.VisorTxTaskResult; import org.apache.ignite.lang.IgniteBiPredicate; +import org.apache.ignite.lang.IgniteFuture; import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; import org.apache.ignite.spi.metric.LongMetric; import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.WithSystemProperty; @@ -1164,6 +1168,148 @@ public class GridCommandHandlerTest extends GridCommandHandlerClusterPerMethodAb } /** + * Test connectivity command works via control.sh. + */ + @Test + public void testConnectivityCommandWithoutFailedNodes() throws Exception { + IgniteEx ignite = startGrids(5); + + assertFalse(ignite.cluster().state().active()); + + ignite.cluster().state(ACTIVE); + + injectTestSystemOut(); + + assertEquals(EXIT_CODE_OK, execute("--diagnostic", "connectivity")); + + assertContains(log, testOut.toString(), "There are no connectivity problems."); + } + + /** + * Test that if node exits topology during connectivity check, the command will not fail. + * + * Description: + * 1. Start three nodes. + * 2. Execute connectivity check. + * 3. When 3-rd node receives connectivity check compute task, it must stop itself. + * 4. The command should exit with code OK. + * + * @throws Exception If failed. + */ + @Test + public void testConnectivityCommandWithNodeExit() throws Exception { + IgniteEx[] node3 = new IgniteEx[1]; + + class KillNode3CommunicationSpi extends TcpCommunicationSpi { + /** Fail check connection request and stop third node */ + boolean fail; + + public KillNode3CommunicationSpi(boolean fail) { + this.fail = fail; + } + + /** {@inheritDoc} */ + @Override public IgniteFuture<BitSet> checkConnection(List<ClusterNode> nodes) { + if (fail) { + runAsync(node3[0]::close); + return null; + } + + return super.checkConnection(nodes); + } + } + + IgniteEx node1 = startGrid(1, (UnaryOperator<IgniteConfiguration>) configuration -> { + configuration.setCommunicationSpi(new KillNode3CommunicationSpi(false)); + return configuration; + }); + + IgniteEx node2 = startGrid(2, (UnaryOperator<IgniteConfiguration>) configuration -> { + configuration.setCommunicationSpi(new KillNode3CommunicationSpi(false)); + return configuration; + }); + + node3[0] = startGrid(3, (UnaryOperator<IgniteConfiguration>) configuration -> { + configuration.setCommunicationSpi(new KillNode3CommunicationSpi(true)); + return configuration; + }); + + assertFalse(node1.cluster().state().active()); + + node1.cluster().state(ACTIVE); + + assertEquals(3, node1.cluster().nodes().size()); + + injectTestSystemOut(); + + final IgniteInternalFuture<?> connectivity = runAsync(() -> { + final int result = execute("--diagnostic", "connectivity"); + assertEquals(EXIT_CODE_OK, result); + }); + + connectivity.get(); + } + + /** + * Test connectivity command works via control.sh with one node failing. + */ + @Test + public void testConnectivityCommandWithFailedNodes() throws Exception { + UUID okId = UUID.randomUUID(); + UUID failingId = UUID.randomUUID(); + + UnaryOperator<IgniteConfiguration> operator = configuration -> { + configuration.setCommunicationSpi(new TcpCommunicationSpi() { + /** {inheritDoc} */ + @Override public IgniteFuture<BitSet> checkConnection(List<ClusterNode> nodes) { + BitSet bitSet = new BitSet(); + + int idx = 0; + + for (ClusterNode remoteNode : nodes) { + if (!remoteNode.id().equals(failingId)) + bitSet.set(idx); + + idx++; + } + + return new IgniteFinishedFutureImpl<>(bitSet); + } + }); + return configuration; + }; + + IgniteEx ignite = startGrid("normal", configuration -> { + operator.apply(configuration); + configuration.setConsistentId(okId); + configuration.setNodeId(okId); + return configuration; + }); + + IgniteEx failure = startGrid("failure", configuration -> { + operator.apply(configuration); + configuration.setConsistentId(failingId); + configuration.setNodeId(failingId); + return configuration; + }); + + ignite.cluster().state(ACTIVE); + + failure.cluster().state(ACTIVE); + + injectTestSystemOut(); + + int connectivity = execute("--diagnostic", "connectivity"); + assertEquals(EXIT_CODE_OK, connectivity); + + String out = testOut.toString(); + String what = "There is no connectivity between the following nodes"; + + assertContains(log, out.replaceAll("[\\W_]+", "").trim(), + what.replaceAll("[\\W_]+", "").trim()); + } + + /** * Test baseline remove works via control.sh * * @throws Exception If failed. diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/diagnostic/availability/VisorConnectivityArgs.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/diagnostic/availability/VisorConnectivityArgs.java new file mode 100644 index 0000000..43df8c2 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/diagnostic/availability/VisorConnectivityArgs.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.visor.diagnostic.availability; + +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import java.util.Set; +import java.util.UUID; +import org.apache.ignite.internal.dto.IgniteDataTransferObject; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.jetbrains.annotations.Nullable; + +/** + * Connectivity task arguments + */ +public class VisorConnectivityArgs extends IgniteDataTransferObject { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + @Nullable private Set<UUID> nodeIds; + + /** + * Default constructor. + */ + public VisorConnectivityArgs() { + } + + /** + * @param nodeIds Node ids. + */ + public VisorConnectivityArgs(@Nullable Set<UUID> nodeIds) { + this.nodeIds = nodeIds; + } + + /** {@inheritDoc} */ + @Override protected void writeExternalData(ObjectOutput out) throws IOException { + U.writeCollection(out, nodeIds); + } + + /** {@inheritDoc} */ + @Override protected void readExternalData(byte protoVer, ObjectInput in) throws IOException, ClassNotFoundException { + nodeIds = U.readSet(in); + } + + /** + * Get list of node ids to check connectivity to + */ + public @Nullable Set<UUID> getNodeIds() { + return nodeIds; + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/diagnostic/availability/VisorConnectivityResult.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/diagnostic/availability/VisorConnectivityResult.java new file mode 100644 index 0000000..7f83d90 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/diagnostic/availability/VisorConnectivityResult.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.visor.diagnostic.availability; + +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import java.util.Map; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.internal.dto.IgniteDataTransferObject; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.jetbrains.annotations.Nullable; + +/** + * Connectivity task result + */ +public class VisorConnectivityResult extends IgniteDataTransferObject { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + @Nullable private Map<ClusterNode, Boolean> nodeStatuses; + + /** + * Default constructor. + */ + public VisorConnectivityResult() { + } + + /** + * @param nodeStatuses Node statuses. + */ + public VisorConnectivityResult(@Nullable Map<ClusterNode, Boolean> nodeStatuses) { + this.nodeStatuses = nodeStatuses; + } + + /** {@inheritDoc} */ + @Override protected void writeExternalData(ObjectOutput out) throws IOException { + U.writeMap(out, nodeStatuses); + } + + /** {@inheritDoc} */ + @Override protected void readExternalData(byte protoVer, ObjectInput in) throws IOException, ClassNotFoundException { + nodeStatuses = U.readMap(in); + } + + /** + * Get connectivity statuses for a node + */ + public @Nullable Map<ClusterNode, Boolean> getNodeIds() { + return nodeStatuses; + } + +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/diagnostic/availability/VisorConnectivityTask.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/diagnostic/availability/VisorConnectivityTask.java new file mode 100644 index 0000000..1b30a34 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/diagnostic/availability/VisorConnectivityTask.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.visor.diagnostic.availability; + +import java.util.ArrayList; +import java.util.BitSet; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.UUID; +import java.util.stream.Collectors; +import org.apache.ignite.IgniteException; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.compute.ComputeJobResult; +import org.apache.ignite.internal.processors.task.GridInternal; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.internal.visor.VisorJob; +import org.apache.ignite.internal.visor.VisorMultiNodeTask; +import org.apache.ignite.internal.visor.VisorTaskArgument; +import org.apache.ignite.spi.communication.CommunicationSpi; +import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; +import org.jetbrains.annotations.Nullable; + +/** + * Visor task that checks connectivity status between nodes. + */ +@GridInternal +public class VisorConnectivityTask + extends VisorMultiNodeTask<VisorConnectivityArgs, Map<ClusterNode, VisorConnectivityResult>, VisorConnectivityResult> { + /** */ + private static final long serialVersionUID = 0L; + + /** {@inheritDoc} */ + @Override protected VisorJob<VisorConnectivityArgs, VisorConnectivityResult> job(VisorConnectivityArgs arg) { + return new VisorConnectivityJob(arg, debug); + } + + /** {@inheritDoc} */ + @Nullable @Override protected Map<ClusterNode, VisorConnectivityResult> reduce0( + List<ComputeJobResult> results) throws IgniteException { + Map<ClusterNode, VisorConnectivityResult> map = new HashMap<>(); + + results.forEach(result -> { + if (result.getException() != null) + return; + + final ClusterNode node = result.getNode(); + final VisorConnectivityResult data = result.getData(); + map.put(node, data); + }); + + return map; + } + + /** {@inheritDoc} */ + @Override protected Collection<UUID> jobNodes(VisorTaskArgument<VisorConnectivityArgs> arg) { + return arg.getArgument().getNodeIds(); + } + + /** + * This job is sent to every node in cluster. It then use compute on every other node just to check + * that there is a connection between nodes. + */ + private static class VisorConnectivityJob extends VisorJob<VisorConnectivityArgs, VisorConnectivityResult> { + /** */ + private static final long serialVersionUID = 0L; + + /** + * @param arg Formal job argument. + * @param debug Debug flag. + */ + private VisorConnectivityJob(VisorConnectivityArgs arg, boolean debug) { + super(arg, debug); + } + + /** {@inheritDoc} */ + @Override protected VisorConnectivityResult run(VisorConnectivityArgs arg) { + List<UUID> ids = arg.getNodeIds().stream() + .filter(uuid -> !Objects.equals(ignite.configuration().getNodeId().toString(), uuid.toString())) + .collect(Collectors.toList()); + + List<ClusterNode> nodes = new ArrayList<>(ignite.cluster().forNodeIds(ids).nodes()); + + CommunicationSpi spi = ignite.configuration().getCommunicationSpi(); + + Map<ClusterNode, Boolean> statuses = new HashMap<>(); + + if (spi instanceof TcpCommunicationSpi) { + BitSet set = ((TcpCommunicationSpi) spi).checkConnection(nodes).get(); + + for (int i = 0; i < nodes.size(); i++) { + ClusterNode node = nodes.get(i); + boolean success = set.get(i); + + statuses.put(node, success); + } + } + + return new VisorConnectivityResult(statuses); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(VisorConnectivityJob.class, this); + } + } + +} diff --git a/modules/core/src/main/resources/META-INF/classnames.properties b/modules/core/src/main/resources/META-INF/classnames.properties index 1548b78..2396d1d 100644 --- a/modules/core/src/main/resources/META-INF/classnames.properties +++ b/modules/core/src/main/resources/META-INF/classnames.properties @@ -2124,6 +2124,11 @@ org.apache.ignite.internal.visor.debug.VisorThreadDumpTaskResult org.apache.ignite.internal.visor.debug.VisorThreadInfo org.apache.ignite.internal.visor.debug.VisorThreadLockInfo org.apache.ignite.internal.visor.debug.VisorThreadMonitorInfo +org.apache.ignite.internal.visor.diagnostic.availability.VisorConnectivityResult +org.apache.ignite.internal.visor.diagnostic.availability.VisorConnectivityTask +org.apache.ignite.internal.visor.diagnostic.availability.VisorConnectivityTask$VisorConnectivityJob +org.apache.ignite.internal.visor.diagnostic.availability.VisorConnectivityTask$CheckNodesCallable +org.apache.ignite.internal.visor.diagnostic.availability.VisorConnectivityArgs org.apache.ignite.internal.visor.defragmentation.VisorDefragmentationTask org.apache.ignite.internal.visor.defragmentation.VisorDefragmentationTask$VisorDefragmentationJob org.apache.ignite.internal.visor.defragmentation.VisorDefragmentationOperation