This is an automated email from the ASF dual-hosted git repository.

sergeychugunov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 68effd6  IGNITE-13766 API for network connectivity check - Fixes #8500.
68effd6 is described below

commit 68effd6f4eaed9945a3781b1bb6f297e04b9bd84
Author: Semyon Danilov <samvi...@yandex.ru>
AuthorDate: Tue Dec 22 15:49:25 2020 +0300

    IGNITE-13766 API for network connectivity check - Fixes #8500.
    
    Signed-off-by: Sergey Chugunov <sergey.chugu...@gmail.com>
---
 .../diagnostic/ConnectivityCommand.java            | 194 +++++++++++++++++++++
 .../commandline/diagnostic/DiagnosticCommand.java  |   3 +
 .../diagnostic/DiagnosticSubCommand.java           |   5 +-
 .../apache/ignite/util/GridCommandHandlerTest.java | 146 ++++++++++++++++
 .../availability/VisorConnectivityArgs.java        |  68 ++++++++
 .../availability/VisorConnectivityResult.java      |  69 ++++++++
 .../availability/VisorConnectivityTask.java        | 125 +++++++++++++
 .../main/resources/META-INF/classnames.properties  |   5 +
 8 files changed, 614 insertions(+), 1 deletion(-)

diff --git 
a/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/diagnostic/ConnectivityCommand.java
 
