IGNITE-7222 Added ZooKeeper discovery SPI

Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a64b941d
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a64b941d
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a64b941d

Branch: refs/heads/master
Commit: a64b941df0a4d3bfc3a2dab32f85c371c1a509be
Parents: a0a187b
Author: Semyon Boikov <sboi...@apache.org>
Authored: Tue Apr 10 11:37:39 2018 +0300
Committer: Alexey Goncharuk <alexey.goncha...@gmail.com>
Committed: Tue Apr 10 11:57:40 2018 +0300

----------------------------------------------------------------------
 .../jdbc2/JdbcAbstractDmlStatementSelfTest.java |    6 +-
 .../CommunicationFailureContext.java            |   62 +
 .../CommunicationFailureResolver.java           |   28 +
 .../DefaultCommunicationFailureResolver.java    |  305 ++
 .../configuration/IgniteConfiguration.java      |   22 +
 .../org/apache/ignite/internal/GridTopic.java   |    3 +
 .../apache/ignite/internal/IgniteKernal.java    |    5 +-
 .../org/apache/ignite/internal/IgnitionEx.java  |    3 +
 .../internal/managers/GridManagerAdapter.java   |    8 +
 .../managers/communication/GridIoManager.java   |    4 +-
 .../communication/GridIoMessageFactory.java     |   12 +
 .../discovery/CustomMessageWrapper.java         |    5 +
 .../internal/managers/discovery/DiscoCache.java |    8 +
 .../discovery/DiscoveryCustomMessage.java       |   10 +-
 .../DiscoveryMessageResultsCollector.java       |  222 +
 .../discovery/GridDiscoveryManager.java         |  128 +-
 .../managers/discovery/IgniteClusterNode.java   |   69 +
 .../managers/discovery/IgniteDiscoverySpi.java  |   67 +
 .../IgniteDiscoverySpiInternalListener.java     |   42 +
 .../authentication/UserAcceptedMessage.java     |    5 +
 .../authentication/UserProposedMessage.java     |    5 +
 .../cache/CacheAffinityChangeMessage.java       |    5 +
 .../cache/CacheAffinitySharedManager.java       |   26 +-
 .../cache/CacheStatisticsModeChangeMessage.java |    5 +
 .../ClientCacheChangeDiscoveryMessage.java      |    5 +
 .../ClientCacheChangeDummyDiscoveryMessage.java |    5 +
 .../cache/DynamicCacheChangeBatch.java          |    5 +
 .../processors/cache/GridCacheAdapter.java      |    3 +-
 .../GridCachePartitionExchangeManager.java      |   16 +-
 .../processors/cache/GridCacheProcessor.java    |    4 +-
 .../processors/cache/GridCacheUtils.java        |    6 +-
 .../processors/cache/WalStateFinishMessage.java |    5 +
 .../cache/WalStateProposeMessage.java           |    5 +
 .../cache/binary/BinaryMetadataTransport.java   |   24 +-
 .../binary/MetadataUpdateAcceptedMessage.java   |    5 +
 .../binary/MetadataUpdateProposedMessage.java   |    5 +
 .../dht/GridClientPartitionTopology.java        |   39 +
 .../distributed/dht/GridDhtCacheAdapter.java    |    4 +
 .../dht/GridDhtPartitionTopology.java           |    6 +
 .../dht/GridDhtPartitionTopologyImpl.java       |   39 +
 .../GridDhtPartitionsExchangeFuture.java        |   26 +-
 .../cluster/ChangeGlobalStateFinishMessage.java |    5 +
 .../cluster/ChangeGlobalStateMessage.java       |    5 +
 .../cluster/ClusterMetricsUpdateMessage.java    |  158 +
 .../processors/cluster/ClusterNodeMetrics.java  |   62 +
 .../processors/cluster/ClusterProcessor.java    |  249 +-
 .../continuous/AbstractContinuousMessage.java   |    5 +
 .../continuous/ContinuousRoutineInfo.java       |  100 +
 .../ContinuousRoutineStartResultMessage.java    |  206 +
 .../ContinuousRoutinesCommonDiscoveryData.java  |   45 +
 .../continuous/ContinuousRoutinesInfo.java      |  132 +
 ...tinuousRoutinesJoiningNodeDiscoveryData.java |   45 +
 .../continuous/GridContinuousProcessor.java     |  862 +++-
 .../continuous/StartRequestDataV2.java          |  164 +
 .../StartRoutineDiscoveryMessageV2.java         |   77 +
 .../StopRoutineAckDiscoveryMessage.java         |    5 +
 .../datastreamer/DataStreamerImpl.java          |   27 +-
 .../marshaller/MappingAcceptedMessage.java      |    5 +
 .../marshaller/MappingProposedMessage.java      |    5 +
 .../message/SchemaFinishDiscoveryMessage.java   |    5 +
 .../message/SchemaProposeDiscoveryMessage.java  |    5 +
 .../ignite/internal/util/nio/GridNioServer.java |   18 +-
 .../org/apache/ignite/spi/IgniteSpiAdapter.java |   10 +
 .../org/apache/ignite/spi/IgniteSpiContext.java |   11 +
 .../communication/tcp/TcpCommunicationSpi.java  |  190 +-
 .../tcp/internal/ConnectionKey.java             |  117 +
 .../TcpCommunicationConnectionCheckFuture.java  |  519 ++
 ...pCommunicationNodeConnectionCheckFuture.java |   30 +
 .../discovery/DiscoverySpiCustomMessage.java    |   15 +-
 ...DiscoverySpiMutableCustomMessageSupport.java |   40 +
 .../spi/discovery/tcp/TcpDiscoverySpi.java      |   55 +-
 .../tcp/internal/TcpDiscoveryNode.java          |   32 +-
 .../resources/META-INF/classnames.properties    |    2 +
 ...unctionExcludeNeighborsAbstractSelfTest.java |    8 +-
 .../failure/FailureHandlerTriggeredTest.java    |    4 +
 .../internal/ClusterGroupHostsSelfTest.java     |    3 +
 .../ignite/internal/ClusterGroupSelfTest.java   |    2 +
 .../internal/ClusterNodeMetricsUpdateTest.java  |  173 +
 .../internal/DiscoverySpiTestListener.java      |  162 +
 .../ignite/internal/GridDiscoverySelfTest.java  |   14 +-
 .../GridJobMasterLeaveAwareSelfTest.java        |    2 +
 .../internal/GridJobStealingSelfTest.java       |    2 +
 .../internal/GridSameVmStartupSelfTest.java     |   19 +-
 .../apache/ignite/internal/GridSelfTest.java    |    2 +
 .../IgniteClientReconnectAbstractTest.java      |   53 +-
 .../IgniteClientReconnectApiExceptionTest.java  |   21 +-
 .../IgniteClientReconnectAtomicsTest.java       |   30 +-
 .../IgniteClientReconnectCacheTest.java         |   49 +-
 .../IgniteClientReconnectCollectionsTest.java   |   14 +-
 .../IgniteClientReconnectComputeTest.java       |    6 +-
 ...eClientReconnectContinuousProcessorTest.java |   13 +-
 ...IgniteClientReconnectDiscoveryStateTest.java |   22 +-
 ...niteClientReconnectFailoverAbstractTest.java |   12 +-
 .../IgniteClientReconnectServicesTest.java      |    8 +-
 .../internal/IgniteClientReconnectStopTest.java |   12 +-
 .../IgniteClientReconnectStreamerTest.java      |    4 +-
 .../ignite/internal/IgniteClientRejoinTest.java |    3 +
 .../GridDiscoveryManagerAliveCacheSelfTest.java |   16 +-
 .../GridAffinityProcessorAbstractSelfTest.java  |    4 +-
 .../CacheMetricsForClusterGroupSelfTest.java    |   12 +-
 .../cache/GridCacheAbstractSelfTest.java        |    2 +
 .../cache/IgniteCacheNearLockValueSelfTest.java |    4 +-
 .../IgniteCacheP2pUnmarshallingErrorTest.java   |   11 +
 .../IgniteClusterActivateDeactivateTest.java    |   65 +
 .../IgniteDaemonNodeMarshallerCacheTest.java    |    3 +-
 .../binary/BinaryMetadataUpdatesFlowTest.java   |   12 +-
 ...ntNodeBinaryObjectMetadataMultinodeTest.java |    2 +-
 .../GridCacheQueueClientDisconnectTest.java     |   10 +
 .../IgniteClientDataStructuresAbstractTest.java |    3 +-
 .../CacheLateAffinityAssignmentTest.java        |  127 +-
 .../GridCacheNodeFailureAbstractTest.java       |    5 +-
 .../distributed/IgniteCache150ClientsTest.java  |    2 +
 .../distributed/IgniteCacheManyClientsTest.java |   44 +-
 .../IgniteOptimisticTxSuspendResumeTest.java    |    2 +
 ...ridCacheDhtPreloadMultiThreadedSelfTest.java |    4 +
 .../dht/GridCacheDhtPreloadSelfTest.java        |    2 +
 .../dht/TxRecoveryStoreEnabledTest.java         |   15 +-
 ...titionedExplicitLockNodeFailureSelfTest.java |    3 +-
 .../ClientReconnectContinuousQueryTest.java     |   19 +-
 ...yRemoteFilterMissingInClassPathSelfTest.java |   23 +-
 ...CacheContinuousQueryClientReconnectTest.java |    3 +
 .../CacheVersionedEntryAbstractTest.java        |   33 +-
 .../continuous/GridEventConsumeSelfTest.java    |   34 +-
 .../service/ClosureServiceClientsNodesTest.java |   19 +-
 .../internal/util/GridTestClockTimer.java       |    9 +
 .../GridMarshallerMappingConsistencyTest.java   |    4 +
 .../ignite/messaging/GridMessagingSelfTest.java |  126 +-
 .../GridTcpCommunicationSpiAbstractTest.java    |   71 +
 .../FilterDataForClientNodeDiscoveryTest.java   |    5 +
 .../testframework/GridSpiTestContext.java       |   10 +
 .../config/GridTestProperties.java              |    9 +
 .../testframework/junits/GridAbstractTest.java  |  129 +-
 .../junits/multijvm/IgniteNodeRunner.java       |    2 +
 .../testsuites/IgniteComputeGridTestSuite.java  |    2 +
 ...niteCacheDistributedQueryCancelSelfTest.java |    2 +-
 .../DynamicIndexAbstractBasicSelfTest.java      |    5 +-
 .../GridJtaTransactionManagerSelfTest.java      |   21 +-
 .../GridPartitionedCacheJtaFactorySelfTest.java |   19 +-
 .../org/apache/ignite/spark/IgniteRDD.scala     |    9 +-
 .../ignite/internal/GridFactorySelfTest.java    |    3 +-
 .../p2p/GridP2PUserVersionChangeSelfTest.java   |    5 +-
 modules/yardstick/pom-standalone.xml            |    6 +
 modules/yardstick/pom.xml                       |    6 +
 modules/zookeeper/pom.xml                       |   40 +
 .../spi/discovery/zk/ZookeeperDiscoverySpi.java |  557 ++
 .../zk/internal/ZkAbstractCallabck.java         |   83 +
 .../zk/internal/ZkAbstractChildrenCallback.java |   61 +
 .../zk/internal/ZkAbstractWatcher.java          |   55 +
 .../discovery/zk/internal/ZkAliveNodeData.java  |   40 +
 .../zk/internal/ZkBulkJoinContext.java          |   50 +
 .../discovery/zk/internal/ZkClusterNodes.java   |  103 +
 .../internal/ZkCommunicationErrorNodeState.java |   46 +
 .../ZkCommunicationErrorProcessFuture.java      |  411 ++
 ...kCommunicationErrorResolveFinishMessage.java |   69 +
 .../ZkCommunicationErrorResolveResult.java      |   45 +
 ...ZkCommunicationErrorResolveStartMessage.java |   61 +
 .../internal/ZkCommunicationFailureContext.java |  188 +
 .../zk/internal/ZkDiscoveryCustomEventData.java |   89 +
 .../zk/internal/ZkDiscoveryEventData.java       |  165 +
 .../zk/internal/ZkDiscoveryEventsData.java      |  121 +
 .../internal/ZkDiscoveryNodeFailEventData.java  |   55 +
 .../internal/ZkDiscoveryNodeJoinEventData.java  |   60 +
 .../ZkDistributedCollectDataFuture.java         |  250 +
 .../zk/internal/ZkForceNodeFailMessage.java     |   65 +
 .../discovery/zk/internal/ZkIgnitePaths.java    |  307 ++
 .../zk/internal/ZkInternalJoinErrorMessage.java |   44 +
 .../zk/internal/ZkInternalMessage.java          |   27 +
 .../zk/internal/ZkJoinEventDataForJoined.java   |   83 +
 .../zk/internal/ZkJoinedNodeEvtData.java        |   79 +
 .../zk/internal/ZkJoiningNodeData.java          |   87 +
 .../zk/internal/ZkNoServersMessage.java         |   50 +
 .../zk/internal/ZkNodeValidateResult.java       |   43 +
 .../spi/discovery/zk/internal/ZkRunnable.java   |   51 +
 .../discovery/zk/internal/ZkRuntimeState.java   |  135 +
 .../discovery/zk/internal/ZkTimeoutObject.java  |   54 +
 .../discovery/zk/internal/ZookeeperClient.java  | 1219 +++++
 .../ZookeeperClientFailedException.java         |   40 +
 .../zk/internal/ZookeeperClusterNode.java       |  362 ++
 .../zk/internal/ZookeeperDiscoveryImpl.java     | 4464 ++++++++++++++++
 .../java/org/apache/ZookeeperNodeStart.java     |   46 +
 ...CacheEntryListenerWithZkDiscoAtomicTest.java |   32 +
 .../ZookeeperDiscoverySpiAbstractTestSuite.java |  118 +
 .../zk/ZookeeperDiscoverySpiTestSuite1.java     |   44 +
 .../zk/ZookeeperDiscoverySpiTestSuite2.java     |   94 +
 ...ZookeeperDiscoverySuitePreprocessorTest.java |  101 +
 .../zk/internal/ZookeeperClientTest.java        |  495 ++
 ...okeeperDiscoverySpiSaslAuthAbstractTest.java |  247 +
 ...ZookeeperDiscoverySpiSaslFailedAuthTest.java |   44 +
 ...eeperDiscoverySpiSaslSuccessfulAuthTest.java |   48 +
 .../zk/internal/ZookeeperDiscoverySpiTest.java  | 4847 ++++++++++++++++++
 .../zookeeper/ZkTestClientCnxnSocketNIO.java    |  137 +
 191 files changed, 21158 insertions(+), 777 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcAbstractDmlStatementSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcAbstractDmlStatementSelfTest.java
 
b/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcAbstractDmlStatementSelfTest.java
index f4c0ca3..0a055a9 100644
--- 
a/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcAbstractDmlStatementSelfTest.java
+++ 
b/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcAbstractDmlStatementSelfTest.java
@@ -138,8 +138,10 @@ public abstract class JdbcAbstractDmlStatementSelfTest 
extends GridCommonAbstrac
     @Override protected void afterTest() throws Exception {
         
((IgniteEx)ignite(0)).context().cache().dynamicDestroyCache(DEFAULT_CACHE_NAME, 
true, true, false);
 
-        conn.close();
-        assertTrue(conn.isClosed());
+        if (conn != null) {
+            conn.close();
+            assertTrue(conn.isClosed());
+        }
 
         cleanUpWorkingDir();
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/main/java/org/apache/ignite/configuration/CommunicationFailureContext.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/configuration/CommunicationFailureContext.java
 
b/modules/core/src/main/java/org/apache/ignite/configuration/CommunicationFailureContext.java
new file mode 100644
index 0000000..a32d38c
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/configuration/CommunicationFailureContext.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.configuration;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.spi.communication.CommunicationSpi;
+
+/**
+ * Communication failure context: cluster state passed to {@link CommunicationFailureResolver#resolve(CommunicationFailureContext)}.
+ */
+public interface CommunicationFailureContext {
+    /**
+     * @return Current topology snapshot.
+     */
+    public List<ClusterNode> topologySnapshot();
+
+    /**
+     * @param node1 First node.
+     * @param node2 Second node.
+     * @return {@code True} if {@link CommunicationSpi} is able to establish connection from first node to second node.
+     */
+    public boolean connectionAvailable(ClusterNode node1, ClusterNode node2);
+
+    /**
+     * @return Currently started caches, keyed by cache name.
+     */
+    public Map<String, CacheConfiguration<?, ?>> startedCaches();
+
+    /**
+     * @param cacheName Cache name.
+     * @return Cache partitions affinity assignment.
+     */
+    public List<List<ClusterNode>> cacheAffinity(String cacheName);
+
+    /**
+     * @param cacheName Cache name.
+     * @return Cache partitions owners.
+     */
+    public List<List<ClusterNode>> cachePartitionOwners(String cacheName);
+
+    /**
+     * @param node Node to kill (remove from the cluster).
+     */
+    public void killNode(ClusterNode node);
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/main/java/org/apache/ignite/configuration/CommunicationFailureResolver.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/configuration/CommunicationFailureResolver.java
 
b/modules/core/src/main/java/org/apache/ignite/configuration/CommunicationFailureResolver.java
new file mode 100644
index 0000000..a4d92f3
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/configuration/CommunicationFailureResolver.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.configuration;
+
+/**
+ * Communication failure resolver: given a {@link CommunicationFailureContext}, resolves communication problems between nodes (for example, by killing some of them).
+ */
+public interface CommunicationFailureResolver {
+    /**
+     * @param ctx Communication failure context (topology, node connectivity, caches and a node kill hook).
+     */
+    public void resolve(CommunicationFailureContext ctx);
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/main/java/org/apache/ignite/configuration/DefaultCommunicationFailureResolver.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/configuration/DefaultCommunicationFailureResolver.java
 
b/modules/core/src/main/java/org/apache/ignite/configuration/DefaultCommunicationFailureResolver.java
new file mode 100644
index 0000000..a4c6da9
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/configuration/DefaultCommunicationFailureResolver.java
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.configuration;
+
+import java.util.BitSet;
+import java.util.List;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.resources.LoggerResource;
+
+/**
+ * Default Communication Failure Resolver.
+ * <p>
+ * Splits the topology snapshot into groups of nodes that can reach each other (two nodes are linked
+ * when a connection is available in at least one direction), picks the group with the largest number
+ * of server nodes and, if every member of that group can connect to every other member, kills all
+ * nodes outside of it.
+ */
+public class DefaultCommunicationFailureResolver implements CommunicationFailureResolver {
+    /** */
+    @LoggerResource
+    private IgniteLogger log;
+
+    /** {@inheritDoc} */
+    @Override public void resolve(CommunicationFailureContext ctx) {
+        // Take a single snapshot: bit indexes computed by the graph must refer to the same list
+        // that is used below to choose the nodes to kill.
+        List<ClusterNode> nodes = ctx.topologySnapshot();
+
+        assert nodes.size() > 0;
+
+        ClusterGraph graph = new ClusterGraph(log, ctx, nodes);
+
+        ClusterSearch cluster = graph.findLargestIndependentCluster();
+
+        assert cluster != null;
+
+        if (graph.checkFullyConnected(cluster.nodesBitSet)) {
+            assert cluster.nodeCnt <= nodes.size();
+
+            if (cluster.nodeCnt < nodes.size()) {
+                if (log.isInfoEnabled()) {
+                    log.info("Communication problem resolver found fully connected independent cluster [" +
+                        "clusterSrvCnt=" + cluster.srvCnt +
+                        ", clusterTotalNodes=" + cluster.nodeCnt +
+                        ", totalAliveNodes=" + nodes.size() + "]");
+                }
+
+                for (int i = 0; i < nodes.size(); i++) {
+                    if (!cluster.nodesBitSet.get(i))
+                        ctx.killNode(nodes.get(i));
+                }
+            }
+            else
+                U.warn(log, "All alive nodes are fully connected, this should be resolved automatically.");
+        }
+        else {
+            if (log.isInfoEnabled()) {
+                log.info("Communication problem resolver failed to find fully connected independent cluster.");
+            }
+        }
+    }
+
+    /**
+     * Builds a printable list of IDs of the nodes whose bits are set, truncated to {@code limit} IDs.
+     *
+     * @param cluster Cluster nodes mask.
+     * @param nodes Nodes.
+     * @param limit Maximum number of IDs to print, the rest is replaced with an ellipsis.
+     * @return Cluster node IDs string.
+     */
+    private static String clusterNodeIds(BitSet cluster, List<ClusterNode> nodes, int limit) {
+        StringBuilder builder = new StringBuilder("[");
+
+        int cnt = 0;
+
+        for (int idx = cluster.nextSetBit(0); idx >= 0; idx = cluster.nextSetBit(idx + 1)) {
+            if (cnt == limit) {
+                // Emit the marker once and stop instead of repeating it after every remaining ID.
+                builder.append(cnt > 0 ? ", ..." : "...");
+
+                break;
+            }
+
+            if (cnt > 0)
+                builder.append(", ");
+
+            builder.append(nodes.get(idx).id());
+
+            cnt++;
+        }
+
+        builder.append(']');
+
+        return builder.toString();
+    }
+
+    /**
+     * Result of a connected-component search: members of the component and their counts.
+     */
+    private static class ClusterSearch {
+        /** Number of server (non-client) nodes in the cluster. */
+        int srvCnt;
+
+        /** Total number of nodes in the cluster. */
+        int nodeCnt;
+
+        /** Cluster members, by index in the topology snapshot. */
+        final BitSet nodesBitSet;
+
+        /**
+         * @param nodes Total nodes.
+         */
+        ClusterSearch(int nodes) {
+            nodesBitSet = new BitSet(nodes);
+        }
+    }
+
+    /**
+     * Connectivity graph over the topology snapshot, nodes are addressed by their index in the snapshot.
+     */
+    private static class ClusterGraph {
+        /** */
+        private final IgniteLogger log;
+
+        /** */
+        private final int nodeCnt;
+
+        /** Nodes already assigned to some cluster. */
+        private final BitSet visitBitSet;
+
+        /** */
+        private final CommunicationFailureContext ctx;
+
+        /** */
+        private final List<ClusterNode> nodes;
+
+        /**
+         * @param log Logger.
+         * @param ctx Context.
+         * @param nodes Topology snapshot taken from {@code ctx}.
+         */
+        ClusterGraph(IgniteLogger log, CommunicationFailureContext ctx, List<ClusterNode> nodes) {
+            this.log = log;
+            this.ctx = ctx;
+            this.nodes = nodes;
+
+            nodeCnt = nodes.size();
+
+            assert nodeCnt > 0;
+
+            visitBitSet = new BitSet(nodeCnt);
+        }
+
+        /**
+         * @return Cluster with the largest number of server nodes (the first one found wins a tie).
+         */
+        ClusterSearch findLargestIndependentCluster() {
+            ClusterSearch maxCluster = null;
+
+            for (int i = 0; i < nodeCnt; i++) {
+                if (visitBitSet.get(i))
+                    continue;
+
+                ClusterSearch cluster = new ClusterSearch(nodeCnt);
+
+                search(cluster, i);
+
+                if (log.isInfoEnabled()) {
+                    log.info("Communication problem resolver found cluster [srvCnt=" + cluster.srvCnt +
+                        ", totalNodeCnt=" + cluster.nodeCnt +
+                        ", nodeIds=" + clusterNodeIds(cluster.nodesBitSet, nodes, 1000) + "]");
+                }
+
+                if (maxCluster == null || cluster.srvCnt > maxCluster.srvCnt)
+                    maxCluster = cluster;
+            }
+
+            return maxCluster;
+        }
+
+        /**
+         * @param cluster Cluster nodes bit set.
+         * @return {@code True} if all cluster nodes are able to connect to each other.
+         */
+        boolean checkFullyConnected(BitSet cluster) {
+            for (int idx = cluster.nextSetBit(0); idx >= 0; idx = cluster.nextSetBit(idx + 1)) {
+                ClusterNode node1 = nodes.get(idx);
+
+                // Iterate over the set bits: bounding the loop by cluster.cardinality() would skip
+                // members whose index in the snapshot is not less than the cluster size.
+                for (int i = cluster.nextSetBit(0); i >= 0; i = cluster.nextSetBit(i + 1)) {
+                    if (i == idx)
+                        continue;
+
+                    if (!ctx.connectionAvailable(node1, nodes.get(i)))
+                        return false;
+                }
+            }
+
+            return true;
+        }
+
+        /**
+         * Depth-first search adding to {@code cluster} every unvisited node reachable from {@code idx}.
+         *
+         * @param cluster Current cluster bit set.
+         * @param idx Node index.
+         */
+        void search(ClusterSearch cluster, int idx) {
+            assert !visitBitSet.get(idx);
+
+            visitBitSet.set(idx);
+
+            cluster.nodesBitSet.set(idx);
+            cluster.nodeCnt++;
+
+            ClusterNode node1 = nodes.get(idx);
+
+            if (!CU.clientNode(node1))
+                cluster.srvCnt++;
+
+            for (int i = 0; i < nodeCnt; i++) {
+                if (i == idx || visitBitSet.get(i))
+                    continue;
+
+                ClusterNode node2 = nodes.get(i);
+
+                // Nodes are linked if a connection is available in at least one direction.
+                boolean connected = ctx.connectionAvailable(node1, node2) ||
+                    ctx.connectionAvailable(node2, node1);
+
+                if (connected)
+                    search(cluster, i);
+            }
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(DefaultCommunicationFailureResolver.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
 
b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
index add3880..cc3ea10 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
@@ -493,6 +493,9 @@ public class IgniteConfiguration {
     /** Failure handler. */
     private FailureHandler failureHnd;
 
+    /** Communication failure resolver */
+    private CommunicationFailureResolver commFailureRslvr;
+
     /**
      * Creates valid grid configuration with all default values.
      */
@@ -520,6 +523,8 @@ public class IgniteConfiguration {
         loadBalancingSpi = cfg.getLoadBalancingSpi();
         indexingSpi = cfg.getIndexingSpi();
 
+        commFailureRslvr = cfg.getCommunicationFailureResolver();
+
         /*
          * Order alphabetically for maintenance purposes.
          */
@@ -607,6 +612,23 @@ public class IgniteConfiguration {
     }
 
     /**
+     * @return Communication failure resolver, or {@code null} if it has not been set.
+     */
+    public CommunicationFailureResolver getCommunicationFailureResolver() {
+        return commFailureRslvr;
+    }
+
+    /**
+     * @param commFailureRslvr Communication failure resolver.
+     * @return {@code this} instance.
+     */
+    public IgniteConfiguration setCommunicationFailureResolver(CommunicationFailureResolver commFailureRslvr) {
+        this.commFailureRslvr = commFailureRslvr;
+
+        return this;
+    }
+
+    /**
      * Gets optional grid name. Returns {@code null} if non-default grid name 
was not
      * provided.
      * <p>The name only works locally and has no effect on topology</p>

http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java 
b/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java
index 4932e67..1227e8c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java
@@ -121,6 +121,9 @@ public enum GridTopic {
     TOPIC_WAL,
 
     /** */
+    TOPIC_METRICS,
+
+    /** */
     TOPIC_AUTH;
 
     /** Enum values. */

http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java 
b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index 8bc46fd..0b102e5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -1298,7 +1298,7 @@ public class IgniteKernal implements IgniteEx, 
IgniteMXBean, Externalizable {
         ackStart(rtBean);
 
         if (!isDaemon())
-            ctx.discovery().ackTopology(localNode().order());
+            
ctx.discovery().ackTopology(ctx.discovery().localJoin().joinTopologyVersion().topologyVersion());
     }
 
     /**
@@ -2623,6 +2623,9 @@ public class IgniteKernal implements IgniteEx, 
IgniteMXBean, Externalizable {
         objs.add(cfg.getGridLogger());
         objs.add(cfg.getMBeanServer());
 
+        if (cfg.getCommunicationFailureResolver() != null)
+            objs.add(cfg.getCommunicationFailureResolver());
+
         return objs;
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java 
b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
index 4708dd3..417ba1e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
@@ -75,6 +75,7 @@ import org.apache.ignite.failure.FailureContext;
 import org.apache.ignite.failure.FailureType;
 import org.apache.ignite.internal.binary.BinaryMarshaller;
 import org.apache.ignite.internal.managers.communication.GridIoPolicy;
+import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
 import 
org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor;
 import org.apache.ignite.internal.processors.igfs.IgfsThreadFactory;
 import org.apache.ignite.internal.processors.igfs.IgfsUtils;
@@ -2243,6 +2244,8 @@ public class IgnitionEx {
 
             initializeDefaultSpi(myCfg);
 
+            
GridDiscoveryManager.initCommunicationErrorResolveConfiguration(myCfg);
+
             initializeDefaultCacheConfiguration(myCfg);
 
             ExecutorConfiguration[] execCfgs = 
myCfg.getExecutorConfiguration();

http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
index 74f5a10..b0756cf 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
@@ -618,6 +618,14 @@ public abstract class GridManagerAdapter<T extends 
IgniteSpi> implements GridMan
                         return ctx.nodeAttributes();
                     }
 
+                    @Override public boolean 
communicationFailureResolveSupported() {
+                        return 
ctx.discovery().communicationErrorResolveSupported();
+                    }
+
+                    @Override public void 
resolveCommunicationFailure(ClusterNode node, Exception err) {
+                        ctx.discovery().resolveCommunicationError(node, err);
+                    }
+
                     /**
                      * @param e Exception to handle.
                      * @return GridSpiException Converted exception.

http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index d5cdd2d..8d9a700 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -298,9 +298,9 @@ public class GridIoManager extends 
GridManagerAdapter<CommunicationSpi<Serializa
 
                 @Override public MessageReader reader(UUID rmtNodeId, 
MessageFactory msgFactory)
                     throws IgniteCheckedException {
-                    assert rmtNodeId != null;
 
-                    return new DirectMessageReader(msgFactory, 
U.directProtocolVersion(ctx, rmtNodeId));
+                    return new DirectMessageReader(msgFactory,
+                        rmtNodeId != null ? U.directProtocolVersion(ctx, 
rmtNodeId) : GridIoManager.DIRECT_PROTO_VER);
                 }
             };
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index a0fc2f8..5616fd0 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -123,6 +123,8 @@ import 
org.apache.ignite.internal.processors.cache.transactions.TxLocksResponse;
 import 
org.apache.ignite.internal.processors.cache.version.GridCacheRawVersionedEntry;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersionEx;
+import 
org.apache.ignite.internal.processors.cluster.ClusterMetricsUpdateMessage;
+import 
org.apache.ignite.internal.processors.continuous.ContinuousRoutineStartResultMessage;
 import org.apache.ignite.internal.processors.continuous.GridContinuousMessage;
 import org.apache.ignite.internal.processors.datastreamer.DataStreamerEntry;
 import org.apache.ignite.internal.processors.datastreamer.DataStreamerRequest;
@@ -909,6 +911,16 @@ public class GridIoMessageFactory implements 
MessageFactory {
 
                 break;
 
+            case 133:
+                msg = new ClusterMetricsUpdateMessage();
+
+                break;
+
+            case 134:
+                msg = new ContinuousRoutineStartResultMessage();
+
+                break;
+
             // [-3..119] [124..129] [-23..-27] [-36..-55]- this
             // [120..123] - DR
             // [-4..-22, -30..-35] - SQL

http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/CustomMessageWrapper.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/CustomMessageWrapper.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/CustomMessageWrapper.java
index 4268886..4b6b7a2 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/CustomMessageWrapper.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/CustomMessageWrapper.java
@@ -49,6 +49,11 @@ public class CustomMessageWrapper implements 
DiscoverySpiCustomMessage {
         return delegate.isMutable();
     }
 
+    /** {@inheritDoc} */
+    @Override public boolean stopProcess() {
+        return delegate.stopProcess();
+    }
+
     /**
      * @return Delegate.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java
index c21698f..fef44fa 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java
@@ -312,6 +312,14 @@ public class DiscoCache {
     }
 
     /**
+     * @param nodeId Node ID.
+     * @return {@code True} if node is in alives list.
+     */
+    public boolean alive(UUID nodeId) {
+        return alives.contains(nodeId);
+    }
+
+    /**
      * Gets all nodes that have cache with given name.
      *
      * @param cacheName Cache name.

http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryCustomMessage.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryCustomMessage.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryCustomMessage.java
index c708c62..6ed2096 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryCustomMessage.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryCustomMessage.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.managers.discovery;
 import java.io.Serializable;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
 import 
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddFinishedMessage;
 import 
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddedMessage;
 import org.jetbrains.annotations.Nullable;
@@ -87,11 +88,18 @@ public interface DiscoveryCustomMessage extends 
Serializable {
     @Nullable public DiscoveryCustomMessage ackMessage();
 
     /**
-     * @return {@code true} if message can be modified during listener 
notification. Changes will be send to next nodes.
+     * @return {@code True} if message can be modified during listener 
notification. Changes will be sent to next nodes.
      */
     public boolean isMutable();
 
     /**
+     * See {@link DiscoverySpiCustomMessage#stopProcess()}.
+     *
+     * @return {@code True} if message should not be sent to other nodes 
after it was processed on coordinator.
+     */
+    public boolean stopProcess();
+
+    /**
      * Creates new discovery cache if message caused topology version change.
      *
      * @param mgr Discovery manager.

http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageResultsCollector.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageResultsCollector.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageResultsCollector.java
new file mode 100644
index 0000000..43be952
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageResultsCollector.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.managers.discovery;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Collects per-node replies to a discovery message and calls {@link #onResultsCollected} once all awaited nodes replied or failed.
+ */
+public abstract class DiscoveryMessageResultsCollector<M, R> {
+    /** Received messages (or placeholders for awaited nodes) by node ID. */
+    private final Map<UUID, NodeMessage<M>> rcvd = new HashMap<>();
+
+    /** Number of awaited nodes which neither replied nor failed yet. */
+    private int leftMsgs;
+
+    /** Discovery state awaited nodes are taken from, {@code null} until {@link #init} is called. */
+    protected DiscoCache discoCache;
+
+    /** Kernal context. */
+    protected final GridKernalContext ctx;
+
+    /**
+     * @param ctx Context.
+     */
+    protected DiscoveryMessageResultsCollector(GridKernalContext ctx) {
+        this.ctx = ctx;
+    }
+
+    /**
+     * @param rcvd Received messages by node ID.
+     * @return Result.
+     */
+    protected abstract R createResult(Map<UUID, NodeMessage<M>> rcvd);
+
+    /**
+     * @param r Result created by {@link #createResult}.
+     */
+    protected abstract void onResultsCollected(R r);
+
+    /**
+     * @param discoCache Discovery state when discovery message was received.
+     * @param node Node.
+     * @return {@code True} if need to wait for result from given node.
+     */
+    protected abstract boolean waitForNode(DiscoCache discoCache, ClusterNode 
node);
+
+    /**
+     * @param discoCache Discovery state.
+     */
+    public final void init(DiscoCache discoCache) {
+        assert discoCache != null;
+
+        R res = null;
+
+        synchronized (this) {
+            assert this.discoCache == null;
+            assert leftMsgs == 0 : leftMsgs;
+
+            this.discoCache = discoCache;
+
+            for (ClusterNode node : discoCache.allNodes()) {
+                if (ctx.discovery().alive(node) && waitForNode(discoCache, 
node) && !rcvd.containsKey(node.id())) {
+                    rcvd.put(node.id(), new NodeMessage<>((M)null));
+
+                    leftMsgs++;
+                }
+            }
+
+            if (leftMsgs == 0)
+                res = createResult(rcvd);
+        }
+
+        if (res != null)
+            onResultsCollected(res);
+    }
+
+    /**
+     * @param nodeId Node ID.
+     * @param msg Message.
+     */
+    public final void onMessage(UUID nodeId, M msg) {
+        R res = null;
+
+        synchronized (this) {
+            if (allReceived())
+                return;
+
+            NodeMessage<M> expMsg = rcvd.get(nodeId);
+
+            if (expMsg == null)
+                rcvd.put(nodeId, new NodeMessage<>(msg));
+            else if (expMsg.set(msg)) {
+                assert leftMsgs > 0;
+
+                leftMsgs--;
+
+                if (allReceived())
+                    res = createResult(rcvd);
+            }
+        }
+
+        if (res != null)
+            onResultsCollected(res);
+    }
+
+    /**
+     * @param nodeId Failed node ID.
+     */
+    public final void onNodeFail(UUID nodeId) {
+        R res = null;
+
+        synchronized (this) {
+            if (allReceived())
+                return;
+
+            NodeMessage<M> expMsg = rcvd.get(nodeId);
+
+            if (expMsg != null && expMsg.onNodeFailed()) {
+                assert leftMsgs > 0 : leftMsgs;
+
+                leftMsgs--;
+
+                if (allReceived())
+                    res = createResult(rcvd);
+            }
+        }
+
+        if (res != null)
+            onResultsCollected(res);
+    }
+
+    /**
+     * @return {@code True} if expected messages are initialized and all 
messages are received.
+     */
+    private boolean allReceived() {
+        return discoCache != null && leftMsgs == 0;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(DiscoveryMessageResultsCollector.class, this);
+    }
+
+    /**
+     * Result slot of a single node: received message and node failure flag.
+     */
+    protected static class NodeMessage<M> {
+        /** Whether node failure was registered before a message was received. */
+        boolean nodeFailed;
+
+        /** Received message, {@code null} if not received yet. */
+        M msg;
+
+        /**
+         * @param msg Message.
+         */
+        NodeMessage(M msg) {
+            this.msg = msg;
+        }
+
+        /**
+         * @return Message or {@code null} if node failed.
+         */
+        @Nullable public M message() {
+            return msg;
+        }
+
+        /**
+         * @return {@code True} if node result was not set before.
+         */
+        boolean onNodeFailed() {
+            if (nodeFailed || msg != null)
+                return false;
+
+            nodeFailed = true;
+
+            return true;
+        }
+
+        /**
+         * @param msg Received message.
+         * @return {@code True} if node result was not set before.
+         */
+        boolean set(M msg) {
+            assert msg != null;
+
+            if (this.msg != null)
+                return false;
+
+            this.msg = msg;
+
+            return !nodeFailed;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(NodeMessage.class, this);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index 2e814d4..4c5690e 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -54,8 +54,11 @@ import org.apache.ignite.cluster.BaselineNode;
 import org.apache.ignite.cluster.ClusterMetrics;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.CommunicationFailureResolver;
 import org.apache.ignite.configuration.DataRegionConfiguration;
 import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.DefaultCommunicationFailureResolver;
+import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.events.DiscoveryEvent;
 import org.apache.ignite.events.Event;
 import org.apache.ignite.failure.FailureContext;
@@ -112,6 +115,8 @@ import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.plugin.security.SecurityCredentials;
 import org.apache.ignite.plugin.segmentation.SegmentationPolicy;
 import org.apache.ignite.spi.IgniteSpiException;
+import org.apache.ignite.spi.communication.CommunicationSpi;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
 import org.apache.ignite.spi.discovery.DiscoveryDataBag;
 import 
org.apache.ignite.spi.discovery.DiscoveryDataBag.JoiningNodeDiscoveryData;
 import org.apache.ignite.spi.discovery.DiscoveryMetricsProvider;
@@ -120,10 +125,10 @@ import 
org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
 import org.apache.ignite.spi.discovery.DiscoverySpiDataExchange;
 import org.apache.ignite.spi.discovery.DiscoverySpiHistorySupport;
 import org.apache.ignite.spi.discovery.DiscoverySpiListener;
+import org.apache.ignite.spi.discovery.DiscoverySpiMutableCustomMessageSupport;
 import org.apache.ignite.spi.discovery.DiscoverySpiNodeAuthenticator;
 import org.apache.ignite.spi.discovery.DiscoverySpiOrderSupport;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
-import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode;
 import org.apache.ignite.thread.IgniteThread;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
@@ -478,7 +483,7 @@ public class GridDiscoveryManager extends 
GridManagerAdapter<DiscoverySpi> {
 
     /** {@inheritDoc} */
     @Override protected void onKernalStart0() throws IgniteCheckedException {
-        if (Boolean.TRUE.equals(ctx.config().isClientMode()) && 
!getSpi().isClientMode())
+        if ((getSpi() instanceof TcpDiscoverySpi) && 
Boolean.TRUE.equals(ctx.config().isClientMode()) && !getSpi().isClientMode())
             ctx.performance().add("Enable client mode for TcpDiscoverySpi " +
                 "(set TcpDiscoverySpi.forceServerMode to false)");
     }
@@ -551,6 +556,9 @@ public class GridDiscoveryManager extends 
GridManagerAdapter<DiscoverySpi> {
             });
         }
 
+        if (ctx.config().getCommunicationFailureResolver() != null)
+            
ctx.resource().injectGeneric(ctx.config().getCommunicationFailureResolver());
+
         spi.setListener(new DiscoverySpiListener() {
             private long gridStartTime;
 
@@ -559,8 +567,8 @@ public class GridDiscoveryManager extends 
GridManagerAdapter<DiscoverySpi> {
                 for (IgniteInClosure<ClusterNode> lsnr : locNodeInitLsnrs)
                     lsnr.apply(locNode);
 
-                if (locNode instanceof TcpDiscoveryNode) {
-                    final TcpDiscoveryNode node = (TcpDiscoveryNode)locNode;
+                if (locNode instanceof IgniteClusterNode) {
+                    final IgniteClusterNode node = (IgniteClusterNode)locNode;
 
                     if (consistentId != null)
                         node.setConsistentId(consistentId);
@@ -1052,7 +1060,7 @@ public class GridDiscoveryManager extends 
GridManagerAdapter<DiscoverySpi> {
     /**
      * @return Metrics provider.
      */
-    private DiscoveryMetricsProvider createMetricsProvider() {
+    public DiscoveryMetricsProvider createMetricsProvider() {
         return new DiscoveryMetricsProvider() {
             /** */
             private final long startTime = U.currentTimeMillis();
@@ -1679,13 +1687,15 @@ public class GridDiscoveryManager extends 
GridManagerAdapter<DiscoverySpi> {
             return getSpi().pingNode(nodeId);
         }
         catch (IgniteException e) {
-            if (e.hasCause(IgniteClientDisconnectedCheckedException.class)) {
+            if (e.hasCause(IgniteClientDisconnectedCheckedException.class, 
IgniteClientDisconnectedException.class)) {
                 IgniteFuture<?> reconnectFut = 
ctx.cluster().clientReconnectFuture();
 
                 throw new 
IgniteClientDisconnectedCheckedException(reconnectFut, e.getMessage());
             }
 
-            throw e;
+            LT.warn(log, "Ping failed with error [node=" + nodeId + ", err=" + 
e + ']');
+
+            return true;
         }
         finally {
             busyLock.leaveBusy();
@@ -2025,7 +2035,16 @@ public class GridDiscoveryManager extends 
GridManagerAdapter<DiscoverySpi> {
 
         Map<Long, Collection<ClusterNode>> snapshots = topHist;
 
-        return snapshots.get(topVer);
+        Collection<ClusterNode> nodes = snapshots.get(topVer);
+
+        if (nodes == null) {
+            DiscoCache cache = discoCacheHist.get(new 
AffinityTopologyVersion(topVer, 0));
+
+            if (cache != null)
+                nodes = cache.allNodes();
+        }
+
+        return nodes;
     }
 
     /**
@@ -2158,6 +2177,19 @@ public class GridDiscoveryManager extends 
GridManagerAdapter<DiscoverySpi> {
     }
 
     /**
+     * @param discoCache Discovery cache supplying the event topology version and node snapshot.
+     * @param node Node the {@code EVT_NODE_METRICS_UPDATED} event is queued for.
+     */
+    public void metricsUpdateEvent(DiscoCache discoCache, ClusterNode node) {
+        discoWrk.addEvent(EVT_NODE_METRICS_UPDATED,
+            discoCache.version(),
+            node,
+            discoCache,
+            discoCache.nodeMap.values(),
+            null);
+    }
+
+    /**
      * Gets first grid node start time, see {@link 
DiscoverySpi#getGridStartTime()}.
      *
      * @return Start time of the first grid node.
@@ -2211,8 +2243,9 @@ public class GridDiscoveryManager extends 
GridManagerAdapter<DiscoverySpi> {
     public boolean reconnectSupported() {
         DiscoverySpi spi = getSpi();
 
-        return ctx.discovery().localNode().isClient() && (spi instanceof 
TcpDiscoverySpi) &&
-            !(((TcpDiscoverySpi) spi).isClientReconnectDisabled());
+        return ctx.discovery().localNode().isClient() &&
+            (spi instanceof IgniteDiscoverySpi) &&
+            ((IgniteDiscoverySpi)spi).clientReconnectSupported();
     }
 
     /**
@@ -2225,7 +2258,7 @@ public class GridDiscoveryManager extends 
GridManagerAdapter<DiscoverySpi> {
 
         DiscoverySpi discoverySpi = getSpi();
 
-        ((TcpDiscoverySpi)discoverySpi).reconnect();
+        ((IgniteDiscoverySpi)discoverySpi).clientReconnect();
     }
 
     /**
@@ -2379,6 +2412,76 @@ public class GridDiscoveryManager extends 
GridManagerAdapter<DiscoverySpi> {
         cacheNodes.add(rich);
     }
 
+    /**
+     * @param cfg Configuration (a default resolver is set when both SPIs support failure resolve).
+     * @throws IgniteCheckedException If configuration is not valid.
+     */
+    public static void 
initCommunicationErrorResolveConfiguration(IgniteConfiguration cfg) throws 
IgniteCheckedException {
+        CommunicationFailureResolver rslvr = 
cfg.getCommunicationFailureResolver();
+        CommunicationSpi commSpi = cfg.getCommunicationSpi();
+        DiscoverySpi discoverySpi = cfg.getDiscoverySpi();
+
+        if (rslvr != null) {
+            if (!supportsCommunicationErrorResolve(commSpi))
+                throw new IgniteCheckedException("CommunicationFailureResolver 
is configured, but CommunicationSpi does not support communication " +
+                    "problem resolve: " + commSpi.getClass().getName());
+
+            if (!supportsCommunicationErrorResolve(discoverySpi))
+                throw new IgniteCheckedException("CommunicationFailureResolver 
is configured, but DiscoverySpi does not support communication " +
+                    "problem resolve: " + discoverySpi.getClass().getName());
+        }
+        else {
+            if (supportsCommunicationErrorResolve(commSpi) && 
supportsCommunicationErrorResolve(discoverySpi))
+                cfg.setCommunicationFailureResolver(new 
DefaultCommunicationFailureResolver());
+        }
+    }
+
+    /**
+     * @param spi Discovery SPI.
+     * @return {@code True} if SPI is an {@link IgniteDiscoverySpi} supporting communication error resolve.
+     */
+    private static boolean supportsCommunicationErrorResolve(DiscoverySpi spi) 
{
+        return spi instanceof IgniteDiscoverySpi && 
((IgniteDiscoverySpi)spi).supportsCommunicationFailureResolve();
+    }
+
+    /**
+     * @param spi Communication SPI.
+     * @return {@code True} if SPI supports communication error resolve (only {@link TcpCommunicationSpi} does).
+     */
+    private static boolean supportsCommunicationErrorResolve(CommunicationSpi 
spi) {
+        return spi instanceof TcpCommunicationSpi;
+    }
+
+    /**
+     * @return {@code True} if communication error resolve is supported.
+     */
+    public boolean communicationErrorResolveSupported() {
+        return ctx.config().getCommunicationFailureResolver() != null;
+    }
+
+    /**
+     * @return {@code True} if configured {@link DiscoverySpi} supports 
mutable custom messages.
+     */
+    public boolean mutableCustomMessages() {
+        DiscoverySpiMutableCustomMessageSupport ann = 
U.getAnnotation(ctx.config().getDiscoverySpi().getClass(),
+            DiscoverySpiMutableCustomMessageSupport.class);
+
+        return ann != null && ann.value();
+    }
+
+    /**
+     * @param node Problem node.
+     * @param err Error.
+     */
+    public void resolveCommunicationError(ClusterNode node, Exception err) {
+        DiscoverySpi spi = getSpi();
+
+        if (!supportsCommunicationErrorResolve(spi) || 
!supportsCommunicationErrorResolve(ctx.config().getCommunicationSpi()))
+            throw new UnsupportedOperationException();
+
+        ((IgniteDiscoverySpi)spi).resolveCommunicationFailure(node, err);
+    }
+
     /** Worker for network segment checks. */
     private class SegmentCheckWorker extends GridWorker {
         /** */
@@ -2587,6 +2690,9 @@ public class GridDiscoveryManager extends 
GridManagerAdapter<DiscoverySpi> {
 
             AffinityTopologyVersion topVer = evt.get2();
 
+            if (type == EVT_NODE_METRICS_UPDATED && 
topVer.compareTo(discoCache.version()) < 0)
+                return;
+
             ClusterNode node = evt.get3();
 
             boolean isDaemon = node.isDaemon();

http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/IgniteClusterNode.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/IgniteClusterNode.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/IgniteClusterNode.java
new file mode 100644
index 0000000..cbc706a
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/IgniteClusterNode.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.managers.discovery;
+
+import java.io.Serializable;
+import java.util.Map;
+import org.apache.ignite.cache.CacheMetrics;
+import org.apache.ignite.cluster.ClusterMetrics;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.IgniteConfiguration;
+
+/**
+ * {@link ClusterNode} extension exposing consistent ID and metrics setters, cache metrics and cache client flag.
+ */
+public interface IgniteClusterNode extends ClusterNode {
+    /**
+     * Sets consistent globally unique node ID which survives node restarts.
+     *
+     * @param consistentId Consistent globally unique node ID.
+     */
+    public void setConsistentId(Serializable consistentId);
+
+    /**
+     * Sets node metrics.
+     *
+     * @param metrics Node metrics.
+     */
+    public void setMetrics(ClusterMetrics metrics);
+
+    /**
+     * Gets collection of cache metrics for this node. Note that node cache 
metrics are constantly updated
+     * and provide up to date information about caches.
+     * <p>
+     * Cache metrics are updated with some delay which is directly related to 
metrics update
+     * frequency. For example, by default the update will happen every {@code 
2} seconds.
+     *
+     * @return Runtime metrics snapshots for this node.
+     */
+    public Map<Integer, CacheMetrics> cacheMetrics();
+
+    /**
+     * Sets node cache metrics.
+     *
+     * @param cacheMetrics Cache metrics.
+     */
+    public void setCacheMetrics(Map<Integer, CacheMetrics> cacheMetrics);
+
+    /**
+     * Whether this node is cache client (see {@link 
IgniteConfiguration#isClientMode()}).
+     *
+     * @return {@code True} if client.
+     */
+    public boolean isCacheClient();
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/IgniteDiscoverySpi.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/IgniteDiscoverySpi.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/IgniteDiscoverySpi.java
new file mode 100644
index 0000000..9aa5d14
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/IgniteDiscoverySpi.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.managers.discovery;
+
+import java.util.UUID;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.spi.discovery.DiscoverySpi;
+
+/**
+ * Internal {@link DiscoverySpi} extension: node lookup, client reconnect, communication failure resolve and test hooks.
+ */
+public interface IgniteDiscoverySpi extends DiscoverySpi {
+    /**
+     * @param nodeId Node ID.
+     * @return {@code True} if node is joining or has already joined topology.
+     */
+    public boolean knownNode(UUID nodeId);
+
+    /**
+     * Checks whether this SPI can reconnect a client node.
+     * @return {@code True} if SPI supports client reconnect.
+     */
+    public boolean clientReconnectSupported();
+
+    /**
+     * Initiates client node reconnect (see {@link #clientReconnectSupported()}).
+     */
+    public void clientReconnect();
+
+    /**
+     * For TESTING only.
+     */
+    public void simulateNodeFailure();
+
+    /**
+     * For TESTING only.
+     *
+     * @param lsnr Listener.
+     */
+    public void setInternalListener(IgniteDiscoverySpiInternalListener lsnr);
+
+    /**
+     * @return {@code True} if SPI supports communication failure resolve.
+     */
+    public boolean supportsCommunicationFailureResolve();
+
+    /**
+     * @param node Problem node.
+     * @param err Connection error.
+     */
+    public void resolveCommunicationFailure(ClusterNode node, Exception err);
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/IgniteDiscoverySpiInternalListener.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/IgniteDiscoverySpiInternalListener.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/IgniteDiscoverySpiInternalListener.java
new file mode 100644
index 0000000..24405f8
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/IgniteDiscoverySpiInternalListener.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.managers.discovery;
+
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.spi.discovery.DiscoverySpi;
+import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
+
+/**
+ * Internal discovery SPI listener hooks (see IgniteDiscoverySpi#setInternalListener). For TESTING only.
+ */
+public interface IgniteDiscoverySpiInternalListener {
+    /**
+     * @param locNode Local node.
+     * @param log Logger.
+     */
+    public void beforeJoin(ClusterNode locNode, IgniteLogger log);
+
+    /**
+     * @param spi SPI instance.
+     * @param log Logger.
+     * @param msg Custom message.
+     * @return {@code False} to cancel sending of the custom event.
+     */
+    public boolean beforeSendCustomEvent(DiscoverySpi spi, IgniteLogger log, 
DiscoverySpiCustomMessage msg);
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/main/java/org/apache/ignite/internal/processors/authentication/UserAcceptedMessage.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/authentication/UserAcceptedMessage.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/authentication/UserAcceptedMessage.java
index ef87a44..2e2aed9 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/authentication/UserAcceptedMessage.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/authentication/UserAcceptedMessage.java
@@ -72,6 +72,11 @@ public class UserAcceptedMessage implements 
DiscoveryCustomMessage {
     }
 
     /** {@inheritDoc} */
+    @Override public boolean stopProcess() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
     @Nullable @Override public DiscoCache 
createDiscoCache(GridDiscoveryManager mgr,
         AffinityTopologyVersion topVer, DiscoCache discoCache) {
         throw new UnsupportedOperationException();

http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/main/java/org/apache/ignite/internal/processors/authentication/UserProposedMessage.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/authentication/UserProposedMessage.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/authentication/UserProposedMessage.java
index 1a0be8e..19f9e82 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/authentication/UserProposedMessage.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/authentication/UserProposedMessage.java
@@ -72,6 +72,11 @@ public class UserProposedMessage implements 
DiscoveryServerOnlyCustomMessage {
     }
 
     /** {@inheritDoc} */
+    @Override public boolean stopProcess() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
     @Nullable @Override public DiscoCache 
createDiscoCache(GridDiscoveryManager mgr,
         AffinityTopologyVersion topVer, DiscoCache discoCache) {
         throw new UnsupportedOperationException();

http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinityChangeMessage.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinityChangeMessage.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinityChangeMessage.java
index fe1014c..937a889 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinityChangeMessage.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinityChangeMessage.java
@@ -156,6 +156,11 @@ public class CacheAffinityChangeMessage implements 
DiscoveryCustomMessage {
     }
 
     /** {@inheritDoc} */
+    @Override public boolean stopProcess() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
     @Nullable @Override public DiscoCache 
createDiscoCache(GridDiscoveryManager mgr,
         AffinityTopologyVersion topVer, DiscoCache discoCache) {
         return discoCache.copy(topVer, null);

http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index 6691b13..92b8d3e 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@ -1310,20 +1310,17 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
      * @param fut Current exchange future.
      * @param msg Message finish message.
      * @param resTopVer Result topology version.
-     * @throws IgniteCheckedException If failed.
      */
     public void onLocalJoin(final GridDhtPartitionsExchangeFuture fut,
         GridDhtPartitionsFullMessage msg,
-        final AffinityTopologyVersion resTopVer)
-        throws IgniteCheckedException {
+        final AffinityTopologyVersion resTopVer) {
         final Set<Integer> affReq = 
fut.context().groupsAffinityRequestOnJoin();
 
         final Map<Long, ClusterNode> nodesByOrder = new HashMap<>();
 
         final Map<Integer, CacheGroupAffinityMessage> joinedNodeAff = 
msg.joinedNodeAffinity();
 
-        assert !F.isEmpty(joinedNodeAff) : msg;
-        assert joinedNodeAff.size() >= affReq.size();
+        assert F.isEmpty(affReq) || (!F.isEmpty(joinedNodeAff) && 
joinedNodeAff.size() >= affReq.size()) : msg;
 
         forAllCacheGroups(false, new 
IgniteInClosureX<GridAffinityAssignmentCache>() {
             @Override public void applyx(GridAffinityAssignmentCache aff) 
throws IgniteCheckedException {
@@ -1333,7 +1330,7 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
 
                 assert grp != null;
 
-                if (affReq.contains(aff.groupId())) {
+                if (affReq != null && affReq.contains(aff.groupId())) {
                     assert 
AffinityTopologyVersion.NONE.equals(aff.lastVersion());
 
                     CacheGroupAffinityMessage affMsg = 
joinedNodeAff.get(aff.groupId());
@@ -2282,6 +2279,23 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
     }
 
     /**
+     * @return All registered cache descriptors (internal map returned as is, not copied).
+     */
+    public Map<Integer, DynamicCacheDescriptor> caches() {
+        return caches.registeredCaches;
+    }
+
+    /**
+     * @param grpId Cache group ID.
+     * @return Group affinity cache or {@code null} if there is no holder for the group.
+     */
+    @Nullable public GridAffinityAssignmentCache groupAffinity(int grpId) {
+        CacheGroupHolder grpHolder = grpHolders.get(grpId);
+
+        return grpHolder != null ? grpHolder.affinity() : null;
+    }
+
+    /**
      *
      */
     public void dumpDebugInfo() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheStatisticsModeChangeMessage.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheStatisticsModeChangeMessage.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheStatisticsModeChangeMessage.java
index 40bcfaf..e33256f 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheStatisticsModeChangeMessage.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheStatisticsModeChangeMessage.java
@@ -101,6 +101,11 @@ public class CacheStatisticsModeChangeMessage implements 
DiscoveryCustomMessage
     }
 
     /** {@inheritDoc} */
+    @Override public boolean stopProcess() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
     @Override public DiscoCache createDiscoCache(GridDiscoveryManager mgr, 
AffinityTopologyVersion topVer,
         DiscoCache discoCache) {
         throw new UnsupportedOperationException();

http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClientCacheChangeDiscoveryMessage.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClientCacheChangeDiscoveryMessage.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClientCacheChangeDiscoveryMessage.java
index e35d80e..ae76c95 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClientCacheChangeDiscoveryMessage.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClientCacheChangeDiscoveryMessage.java
@@ -173,6 +173,11 @@ public class ClientCacheChangeDiscoveryMessage implements 
DiscoveryCustomMessage
     }
 
     /** {@inheritDoc} */
+    @Override public boolean stopProcess() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
     @Nullable @Override public DiscoCache 
createDiscoCache(GridDiscoveryManager mgr,
         AffinityTopologyVersion topVer, DiscoCache discoCache) {
         throw new UnsupportedOperationException();

http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClientCacheChangeDummyDiscoveryMessage.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClientCacheChangeDummyDiscoveryMessage.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClientCacheChangeDummyDiscoveryMessage.java
index 6ed3ecc..4ce0c87 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClientCacheChangeDummyDiscoveryMessage.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClientCacheChangeDummyDiscoveryMessage.java
@@ -105,6 +105,11 @@ public class ClientCacheChangeDummyDiscoveryMessage 
implements DiscoveryCustomMe
     }
 
     /** {@inheritDoc} */
+    @Override public boolean stopProcess() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
     @Nullable @Override public DiscoCache 
createDiscoCache(GridDiscoveryManager mgr,
         AffinityTopologyVersion topVer, DiscoCache discoCache) {
         throw new UnsupportedOperationException();

http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java
index 83459a5..d85e29b 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java
@@ -77,6 +77,11 @@ public class DynamicCacheChangeBatch implements 
DiscoveryCustomMessage {
     }
 
     /** {@inheritDoc} */
+    @Override public boolean stopProcess() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
     @Override public DiscoCache createDiscoCache(GridDiscoveryManager mgr, 
AffinityTopologyVersion topVer,
         DiscoCache discoCache) {
         return mgr.createDiscoCacheOnCacheChange(topVer, discoCache);

http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 55357ff..c2d0f42 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -82,6 +82,7 @@ import org.apache.ignite.internal.NodeStoppingException;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import 
org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException;
 import org.apache.ignite.internal.cluster.IgniteClusterEx;
+import org.apache.ignite.internal.managers.discovery.IgniteClusterNode;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import 
org.apache.ignite.internal.processors.cache.affinity.GridCacheAffinityImpl;
 import 
org.apache.ignite.internal.processors.cache.distributed.IgniteExternalizableExpiryPolicy;
@@ -3234,7 +3235,7 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
         List<CacheMetrics> metrics = new ArrayList<>(grp.nodes().size());
 
         for (ClusterNode node : grp.nodes()) {
-            Map<Integer, CacheMetrics> nodeCacheMetrics = 
((TcpDiscoveryNode)node).cacheMetrics();
+            Map<Integer, CacheMetrics> nodeCacheMetrics = 
((IgniteClusterNode)node).cacheMetrics();
 
             if (nodeCacheMetrics != null) {
                 CacheMetrics e = nodeCacheMetrics.get(context().cacheId());

http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index a30a24a..77ffce3 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -427,7 +427,7 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
         if (evt.type() != EVT_DISCOVERY_CUSTOM_EVT) {
             assert evt.type() != EVT_NODE_JOINED || n.isLocal() || n.order() > 
loc.order() :
                 "Node joined with smaller-than-local " +
-                    "order [newOrder=" + n.order() + ", locOrder=" + 
loc.order() + ']';
+                    "order [newOrder=" + n.order() + ", locOrder=" + 
loc.order() + ", evt=" + evt + ']';
 
             exchId = exchangeId(n.id(), affinityTopologyVersion(evt), evt);
 
@@ -570,12 +570,6 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
         for (ClusterNode n : cctx.discovery().remoteNodes())
             cctx.versions().onReceived(n.id(), 
n.metrics().getLastDataVersion());
 
-        ClusterNode loc = cctx.localNode();
-
-        long startTime = loc.metrics().getStartTime();
-
-        assert startTime > 0;
-
         DiscoveryLocalJoinData locJoin = cctx.discovery().localJoin();
 
         GridDhtPartitionsExchangeFuture fut = null;
@@ -758,6 +752,14 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
 
     /**
      * @param grpId Cache group ID.
+     * @return Client topology for the group or {@code null} if absent.
+     */
+    @Nullable public GridDhtPartitionTopology clientTopologyIfExists(int 
grpId) {
+        return clientTops.get(grpId);
+    }
+
+    /**
+     * @param grpId Cache group ID.
      * @param discoCache Discovery data cache.
      * @return Topology.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index a3f7c94..7edac73 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -3363,7 +3363,7 @@ public class GridCacheProcessor extends 
GridProcessorAdapter {
      * @return Validation result or {@code null} in case of success.
      */
     @Nullable private IgniteNodeValidationResult 
validateHashIdResolvers(ClusterNode node) {
-        if (!node.isClient()) {
+        if (!CU.clientNode(node)) {
             for (DynamicCacheDescriptor desc : cacheDescriptors().values()) {
                 CacheConfiguration cfg = desc.cacheConfiguration();
 
@@ -3372,7 +3372,7 @@ public class GridCacheProcessor extends 
GridProcessorAdapter {
 
                     Object nodeHashObj = aff.resolveNodeHash(node);
 
-                    for (ClusterNode topNode : ctx.discovery().allNodes()) {
+                    for (ClusterNode topNode : 
ctx.discovery().aliveServerNodes()) {
                         Object topNodeHashObj = aff.resolveNodeHash(topNode);
 
                         if (nodeHashObj.hashCode() == 
topNodeHashObj.hashCode()) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
index 83ce2ba..a5169d2 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
@@ -65,6 +65,7 @@ import org.apache.ignite.internal.IgniteNodeAttributes;
 import org.apache.ignite.internal.cluster.ClusterGroupEmptyCheckedException;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import 
org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException;
+import org.apache.ignite.internal.managers.discovery.IgniteClusterNode;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import 
org.apache.ignite.internal.processors.cache.distributed.GridDistributedLockCancelledException;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter;
@@ -97,7 +98,6 @@ import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.lang.IgniteReducer;
 import org.apache.ignite.lifecycle.LifecycleAware;
 import org.apache.ignite.plugin.CachePluginConfiguration;
-import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode;
 import org.apache.ignite.transactions.Transaction;
 import org.apache.ignite.transactions.TransactionConcurrency;
 import org.apache.ignite.transactions.TransactionIsolation;
@@ -1348,8 +1348,8 @@ public class GridCacheUtils {
      * @return {@code True} if given node is client node (has flag {@link 
IgniteConfiguration#isClientMode()} set).
      */
     public static boolean clientNode(ClusterNode node) {
-        if (node instanceof TcpDiscoveryNode)
-            return ((TcpDiscoveryNode)node).isCacheClient();
+        if (node instanceof IgniteClusterNode)
+            return ((IgniteClusterNode)node).isCacheClient();
         else
             return clientNodeDirect(node);
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/WalStateFinishMessage.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/WalStateFinishMessage.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/WalStateFinishMessage.java
index 57f25d0..4afa403 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/WalStateFinishMessage.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/WalStateFinishMessage.java
@@ -67,6 +67,11 @@ public class WalStateFinishMessage extends 
WalStateAbstractMessage {
     }
 
     /** {@inheritDoc} */
+    @Override public boolean stopProcess() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(WalStateFinishMessage.class, this, "super", 
super.toString());
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/WalStateProposeMessage.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/WalStateProposeMessage.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/WalStateProposeMessage.java
index 747fd6a..b9d96fc 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/WalStateProposeMessage.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/WalStateProposeMessage.java
@@ -98,6 +98,11 @@ public class WalStateProposeMessage extends 
WalStateAbstractMessage {
     }
 
     /** {@inheritDoc} */
+    @Override public boolean stopProcess() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(WalStateProposeMessage.class, this, "super", 
super.toString());
     }

Reply via email to