b/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/diagnostic/ConnectivityCommand.java
new file mode 100644
index 0000000..99ca0cc
--- /dev/null
+++ 
b/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/diagnostic/ConnectivityCommand.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.commandline.diagnostic;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.UUID;
+import java.util.logging.Logger;
+import java.util.stream.Collectors;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.client.GridClient;
+import org.apache.ignite.internal.client.GridClientConfiguration;
+import org.apache.ignite.internal.client.GridClientNode;
+import org.apache.ignite.internal.commandline.Command;
+import org.apache.ignite.internal.commandline.TaskExecutor;
+import 
org.apache.ignite.internal.visor.diagnostic.availability.VisorConnectivityArgs;
+import 
org.apache.ignite.internal.visor.diagnostic.availability.VisorConnectivityResult;
+import 
org.apache.ignite.internal.visor.diagnostic.availability.VisorConnectivityTask;
+
+import static 
org.apache.ignite.internal.commandline.CommandHandler.UTILITY_NAME;
+import static org.apache.ignite.internal.commandline.CommandList.DIAGNOSTIC;
+import static org.apache.ignite.internal.commandline.CommandLogger.join;
+import static 
org.apache.ignite.internal.commandline.diagnostic.DiagnosticSubCommand.CONNECTIVITY;
+
+/**
+ * Command to check connectivity between every node.
+ */
+public class ConnectivityCommand implements Command<Void> {
+    /**
+     * Header of output table.
+     */
+    private final List<String> TABLE_HEADER = Arrays.asList(
+            "SOURCE-NODE-ID",
+            "SOURCE-CONSISTENT-ID",
+            "SOURCE-NODE-TYPE",
+            "DESTINATION-NODE-ID",
+            "DESTINATION_CONSISTENT_ID",
+            "DESTINATION-NODE-TYPE"
+    );
+
+    /**
+     * Client node type string.
+     */
+    private final String NODE_TYPE_CLIENT = "CLIENT";
+
+    /**
+     * Server node type string.
+     */
+    private final String NODE_TYPE_SERVER = "SERVER";
+
+    /**
+     * Logger
+     */
+    private Logger logger;
+
+    /** {@inheritDoc} */
+    @Override public Object execute(GridClientConfiguration clientCfg, Logger 
logger) throws Exception {
+        this.logger = logger;
+
+        Map<ClusterNode, VisorConnectivityResult> result;
+
+        try (GridClient client = Command.startClient(clientCfg)) {
+            Set<UUID> nodeIds = 
client.compute().nodes().stream().map(GridClientNode::nodeId).collect(Collectors.toSet());
+
+            VisorConnectivityArgs taskArg = new VisorConnectivityArgs(nodeIds);
+
+            result = TaskExecutor.executeTask(
+                client,
+                VisorConnectivityTask.class,
+                taskArg,
+                clientCfg
+            );
+        }
+
+        printResult(result);
+
+        return result;
+    }
+
+    /**
+     * @param res Result.
+     */
+    private void printResult(Map<ClusterNode, VisorConnectivityResult> res) {
+        final boolean[] hasFailed = {false};
+
+        final List<List<String>> table = new ArrayList<>();
+
+        table.add(TABLE_HEADER);
+
+        for (Map.Entry<ClusterNode, VisorConnectivityResult> entry : 
res.entrySet()) {
+            ClusterNode key = entry.getKey();
+
+            String id = key.id().toString();
+            String consId = key.consistentId().toString();
+            String isClient = key.isClient() ? NODE_TYPE_CLIENT : 
NODE_TYPE_SERVER;
+
+            VisorConnectivityResult value = entry.getValue();
+
+            Map<ClusterNode, Boolean> statuses = value.getNodeIds();
+
+            List<List<String>> row = statuses.entrySet().stream().map(nodeStat 
-> {
+                ClusterNode remoteNode = nodeStat.getKey();
+
+                String remoteId = remoteNode.id().toString();
+                String remoteConsId = remoteNode.consistentId().toString();
+                String nodeType = remoteNode.isClient() ? NODE_TYPE_CLIENT : 
NODE_TYPE_SERVER;
+
+                Boolean status = nodeStat.getValue();
+
+                if (!status) {
+                    hasFailed[0] = true;
+                    return Arrays.asList(id, consId, isClient, remoteId, 
remoteConsId, nodeType);
+                }
+
+                return null;
+            })
+            .filter(Objects::nonNull)
+            .collect(Collectors.toList());
+
+            table.addAll(row);
+        }
+
+        if (hasFailed[0])
+            logger.info("There is no connectivity between the following 
nodes:\n" + formatAsTable(table));
+        else
+            logger.info("There are no connectivity problems.");
+    }
+
+    /**
+     * Format output as a table
+     * @param rows table rows.
+     * @return formatted string.
+     */
+    public static String formatAsTable(List<List<String>> rows) {
+        int[] maxLengths = new int[rows.get(0).size()];
+
+        for (List<String> row : rows) {
+            for (int i = 0; i < row.size(); i++)
+                maxLengths[i] = Math.max(maxLengths[i], row.get(i).length());
+        }
+
+        StringBuilder formatBuilder = new StringBuilder();
+
+        for (int maxLength : maxLengths)
+            formatBuilder.append("%-").append(maxLength + 2).append("s");
+
+        String format = formatBuilder.toString();
+
+        StringBuilder result = new StringBuilder();
+
+        for (List<String> row : rows)
+            result.append(String.format(format, row.toArray(new 
String[0]))).append("\n");
+
+        return result.toString();
+    }
+
+    /** {@inheritDoc} */
+    @Override public Void arg() {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void printUsage(Logger logger) {
+        logger.info("View connectvity state of all nodes in cluster");
+        logger.info(join(" ",
+            UTILITY_NAME, DIAGNOSTIC, CONNECTIVITY,
+            "// Prints info about connectivity between nodes"));
+        logger.info("");
+    }
+
+    /** {@inheritDoc} */
+    @Override public String name() {
+        return CONNECTIVITY.name();
+    }
+}
diff --git 
a/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/diagnostic/DiagnosticCommand.java
 
b/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/diagnostic/DiagnosticCommand.java
index c0e59a3..60534c4 100644
--- 
a/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/diagnostic/DiagnosticCommand.java
+++ 
b/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/diagnostic/DiagnosticCommand.java
@@ -29,6 +29,7 @@ import static 
org.apache.ignite.internal.commandline.CommandHandler.UTILITY_NAME
 import static org.apache.ignite.internal.commandline.CommandList.DIAGNOSTIC;
 import static org.apache.ignite.internal.commandline.CommandLogger.INDENT;
 import static org.apache.ignite.internal.commandline.CommandLogger.join;
+import static 
org.apache.ignite.internal.commandline.diagnostic.DiagnosticSubCommand.CONNECTIVITY;
 import static 
org.apache.ignite.internal.commandline.diagnostic.DiagnosticSubCommand.HELP;
 import static 
org.apache.ignite.internal.commandline.diagnostic.DiagnosticSubCommand.PAGE_LOCKS;
 
@@ -77,6 +78,7 @@ public class DiagnosticCommand extends 
AbstractCommand<DiagnosticSubCommand> {
 
         switch (cmd) {
             case HELP:
+            case CONNECTIVITY:
                 break;
 
             case PAGE_LOCKS:
@@ -109,6 +111,7 @@ public class DiagnosticCommand extends 
AbstractCommand<DiagnosticSubCommand> {
      */
     private void printDiagnosticHelp(Logger logger) {
         logger.info(INDENT + join(" ", UTILITY_NAME, DIAGNOSTIC, PAGE_LOCKS + 
" - dump page locks info."));
+        logger.info(INDENT + join(" ", UTILITY_NAME, DIAGNOSTIC, CONNECTIVITY 
+ " - show connectivity state."));
 
         logger.info(INDENT + "Subcommands:");
 
diff --git 
a/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/diagnostic/DiagnosticSubCommand.java
 
b/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/diagnostic/DiagnosticSubCommand.java
index 2229fde..aac7ddb 100644
--- 
a/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/diagnostic/DiagnosticSubCommand.java
+++ 
b/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/diagnostic/DiagnosticSubCommand.java
@@ -27,7 +27,10 @@ public enum DiagnosticSubCommand {
     HELP("help", null),
 
     /** */
-    PAGE_LOCKS("pageLocks", new PageLocksCommand());
+    PAGE_LOCKS("pageLocks", new PageLocksCommand()),
+
+    /** */
+    CONNECTIVITY("connectivity", new ConnectivityCommand());
 
     /** Diagnostic command name. */
     private final String name;
diff --git 
a/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java
 
b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java
index ec1508d..4a2bfe1 100644
--- 
a/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java
+++ 
b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java
@@ -27,6 +27,7 @@ import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.BitSet;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -101,6 +102,7 @@ import 
org.apache.ignite.internal.processors.cache.warmup.BlockedWarmUpConfigura
 import 
org.apache.ignite.internal.processors.cache.warmup.BlockedWarmUpStrategy;
 import 
org.apache.ignite.internal.processors.cache.warmup.WarmUpTestPluginProvider;
 import org.apache.ignite.internal.processors.cluster.GridClusterStateProcessor;
+import org.apache.ignite.internal.util.future.IgniteFinishedFutureImpl;
 import org.apache.ignite.internal.util.lang.GridAbsPredicate;
 import org.apache.ignite.internal.util.lang.GridFunc;
 import org.apache.ignite.internal.util.typedef.G;
@@ -111,10 +113,12 @@ import 
org.apache.ignite.internal.visor.cache.VisorFindAndDeleteGarbageInPersist
 import org.apache.ignite.internal.visor.tx.VisorTxInfo;
 import org.apache.ignite.internal.visor.tx.VisorTxTaskResult;
 import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.lang.IgniteFuture;
 import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
 import org.apache.ignite.spi.metric.LongMetric;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.WithSystemProperty;
@@ -1164,6 +1168,148 @@ public class GridCommandHandlerTest extends 
GridCommandHandlerClusterPerMethodAb
     }
 
     /**
+     * Test connectivity command works via control.sh.
+     */
+    @Test
+    public void testConnectivityCommandWithoutFailedNodes() throws Exception {
+        IgniteEx ignite = startGrids(5);
+
+        assertFalse(ignite.cluster().state().active());
+
+        ignite.cluster().state(ACTIVE);
+
+        injectTestSystemOut();
+
+        assertEquals(EXIT_CODE_OK, execute("--diagnostic", "connectivity"));
+
+        assertContains(log, testOut.toString(), "There are no connectivity 
problems.");
+    }
+
+    /**
+     * Test that if node exits topology during connectivity check, the command will not fail.
+     *
+     * Description:
+     * 1. Start three nodes.
+     * 2. Execute connectivity check.
+     * 3. When 3-rd node receives connectivity check compute task, it must stop itself.
+     * 4. The command should exit with code OK.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testConnectivityCommandWithNodeExit() throws Exception {
+        // Single-element array: lets the local class below refer to the third node, which is started later.
+        IgniteEx[] node3 = new IgniteEx[1];
+
+        /** Communication SPI that can close the third node when a connection check is requested. */
+        class KillNode3CommunicationSpi extends TcpCommunicationSpi {
+            /** Fail check connection request and stop third node */
+            boolean fail;
+
+            /**
+             * @param fail Whether a connection check must close the third node instead of checking connections.
+             */
+            public KillNode3CommunicationSpi(boolean fail) {
+                this.fail = fail;
+            }
+
+            /** {@inheritDoc} */
+            @Override public IgniteFuture<BitSet> 
checkConnection(List<ClusterNode> nodes) {
+                if (fail) {
+                    // Close the third node asynchronously and return no future at all.
+                    // NOTE(review): the connectivity job calls get() on this value, so the job on this node
+                    // presumably fails with an exception rather than returning statuses - confirm.
+                    runAsync(node3[0]::close);
+                    return null;
+                }
+
+                return super.checkConnection(nodes);
+            }
+        }
+
+        // Nodes 1 and 2 use the SPI in pass-through mode (fail == false).
+        IgniteEx node1 = startGrid(1, (UnaryOperator<IgniteConfiguration>) 
configuration -> {
+            configuration.setCommunicationSpi(new 
KillNode3CommunicationSpi(false));
+            return configuration;
+        });
+
+        // The reference to the second node is not used further in this test.
+        IgniteEx node2 = startGrid(2, (UnaryOperator<IgniteConfiguration>) 
configuration -> {
+            configuration.setCommunicationSpi(new 
KillNode3CommunicationSpi(false));
+            return configuration;
+        });
+
+        // Node 3 is the one that closes itself on a connection check (fail == true).
+        node3[0] = startGrid(3, (UnaryOperator<IgniteConfiguration>) 
configuration -> {
+            configuration.setCommunicationSpi(new 
KillNode3CommunicationSpi(true));
+            return configuration;
+        });
+
+        assertFalse(node1.cluster().state().active());
+
+        node1.cluster().state(ACTIVE);
+
+        assertEquals(3, node1.cluster().nodes().size());
+
+        injectTestSystemOut();
+
+        // The command runs in a separate thread; get() below waits for it to finish.
+        final IgniteInternalFuture<?> connectivity = runAsync(() -> {
+            final int result = execute("--diagnostic", "connectivity");
+            assertEquals(EXIT_CODE_OK, result);
+        });
+
+        connectivity.get();
+    }
+
+    /**
+     * Checks the connectivity command run through control.sh when one node is reported as unreachable:
+     * the command must still exit with the OK code and print the list of node pairs without connectivity.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testConnectivityCommandWithFailedNodes() throws Exception {
+        UUID healthyId = UUID.randomUUID();
+        UUID brokenId = UUID.randomUUID();
+
+        // Installs a communication SPI that reports every node except the "broken" one as reachable.
+        UnaryOperator<IgniteConfiguration> withFakeSpi = cfg -> {
+            cfg.setCommunicationSpi(new TcpCommunicationSpi() {
+                /** {@inheritDoc} */
+                @Override public IgniteFuture<BitSet> checkConnection(List<ClusterNode> nodes) {
+                    BitSet reachable = new BitSet();
+
+                    int pos = 0;
+
+                    for (ClusterNode candidate : nodes) {
+                        boolean broken = candidate.id().equals(brokenId);
+
+                        if (!broken)
+                            reachable.set(pos);
+
+                        pos++;
+                    }
+
+                    return new IgniteFinishedFutureImpl<>(reachable);
+                }
+            });
+
+            return cfg;
+        };
+
+        IgniteEx healthy = startGrid("normal", cfg -> {
+            withFakeSpi.apply(cfg);
+            cfg.setConsistentId(healthyId);
+            cfg.setNodeId(healthyId);
+            return cfg;
+        });
+
+        IgniteEx broken = startGrid("failure", cfg -> {
+            withFakeSpi.apply(cfg);
+            cfg.setConsistentId(brokenId);
+            cfg.setNodeId(brokenId);
+            return cfg;
+        });
+
+        healthy.cluster().state(ACTIVE);
+
+        broken.cluster().state(ACTIVE);
+
+        injectTestSystemOut();
+
+        int exitCode = execute("--diagnostic", "connectivity");
+
+        assertEquals(EXIT_CODE_OK, exitCode);
+
+        String printed = testOut.toString();
+        String expected = "There is no connectivity between the following nodes";
+
+        // Compare with all non-word characters and underscores stripped from both strings.
+        String printedNorm = printed.replaceAll("[\\W_]+", "").trim();
+        String expectedNorm = expected.replaceAll("[\\W_]+", "").trim();
+
+        assertContains(log, printedNorm, expectedNorm);
+    }
+
+    /**
      * Test baseline remove works via control.sh
      *
      * @throws Exception If failed.
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/visor/diagnostic/availability/VisorConnectivityArgs.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/visor/diagnostic/availability/VisorConnectivityArgs.java
new file mode 100644
index 0000000..43df8c2
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/visor/diagnostic/availability/VisorConnectivityArgs.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.visor.diagnostic.availability;
+
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.Set;
+import java.util.UUID;
+import org.apache.ignite.internal.dto.IgniteDataTransferObject;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Argument of the connectivity check task: the set of node ids to check connectivity to.
+ */
+public class VisorConnectivityArgs extends IgniteDataTransferObject {
+    /** Serial version UID. */
+    private static final long serialVersionUID = 0L;
+
+    /** Ids of the nodes to check connectivity to, may be {@code null}. */
+    @Nullable private Set<UUID> nodeIds;
+
+    /**
+     * Creates an argument without node ids.
+     */
+    public VisorConnectivityArgs() {
+        // No-op.
+    }
+
+    /**
+     * Creates an argument for the given nodes.
+     *
+     * @param nodeIds Node ids.
+     */
+    public VisorConnectivityArgs(@Nullable Set<UUID> nodeIds) {
+        this.nodeIds = nodeIds;
+    }
+
+    /**
+     * @return Ids of the nodes to check connectivity to, or {@code null} if none were set.
+     */
+    public @Nullable Set<UUID> getNodeIds() {
+        return nodeIds;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void writeExternalData(ObjectOutput out) throws IOException {
+        U.writeCollection(out, nodeIds);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void readExternalData(byte protoVer, ObjectInput in) throws IOException, ClassNotFoundException {
+        nodeIds = U.readSet(in);
+    }
+}
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/visor/diagnostic/availability/VisorConnectivityResult.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/visor/diagnostic/availability/VisorConnectivityResult.java
new file mode 100644
index 0000000..7f83d90
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/visor/diagnostic/availability/VisorConnectivityResult.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.visor.diagnostic.availability;
+
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.Map;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.dto.IgniteDataTransferObject;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Result of the connectivity check job: a connectivity status for every checked node.
+ */
+public class VisorConnectivityResult extends IgniteDataTransferObject {
+    /** Serial version UID. */
+    private static final long serialVersionUID = 0L;
+
+    /** Connectivity status per checked node, may be {@code null}. */
+    @Nullable private Map<ClusterNode, Boolean> nodeStatuses;
+
+    /**
+     * Creates a result without statuses.
+     */
+    public VisorConnectivityResult() {
+        // No-op.
+    }
+
+    /**
+     * Creates a result holding the given statuses.
+     *
+     * @param nodeStatuses Node statuses.
+     */
+    public VisorConnectivityResult(@Nullable Map<ClusterNode, Boolean> nodeStatuses) {
+        this.nodeStatuses = nodeStatuses;
+    }
+
+    /**
+     * Despite its name, this accessor returns the node-to-status map, not node ids.
+     *
+     * @return Connectivity statuses keyed by checked node, or {@code null} if none were set.
+     */
+    public @Nullable Map<ClusterNode, Boolean> getNodeIds() {
+        return nodeStatuses;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void writeExternalData(ObjectOutput out) throws IOException {
+        U.writeMap(out, nodeStatuses);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void readExternalData(byte protoVer, ObjectInput in) throws IOException, ClassNotFoundException {
+        nodeStatuses = U.readMap(in);
+    }
+}
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/visor/diagnostic/availability/VisorConnectivityTask.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/visor/diagnostic/availability/VisorConnectivityTask.java
new file mode 100644
index 0000000..1b30a34
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/visor/diagnostic/availability/VisorConnectivityTask.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.visor.diagnostic.availability;
+
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.compute.ComputeJobResult;
+import org.apache.ignite.internal.processors.task.GridInternal;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.visor.VisorJob;
+import org.apache.ignite.internal.visor.VisorMultiNodeTask;
+import org.apache.ignite.internal.visor.VisorTaskArgument;
+import org.apache.ignite.spi.communication.CommunicationSpi;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Visor task that checks connectivity status between nodes.
+ */
+@GridInternal
+public class VisorConnectivityTask
+    extends VisorMultiNodeTask<VisorConnectivityArgs, Map<ClusterNode, 
VisorConnectivityResult>, VisorConnectivityResult> {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** {@inheritDoc} */
+    @Override protected VisorJob<VisorConnectivityArgs, 
VisorConnectivityResult> job(VisorConnectivityArgs arg) {
+        return new VisorConnectivityJob(arg, debug);
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override protected Map<ClusterNode, VisorConnectivityResult> 
reduce0(
+        List<ComputeJobResult> results) throws IgniteException {
+        Map<ClusterNode, VisorConnectivityResult> map = new HashMap<>();
+
+        results.forEach(result -> {
+            if (result.getException() != null)
+                return;
+
+            final ClusterNode node = result.getNode();
+            final VisorConnectivityResult data = result.getData();
+            map.put(node, data);
+        });
+
+        return map;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected Collection<UUID> 
jobNodes(VisorTaskArgument<VisorConnectivityArgs> arg) {
+        return arg.getArgument().getNodeIds();
+    }
+
+    /**
+     * This job is sent to every node in cluster. It then use compute on every 
other node just to check
+     * that there is a connection between nodes.
+     */
+    private static class VisorConnectivityJob extends 
VisorJob<VisorConnectivityArgs, VisorConnectivityResult> {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /**
+         * @param arg   Formal job argument.
+         * @param debug Debug flag.
+         */
+        private VisorConnectivityJob(VisorConnectivityArgs arg, boolean debug) 
{
+            super(arg, debug);
+        }
+
+        /** {@inheritDoc} */
+        @Override protected VisorConnectivityResult run(VisorConnectivityArgs 
arg) {
+            List<UUID> ids = arg.getNodeIds().stream()
+                .filter(uuid -> 
!Objects.equals(ignite.configuration().getNodeId().toString(), uuid.toString()))
+                .collect(Collectors.toList());
+
+            List<ClusterNode> nodes = new 
ArrayList<>(ignite.cluster().forNodeIds(ids).nodes());
+
+            CommunicationSpi spi = 
ignite.configuration().getCommunicationSpi();
+
+            Map<ClusterNode, Boolean> statuses = new HashMap<>();
+
+            if (spi instanceof TcpCommunicationSpi) {
+                BitSet set = ((TcpCommunicationSpi) 
spi).checkConnection(nodes).get();
+
+                for (int i = 0; i < nodes.size(); i++) {
+                    ClusterNode node = nodes.get(i);
+                    boolean success = set.get(i);
+
+                    statuses.put(node, success);
+                }
+            }
+
+            return new VisorConnectivityResult(statuses);
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(VisorConnectivityJob.class, this);
+        }
+    }
+
+}
diff --git a/modules/core/src/main/resources/META-INF/classnames.properties 
b/modules/core/src/main/resources/META-INF/classnames.properties
index 1548b78..2396d1d 100644
--- a/modules/core/src/main/resources/META-INF/classnames.properties
+++ b/modules/core/src/main/resources/META-INF/classnames.properties
@@ -2124,6 +2124,11 @@ 
org.apache.ignite.internal.visor.debug.VisorThreadDumpTaskResult
 org.apache.ignite.internal.visor.debug.VisorThreadInfo
 org.apache.ignite.internal.visor.debug.VisorThreadLockInfo
 org.apache.ignite.internal.visor.debug.VisorThreadMonitorInfo
+org.apache.ignite.internal.visor.diagnostic.availability.VisorConnectivityResult
+org.apache.ignite.internal.visor.diagnostic.availability.VisorConnectivityTask
+org.apache.ignite.internal.visor.diagnostic.availability.VisorConnectivityTask$VisorConnectivityJob
+org.apache.ignite.internal.visor.diagnostic.availability.VisorConnectivityTask$CheckNodesCallable
+org.apache.ignite.internal.visor.diagnostic.availability.VisorConnectivityArgs
 org.apache.ignite.internal.visor.defragmentation.VisorDefragmentationTask
 
org.apache.ignite.internal.visor.defragmentation.VisorDefragmentationTask$VisorDefragmentationJob
 org.apache.ignite.internal.visor.defragmentation.VisorDefragmentationOperation

Reply via email to