IGNITE-7222 Added ZooKeeper discovery SPI
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a64b941d Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a64b941d Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a64b941d Branch: refs/heads/ignite-6083 Commit: a64b941df0a4d3bfc3a2dab32f85c371c1a509be Parents: a0a187b Author: Semyon Boikov <[email protected]> Authored: Tue Apr 10 11:37:39 2018 +0300 Committer: Alexey Goncharuk <[email protected]> Committed: Tue Apr 10 11:57:40 2018 +0300 ---------------------------------------------------------------------- .../jdbc2/JdbcAbstractDmlStatementSelfTest.java | 6 +- .../CommunicationFailureContext.java | 62 + .../CommunicationFailureResolver.java | 28 + .../DefaultCommunicationFailureResolver.java | 305 ++ .../configuration/IgniteConfiguration.java | 22 + .../org/apache/ignite/internal/GridTopic.java | 3 + .../apache/ignite/internal/IgniteKernal.java | 5 +- .../org/apache/ignite/internal/IgnitionEx.java | 3 + .../internal/managers/GridManagerAdapter.java | 8 + .../managers/communication/GridIoManager.java | 4 +- .../communication/GridIoMessageFactory.java | 12 + .../discovery/CustomMessageWrapper.java | 5 + .../internal/managers/discovery/DiscoCache.java | 8 + .../discovery/DiscoveryCustomMessage.java | 10 +- .../DiscoveryMessageResultsCollector.java | 222 + .../discovery/GridDiscoveryManager.java | 128 +- .../managers/discovery/IgniteClusterNode.java | 69 + .../managers/discovery/IgniteDiscoverySpi.java | 67 + .../IgniteDiscoverySpiInternalListener.java | 42 + .../authentication/UserAcceptedMessage.java | 5 + .../authentication/UserProposedMessage.java | 5 + .../cache/CacheAffinityChangeMessage.java | 5 + .../cache/CacheAffinitySharedManager.java | 26 +- .../cache/CacheStatisticsModeChangeMessage.java | 5 + .../ClientCacheChangeDiscoveryMessage.java | 5 + .../ClientCacheChangeDummyDiscoveryMessage.java | 5 + .../cache/DynamicCacheChangeBatch.java | 5 + 
.../processors/cache/GridCacheAdapter.java | 3 +- .../GridCachePartitionExchangeManager.java | 16 +- .../processors/cache/GridCacheProcessor.java | 4 +- .../processors/cache/GridCacheUtils.java | 6 +- .../processors/cache/WalStateFinishMessage.java | 5 + .../cache/WalStateProposeMessage.java | 5 + .../cache/binary/BinaryMetadataTransport.java | 24 +- .../binary/MetadataUpdateAcceptedMessage.java | 5 + .../binary/MetadataUpdateProposedMessage.java | 5 + .../dht/GridClientPartitionTopology.java | 39 + .../distributed/dht/GridDhtCacheAdapter.java | 4 + .../dht/GridDhtPartitionTopology.java | 6 + .../dht/GridDhtPartitionTopologyImpl.java | 39 + .../GridDhtPartitionsExchangeFuture.java | 26 +- .../cluster/ChangeGlobalStateFinishMessage.java | 5 + .../cluster/ChangeGlobalStateMessage.java | 5 + .../cluster/ClusterMetricsUpdateMessage.java | 158 + .../processors/cluster/ClusterNodeMetrics.java | 62 + .../processors/cluster/ClusterProcessor.java | 249 +- .../continuous/AbstractContinuousMessage.java | 5 + .../continuous/ContinuousRoutineInfo.java | 100 + .../ContinuousRoutineStartResultMessage.java | 206 + .../ContinuousRoutinesCommonDiscoveryData.java | 45 + .../continuous/ContinuousRoutinesInfo.java | 132 + ...tinuousRoutinesJoiningNodeDiscoveryData.java | 45 + .../continuous/GridContinuousProcessor.java | 862 +++- .../continuous/StartRequestDataV2.java | 164 + .../StartRoutineDiscoveryMessageV2.java | 77 + .../StopRoutineAckDiscoveryMessage.java | 5 + .../datastreamer/DataStreamerImpl.java | 27 +- .../marshaller/MappingAcceptedMessage.java | 5 + .../marshaller/MappingProposedMessage.java | 5 + .../message/SchemaFinishDiscoveryMessage.java | 5 + .../message/SchemaProposeDiscoveryMessage.java | 5 + .../ignite/internal/util/nio/GridNioServer.java | 18 +- .../org/apache/ignite/spi/IgniteSpiAdapter.java | 10 + .../org/apache/ignite/spi/IgniteSpiContext.java | 11 + .../communication/tcp/TcpCommunicationSpi.java | 190 +- .../tcp/internal/ConnectionKey.java | 117 + 
.../TcpCommunicationConnectionCheckFuture.java | 519 ++ ...pCommunicationNodeConnectionCheckFuture.java | 30 + .../discovery/DiscoverySpiCustomMessage.java | 15 +- ...DiscoverySpiMutableCustomMessageSupport.java | 40 + .../spi/discovery/tcp/TcpDiscoverySpi.java | 55 +- .../tcp/internal/TcpDiscoveryNode.java | 32 +- .../resources/META-INF/classnames.properties | 2 + ...unctionExcludeNeighborsAbstractSelfTest.java | 8 +- .../failure/FailureHandlerTriggeredTest.java | 4 + .../internal/ClusterGroupHostsSelfTest.java | 3 + .../ignite/internal/ClusterGroupSelfTest.java | 2 + .../internal/ClusterNodeMetricsUpdateTest.java | 173 + .../internal/DiscoverySpiTestListener.java | 162 + .../ignite/internal/GridDiscoverySelfTest.java | 14 +- .../GridJobMasterLeaveAwareSelfTest.java | 2 + .../internal/GridJobStealingSelfTest.java | 2 + .../internal/GridSameVmStartupSelfTest.java | 19 +- .../apache/ignite/internal/GridSelfTest.java | 2 + .../IgniteClientReconnectAbstractTest.java | 53 +- .../IgniteClientReconnectApiExceptionTest.java | 21 +- .../IgniteClientReconnectAtomicsTest.java | 30 +- .../IgniteClientReconnectCacheTest.java | 49 +- .../IgniteClientReconnectCollectionsTest.java | 14 +- .../IgniteClientReconnectComputeTest.java | 6 +- ...eClientReconnectContinuousProcessorTest.java | 13 +- ...IgniteClientReconnectDiscoveryStateTest.java | 22 +- ...niteClientReconnectFailoverAbstractTest.java | 12 +- .../IgniteClientReconnectServicesTest.java | 8 +- .../internal/IgniteClientReconnectStopTest.java | 12 +- .../IgniteClientReconnectStreamerTest.java | 4 +- .../ignite/internal/IgniteClientRejoinTest.java | 3 + .../GridDiscoveryManagerAliveCacheSelfTest.java | 16 +- .../GridAffinityProcessorAbstractSelfTest.java | 4 +- .../CacheMetricsForClusterGroupSelfTest.java | 12 +- .../cache/GridCacheAbstractSelfTest.java | 2 + .../cache/IgniteCacheNearLockValueSelfTest.java | 4 +- .../IgniteCacheP2pUnmarshallingErrorTest.java | 11 + .../IgniteClusterActivateDeactivateTest.java | 65 + 
.../IgniteDaemonNodeMarshallerCacheTest.java | 3 +- .../binary/BinaryMetadataUpdatesFlowTest.java | 12 +- ...ntNodeBinaryObjectMetadataMultinodeTest.java | 2 +- .../GridCacheQueueClientDisconnectTest.java | 10 + .../IgniteClientDataStructuresAbstractTest.java | 3 +- .../CacheLateAffinityAssignmentTest.java | 127 +- .../GridCacheNodeFailureAbstractTest.java | 5 +- .../distributed/IgniteCache150ClientsTest.java | 2 + .../distributed/IgniteCacheManyClientsTest.java | 44 +- .../IgniteOptimisticTxSuspendResumeTest.java | 2 + ...ridCacheDhtPreloadMultiThreadedSelfTest.java | 4 + .../dht/GridCacheDhtPreloadSelfTest.java | 2 + .../dht/TxRecoveryStoreEnabledTest.java | 15 +- ...titionedExplicitLockNodeFailureSelfTest.java | 3 +- .../ClientReconnectContinuousQueryTest.java | 19 +- ...yRemoteFilterMissingInClassPathSelfTest.java | 23 +- ...CacheContinuousQueryClientReconnectTest.java | 3 + .../CacheVersionedEntryAbstractTest.java | 33 +- .../continuous/GridEventConsumeSelfTest.java | 34 +- .../service/ClosureServiceClientsNodesTest.java | 19 +- .../internal/util/GridTestClockTimer.java | 9 + .../GridMarshallerMappingConsistencyTest.java | 4 + .../ignite/messaging/GridMessagingSelfTest.java | 126 +- .../GridTcpCommunicationSpiAbstractTest.java | 71 + .../FilterDataForClientNodeDiscoveryTest.java | 5 + .../testframework/GridSpiTestContext.java | 10 + .../config/GridTestProperties.java | 9 + .../testframework/junits/GridAbstractTest.java | 129 +- .../junits/multijvm/IgniteNodeRunner.java | 2 + .../testsuites/IgniteComputeGridTestSuite.java | 2 + ...niteCacheDistributedQueryCancelSelfTest.java | 2 +- .../DynamicIndexAbstractBasicSelfTest.java | 5 +- .../GridJtaTransactionManagerSelfTest.java | 21 +- .../GridPartitionedCacheJtaFactorySelfTest.java | 19 +- .../org/apache/ignite/spark/IgniteRDD.scala | 9 +- .../ignite/internal/GridFactorySelfTest.java | 3 +- .../p2p/GridP2PUserVersionChangeSelfTest.java | 5 +- modules/yardstick/pom-standalone.xml | 6 + modules/yardstick/pom.xml | 6 
+ modules/zookeeper/pom.xml | 40 + .../spi/discovery/zk/ZookeeperDiscoverySpi.java | 557 ++ .../zk/internal/ZkAbstractCallabck.java | 83 + .../zk/internal/ZkAbstractChildrenCallback.java | 61 + .../zk/internal/ZkAbstractWatcher.java | 55 + .../discovery/zk/internal/ZkAliveNodeData.java | 40 + .../zk/internal/ZkBulkJoinContext.java | 50 + .../discovery/zk/internal/ZkClusterNodes.java | 103 + .../internal/ZkCommunicationErrorNodeState.java | 46 + .../ZkCommunicationErrorProcessFuture.java | 411 ++ ...kCommunicationErrorResolveFinishMessage.java | 69 + .../ZkCommunicationErrorResolveResult.java | 45 + ...ZkCommunicationErrorResolveStartMessage.java | 61 + .../internal/ZkCommunicationFailureContext.java | 188 + .../zk/internal/ZkDiscoveryCustomEventData.java | 89 + .../zk/internal/ZkDiscoveryEventData.java | 165 + .../zk/internal/ZkDiscoveryEventsData.java | 121 + .../internal/ZkDiscoveryNodeFailEventData.java | 55 + .../internal/ZkDiscoveryNodeJoinEventData.java | 60 + .../ZkDistributedCollectDataFuture.java | 250 + .../zk/internal/ZkForceNodeFailMessage.java | 65 + .../discovery/zk/internal/ZkIgnitePaths.java | 307 ++ .../zk/internal/ZkInternalJoinErrorMessage.java | 44 + .../zk/internal/ZkInternalMessage.java | 27 + .../zk/internal/ZkJoinEventDataForJoined.java | 83 + .../zk/internal/ZkJoinedNodeEvtData.java | 79 + .../zk/internal/ZkJoiningNodeData.java | 87 + .../zk/internal/ZkNoServersMessage.java | 50 + .../zk/internal/ZkNodeValidateResult.java | 43 + .../spi/discovery/zk/internal/ZkRunnable.java | 51 + .../discovery/zk/internal/ZkRuntimeState.java | 135 + .../discovery/zk/internal/ZkTimeoutObject.java | 54 + .../discovery/zk/internal/ZookeeperClient.java | 1219 +++++ .../ZookeeperClientFailedException.java | 40 + .../zk/internal/ZookeeperClusterNode.java | 362 ++ .../zk/internal/ZookeeperDiscoveryImpl.java | 4464 ++++++++++++++++ .../java/org/apache/ZookeeperNodeStart.java | 46 + ...CacheEntryListenerWithZkDiscoAtomicTest.java | 32 + 
.../ZookeeperDiscoverySpiAbstractTestSuite.java | 118 + .../zk/ZookeeperDiscoverySpiTestSuite1.java | 44 + .../zk/ZookeeperDiscoverySpiTestSuite2.java | 94 + ...ZookeeperDiscoverySuitePreprocessorTest.java | 101 + .../zk/internal/ZookeeperClientTest.java | 495 ++ ...okeeperDiscoverySpiSaslAuthAbstractTest.java | 247 + ...ZookeeperDiscoverySpiSaslFailedAuthTest.java | 44 + ...eeperDiscoverySpiSaslSuccessfulAuthTest.java | 48 + .../zk/internal/ZookeeperDiscoverySpiTest.java | 4847 ++++++++++++++++++ .../zookeeper/ZkTestClientCnxnSocketNIO.java | 137 + 191 files changed, 21158 insertions(+), 777 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcAbstractDmlStatementSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcAbstractDmlStatementSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcAbstractDmlStatementSelfTest.java index f4c0ca3..0a055a9 100644 --- a/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcAbstractDmlStatementSelfTest.java +++ b/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcAbstractDmlStatementSelfTest.java @@ -138,8 +138,10 @@ public abstract class JdbcAbstractDmlStatementSelfTest extends GridCommonAbstrac @Override protected void afterTest() throws Exception { ((IgniteEx)ignite(0)).context().cache().dynamicDestroyCache(DEFAULT_CACHE_NAME, true, true, false); - conn.close(); - assertTrue(conn.isClosed()); + if (conn != null) { + conn.close(); + assertTrue(conn.isClosed()); + } cleanUpWorkingDir(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/main/java/org/apache/ignite/configuration/CommunicationFailureContext.java 
---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/CommunicationFailureContext.java b/modules/core/src/main/java/org/apache/ignite/configuration/CommunicationFailureContext.java new file mode 100644 index 0000000..a32d38c --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/configuration/CommunicationFailureContext.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.configuration; + +import java.util.List; +import java.util.Map; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.spi.communication.CommunicationSpi; + +/** + * Communication Failure Context. + */ +public interface CommunicationFailureContext { + /** + * @return Current topology snapshot. + */ + public List<ClusterNode> topologySnapshot(); + + /** + * @param node1 First node. + * @param node2 Second node. + * @return {@code True} if {@link CommunicationSpi} is able to establish connection from first node to second node. + */ + public boolean connectionAvailable(ClusterNode node1, ClusterNode node2); + + /** + * @return Currently started caches. 
+ */ + public Map<String, CacheConfiguration<?, ?>> startedCaches(); + + /** + * @param cacheName Cache name. + * @return Cache partitions affinity assignment. + */ + public List<List<ClusterNode>> cacheAffinity(String cacheName); + + /** + * @param cacheName Cache name. + * @return Cache partitions owners. + */ + public List<List<ClusterNode>> cachePartitionOwners(String cacheName); + + /** + * @param node Node to kill. + */ + public void killNode(ClusterNode node); +} http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/main/java/org/apache/ignite/configuration/CommunicationFailureResolver.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/CommunicationFailureResolver.java b/modules/core/src/main/java/org/apache/ignite/configuration/CommunicationFailureResolver.java new file mode 100644 index 0000000..a4d92f3 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/configuration/CommunicationFailureResolver.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.configuration; + +/** + * Communication Failure Resolver. 
+ */ +public interface CommunicationFailureResolver { + /** + * @param ctx Context. + */ + public void resolve(CommunicationFailureContext ctx); +} http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/main/java/org/apache/ignite/configuration/DefaultCommunicationFailureResolver.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/DefaultCommunicationFailureResolver.java b/modules/core/src/main/java/org/apache/ignite/configuration/DefaultCommunicationFailureResolver.java new file mode 100644 index 0000000..a4c6da9 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/configuration/DefaultCommunicationFailureResolver.java @@ -0,0 +1,305 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.configuration; + +import java.util.BitSet; +import java.util.List; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.internal.util.typedef.internal.CU; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.resources.LoggerResource; + +/** + * Default Communication Failure Resolver. + */ +public class DefaultCommunicationFailureResolver implements CommunicationFailureResolver { + /** */ + @LoggerResource + private IgniteLogger log; + + /** {@inheritDoc} */ + @Override public void resolve(CommunicationFailureContext ctx) { + ClusterGraph graph = new ClusterGraph(log, ctx); + + ClusterSearch cluster = graph.findLargestIndependentCluster(); + + List<ClusterNode> nodes = ctx.topologySnapshot(); + + assert nodes.size() > 0; + assert cluster != null; + + if (graph.checkFullyConnected(cluster.nodesBitSet)) { + assert cluster.nodeCnt <= nodes.size(); + + if (cluster.nodeCnt < nodes.size()) { + if (log.isInfoEnabled()) { + log.info("Communication problem resolver found fully connected independent cluster [" + + "clusterSrvCnt=" + cluster.srvCnt + + ", clusterTotalNodes=" + cluster.nodeCnt + + ", totalAliveNodes=" + nodes.size() + "]"); + } + + for (int i = 0; i < nodes.size(); i++) { + if (!cluster.nodesBitSet.get(i)) + ctx.killNode(nodes.get(i)); + } + } + else + U.warn(log, "All alive nodes are fully connected, this should be resolved automatically."); + } + else { + if (log.isInfoEnabled()) { + log.info("Communication problem resolver failed to find fully connected independent cluster."); + } + } + } + + /** + * @param cluster Cluster nodes mask. + * @param nodes Nodes. + * @param limit IDs limit. + * @return Cluster node IDs string. 
+ */ + private static String clusterNodeIds(BitSet cluster, List<ClusterNode> nodes, int limit) { + int startIdx = 0; + + StringBuilder builder = new StringBuilder(); + + int cnt = 0; + + for (;;) { + int idx = cluster.nextSetBit(startIdx); + + if (idx == -1) + break; + + startIdx = idx + 1; + + if (builder.length() == 0) { + builder.append('['); + } + else + builder.append(", "); + + builder.append(nodes.get(idx).id()); + + if (cnt++ > limit) + builder.append(", ..."); + } + + builder.append(']'); + + return builder.toString(); + } + + /** + * + */ + private static class ClusterSearch { + /** */ + int srvCnt; + + /** */ + int nodeCnt; + + /** */ + final BitSet nodesBitSet; + + /** + * @param nodes Total nodes. + */ + ClusterSearch(int nodes) { + nodesBitSet = new BitSet(nodes); + } + } + + /** + * + */ + private static class ClusterGraph { + /** */ + private final static int WORD_IDX_SHIFT = 6; + + /** */ + private final IgniteLogger log; + + /** */ + private final int nodeCnt; + + /** */ + private final long[] visitBitSet; + + /** */ + private final CommunicationFailureContext ctx; + + /** */ + private final List<ClusterNode> nodes; + + /** + * @param log Logger. + * @param ctx Context. + */ + ClusterGraph(IgniteLogger log, CommunicationFailureContext ctx) { + this.log = log; + this.ctx = ctx; + + nodes = ctx.topologySnapshot(); + + nodeCnt = nodes.size(); + + assert nodeCnt > 0; + + visitBitSet = initBitSet(nodeCnt); + } + + /** + * @param bitIndex Bit index. + * @return Word index containing bit with given index. + */ + private static int wordIndex(int bitIndex) { + return bitIndex >> WORD_IDX_SHIFT; + } + + /** + * @param bitCnt Number of bits. + * @return Bit set words. + */ + static long[] initBitSet(int bitCnt) { + return new long[wordIndex(bitCnt - 1) + 1]; + } + + /** + * @return Cluster nodes bit set. 
+ */ + ClusterSearch findLargestIndependentCluster() { + ClusterSearch maxCluster = null; + + for (int i = 0; i < nodeCnt; i++) { + if (getBit(visitBitSet, i)) + continue; + + ClusterSearch cluster = new ClusterSearch(nodeCnt); + + search(cluster, i); + + if (log.isInfoEnabled()) { + log.info("Communication problem resolver found cluster [srvCnt=" + cluster.srvCnt + + ", totalNodeCnt=" + cluster.nodeCnt + + ", nodeIds=" + clusterNodeIds(cluster.nodesBitSet, nodes, 1000) + "]"); + } + + if (maxCluster == null || cluster.srvCnt > maxCluster.srvCnt) + maxCluster = cluster; + } + + return maxCluster; + } + + /** + * @param cluster Cluster nodes bit set. + * @return {@code True} if all cluster nodes are able to connect to each other. + */ + boolean checkFullyConnected(BitSet cluster) { + int startIdx = 0; + + int clusterNodes = cluster.cardinality(); + + for (;;) { + int idx = cluster.nextSetBit(startIdx); + + if (idx == -1) + break; + + ClusterNode node1 = nodes.get(idx); + + for (int i = 0; i < clusterNodes; i++) { + if (!cluster.get(i) || i == idx) + continue; + + ClusterNode node2 = nodes.get(i); + + if (cluster.get(i) && !ctx.connectionAvailable(node1, node2)) + return false; + } + + startIdx = idx + 1; + } + + return true; + } + + /** + * @param cluster Current cluster bit set. + * @param idx Node index. + */ + void search(ClusterSearch cluster, int idx) { + assert !getBit(visitBitSet, idx); + + setBit(visitBitSet, idx); + + cluster.nodesBitSet.set(idx); + cluster.nodeCnt++; + + ClusterNode node1 = nodes.get(idx); + + if (!CU.clientNode(node1)) + cluster.srvCnt++; + + for (int i = 0; i < nodeCnt; i++) { + if (i == idx || getBit(visitBitSet, i)) + continue; + + ClusterNode node2 = nodes.get(i); + + boolean connected = ctx.connectionAvailable(node1, node2) || + ctx.connectionAvailable(node2, node1); + + if (connected) + search(cluster, i); + } + } + + /** + * @param words Bit set words. + * @param bitIndex Bit index. 
+ */ + static void setBit(long words[], int bitIndex) { + int wordIndex = wordIndex(bitIndex); + + words[wordIndex] |= (1L << bitIndex); + } + + /** + * @param words Bit set words. + * @param bitIndex Bit index. + * @return Bit value. + */ + static boolean getBit(long[] words, int bitIndex) { + int wordIndex = wordIndex(bitIndex); + + return (words[wordIndex] & (1L << bitIndex)) != 0; + } + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(DefaultCommunicationFailureResolver.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java index add3880..cc3ea10 100644 --- a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java +++ b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java @@ -493,6 +493,9 @@ public class IgniteConfiguration { /** Failure handler. */ private FailureHandler failureHnd; + /** Communication failure resolver */ + private CommunicationFailureResolver commFailureRslvr; + /** * Creates valid grid configuration with all default values. */ @@ -520,6 +523,8 @@ public class IgniteConfiguration { loadBalancingSpi = cfg.getLoadBalancingSpi(); indexingSpi = cfg.getIndexingSpi(); + commFailureRslvr = cfg.getCommunicationFailureResolver(); + /* * Order alphabetically for maintenance purposes. */ @@ -607,6 +612,23 @@ public class IgniteConfiguration { } /** + * @return Communication failure resolver. + */ + public CommunicationFailureResolver getCommunicationFailureResolver() { + return commFailureRslvr; + } + + /** + * @param commFailureRslvr Communication failure resolver. + * @return {@code this} instance. 
+ */ + public IgniteConfiguration setCommunicationFailureResolver(CommunicationFailureResolver commFailureRslvr) { + this.commFailureRslvr = commFailureRslvr; + + return this; + } + + /** * Gets optional grid name. Returns {@code null} if non-default grid name was not * provided. * <p>The name only works locally and has no effect on topology</p> http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java b/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java index 4932e67..1227e8c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java @@ -121,6 +121,9 @@ public enum GridTopic { TOPIC_WAL, /** */ + TOPIC_METRICS, + + /** */ TOPIC_AUTH; /** Enum values. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java index 8bc46fd..0b102e5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java @@ -1298,7 +1298,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { ackStart(rtBean); if (!isDaemon()) - ctx.discovery().ackTopology(localNode().order()); + ctx.discovery().ackTopology(ctx.discovery().localJoin().joinTopologyVersion().topologyVersion()); } /** @@ -2623,6 +2623,9 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { objs.add(cfg.getGridLogger()); 
objs.add(cfg.getMBeanServer()); + if (cfg.getCommunicationFailureResolver() != null) + objs.add(cfg.getCommunicationFailureResolver()); + return objs; } http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java index 4708dd3..417ba1e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java @@ -75,6 +75,7 @@ import org.apache.ignite.failure.FailureContext; import org.apache.ignite.failure.FailureType; import org.apache.ignite.internal.binary.BinaryMarshaller; import org.apache.ignite.internal.managers.communication.GridIoPolicy; +import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager; import org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor; import org.apache.ignite.internal.processors.igfs.IgfsThreadFactory; import org.apache.ignite.internal.processors.igfs.IgfsUtils; @@ -2243,6 +2244,8 @@ public class IgnitionEx { initializeDefaultSpi(myCfg); + GridDiscoveryManager.initCommunicationErrorResolveConfiguration(myCfg); + initializeDefaultCacheConfiguration(myCfg); ExecutorConfiguration[] execCfgs = myCfg.getExecutorConfiguration(); http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java index 74f5a10..b0756cf 100644 --- 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java @@ -618,6 +618,14 @@ public abstract class GridManagerAdapter<T extends IgniteSpi> implements GridMan return ctx.nodeAttributes(); } + @Override public boolean communicationFailureResolveSupported() { + return ctx.discovery().communicationErrorResolveSupported(); + } + + @Override public void resolveCommunicationFailure(ClusterNode node, Exception err) { + ctx.discovery().resolveCommunicationError(node, err); + } + /** * @param e Exception to handle. * @return GridSpiException Converted exception. http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java index d5cdd2d..8d9a700 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java @@ -298,9 +298,9 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa @Override public MessageReader reader(UUID rmtNodeId, MessageFactory msgFactory) throws IgniteCheckedException { - assert rmtNodeId != null; - return new DirectMessageReader(msgFactory, U.directProtocolVersion(ctx, rmtNodeId)); + return new DirectMessageReader(msgFactory, + rmtNodeId != null ? 
U.directProtocolVersion(ctx, rmtNodeId) : GridIoManager.DIRECT_PROTO_VER); } }; } http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java index a0fc2f8..5616fd0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java @@ -123,6 +123,8 @@ import org.apache.ignite.internal.processors.cache.transactions.TxLocksResponse; import org.apache.ignite.internal.processors.cache.version.GridCacheRawVersionedEntry; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.processors.cache.version.GridCacheVersionEx; +import org.apache.ignite.internal.processors.cluster.ClusterMetricsUpdateMessage; +import org.apache.ignite.internal.processors.continuous.ContinuousRoutineStartResultMessage; import org.apache.ignite.internal.processors.continuous.GridContinuousMessage; import org.apache.ignite.internal.processors.datastreamer.DataStreamerEntry; import org.apache.ignite.internal.processors.datastreamer.DataStreamerRequest; @@ -909,6 +911,16 @@ public class GridIoMessageFactory implements MessageFactory { break; + case 133: + msg = new ClusterMetricsUpdateMessage(); + + break; + + case 134: + msg = new ContinuousRoutineStartResultMessage(); + + break; + // [-3..119] [124..129] [-23..-27] [-36..-55]- this // [120..123] - DR // [-4..-22, -30..-35] - SQL 
http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/CustomMessageWrapper.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/CustomMessageWrapper.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/CustomMessageWrapper.java index 4268886..4b6b7a2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/CustomMessageWrapper.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/CustomMessageWrapper.java @@ -49,6 +49,11 @@ public class CustomMessageWrapper implements DiscoverySpiCustomMessage { return delegate.isMutable(); } + /** {@inheritDoc} */ + @Override public boolean stopProcess() { + return delegate.stopProcess(); + } + /** * @return Delegate. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java index c21698f..fef44fa 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java @@ -312,6 +312,14 @@ public class DiscoCache { } /** + * @param nodeId Node ID. + * @return {@code True} if node is in alives list. + */ + public boolean alive(UUID nodeId) { + return alives.contains(nodeId); + } + + /** * Gets all nodes that have cache with given name. * * @param cacheName Cache name. 
http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryCustomMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryCustomMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryCustomMessage.java index c708c62..6ed2096 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryCustomMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryCustomMessage.java @@ -20,6 +20,7 @@ package org.apache.ignite.internal.managers.discovery; import java.io.Serializable; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.lang.IgniteUuid; +import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddFinishedMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddedMessage; import org.jetbrains.annotations.Nullable; @@ -87,11 +88,18 @@ public interface DiscoveryCustomMessage extends Serializable { @Nullable public DiscoveryCustomMessage ackMessage(); /** - * @return {@code true} if message can be modified during listener notification. Changes will be send to next nodes. + * @return {@code True} if message can be modified during listener notification. Changes will be sent to next nodes. */ public boolean isMutable(); /** + * See {@link DiscoverySpiCustomMessage#stopProcess()}. + * + * @return {@code True} if message should not be sent to other nodes after it was processed on coordinator. + */ + public boolean stopProcess(); + + /** * Creates new discovery cache if message caused topology version change. * * @param mgr Discovery manager. 
http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageResultsCollector.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageResultsCollector.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageResultsCollector.java new file mode 100644 index 0000000..43be952 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageResultsCollector.java @@ -0,0 +1,222 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.managers.discovery; + +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.jetbrains.annotations.Nullable; + +/** + * + */ +public abstract class DiscoveryMessageResultsCollector<M, R> { + /** */ + private final Map<UUID, NodeMessage<M>> rcvd = new HashMap<>(); + + /** */ + private int leftMsgs; + + /** */ + protected DiscoCache discoCache; + + /** */ + protected final GridKernalContext ctx; + + /** + * @param ctx Context. + */ + protected DiscoveryMessageResultsCollector(GridKernalContext ctx) { + this.ctx = ctx; + } + + /** + * @param rcvd Received messages. + * @return Result. + */ + protected abstract R createResult(Map<UUID, NodeMessage<M>> rcvd); + + /** + * @param r Result. + */ + protected abstract void onResultsCollected(R r); + + /** + * @param discoCache Discovery state when discovery message was received. + * @param node Node. + * @return {@code True} if need wait for result from given node. + */ + protected abstract boolean waitForNode(DiscoCache discoCache, ClusterNode node); + + /** + * @param discoCache Discovery state. + */ + public final void init(DiscoCache discoCache) { + assert discoCache != null; + + R res = null; + + synchronized (this) { + assert this.discoCache == null; + assert leftMsgs == 0 : leftMsgs; + + this.discoCache = discoCache; + + for (ClusterNode node : discoCache.allNodes()) { + if (ctx.discovery().alive(node) && waitForNode(discoCache, node) && !rcvd.containsKey(node.id())) { + rcvd.put(node.id(), new NodeMessage<>((M)null)); + + leftMsgs++; + } + } + + if (leftMsgs == 0) + res = createResult(rcvd); + } + + if (res != null) + onResultsCollected(res); + } + + /** + * @param nodeId Node ID. + * @param msg Message. 
+ */ + public final void onMessage(UUID nodeId, M msg) { + R res = null; + + synchronized (this) { + if (allReceived()) + return; + + NodeMessage<M> expMsg = rcvd.get(nodeId); + + if (expMsg == null) + rcvd.put(nodeId, new NodeMessage<>(msg)); + else if (expMsg.set(msg)) { + assert leftMsgs > 0; + + leftMsgs--; + + if (allReceived()) + res = createResult(rcvd); + } + } + + if (res != null) + onResultsCollected(res); + } + + /** + * @param nodeId Failed node ID. + */ + public final void onNodeFail(UUID nodeId) { + R res = null; + + synchronized (this) { + if (allReceived()) + return; + + NodeMessage expMsg = rcvd.get(nodeId); + + if (expMsg != null && expMsg.onNodeFailed()) { + assert leftMsgs > 0 : leftMsgs; + + leftMsgs--; + + if (allReceived()) + res = createResult(rcvd); + } + } + + if (res != null) + onResultsCollected(res); + } + + /** + * @return {@code True} if expected messages are initialized and all messages are received. + */ + private boolean allReceived() { + return discoCache != null && leftMsgs == 0; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(DiscoveryMessageResultsCollector.class, this); + } + + /** + * + */ + protected static class NodeMessage<M> { + /** */ + boolean nodeFailed; + + /** */ + M msg; + + /** + * @param msg Message. + */ + NodeMessage(M msg) { + this.msg = msg; + } + + /** + * @return Message or {@code null} if node failed. + */ + @Nullable public M message() { + return msg; + } + + /** + * @return {@code True} if node result was not set before. + */ + boolean onNodeFailed() { + if (nodeFailed || msg != null) + return false; + + nodeFailed = true; + + return true; + } + + /** + * @param msg Received message. + * @return {@code True} if node result was not set before. 
+ */ + boolean set(M msg) { + assert msg != null; + + if (this.msg != null) + return false; + + this.msg = msg; + + return !nodeFailed; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(NodeMessage.class, this); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java index 2e814d4..4c5690e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java @@ -54,8 +54,11 @@ import org.apache.ignite.cluster.BaselineNode; import org.apache.ignite.cluster.ClusterMetrics; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.CommunicationFailureResolver; import org.apache.ignite.configuration.DataRegionConfiguration; import org.apache.ignite.configuration.DataStorageConfiguration; +import org.apache.ignite.configuration.DefaultCommunicationFailureResolver; +import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.events.DiscoveryEvent; import org.apache.ignite.events.Event; import org.apache.ignite.failure.FailureContext; @@ -112,6 +115,8 @@ import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.plugin.security.SecurityCredentials; import org.apache.ignite.plugin.segmentation.SegmentationPolicy; import org.apache.ignite.spi.IgniteSpiException; +import org.apache.ignite.spi.communication.CommunicationSpi; +import 
org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; import org.apache.ignite.spi.discovery.DiscoveryDataBag; import org.apache.ignite.spi.discovery.DiscoveryDataBag.JoiningNodeDiscoveryData; import org.apache.ignite.spi.discovery.DiscoveryMetricsProvider; @@ -120,10 +125,10 @@ import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage; import org.apache.ignite.spi.discovery.DiscoverySpiDataExchange; import org.apache.ignite.spi.discovery.DiscoverySpiHistorySupport; import org.apache.ignite.spi.discovery.DiscoverySpiListener; +import org.apache.ignite.spi.discovery.DiscoverySpiMutableCustomMessageSupport; import org.apache.ignite.spi.discovery.DiscoverySpiNodeAuthenticator; import org.apache.ignite.spi.discovery.DiscoverySpiOrderSupport; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; -import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode; import org.apache.ignite.thread.IgniteThread; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -478,7 +483,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { /** {@inheritDoc} */ @Override protected void onKernalStart0() throws IgniteCheckedException { - if (Boolean.TRUE.equals(ctx.config().isClientMode()) && !getSpi().isClientMode()) + if ((getSpi() instanceof TcpDiscoverySpi) && Boolean.TRUE.equals(ctx.config().isClientMode()) && !getSpi().isClientMode()) ctx.performance().add("Enable client mode for TcpDiscoverySpi " + "(set TcpDiscoverySpi.forceServerMode to false)"); } @@ -551,6 +556,9 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { }); } + if (ctx.config().getCommunicationFailureResolver() != null) + ctx.resource().injectGeneric(ctx.config().getCommunicationFailureResolver()); + spi.setListener(new DiscoverySpiListener() { private long gridStartTime; @@ -559,8 +567,8 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { for (IgniteInClosure<ClusterNode> lsnr 
: locNodeInitLsnrs) lsnr.apply(locNode); - if (locNode instanceof TcpDiscoveryNode) { - final TcpDiscoveryNode node = (TcpDiscoveryNode)locNode; + if (locNode instanceof IgniteClusterNode) { + final IgniteClusterNode node = (IgniteClusterNode)locNode; if (consistentId != null) node.setConsistentId(consistentId); @@ -1052,7 +1060,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { /** * @return Metrics provider. */ - private DiscoveryMetricsProvider createMetricsProvider() { + public DiscoveryMetricsProvider createMetricsProvider() { return new DiscoveryMetricsProvider() { /** */ private final long startTime = U.currentTimeMillis(); @@ -1679,13 +1687,15 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { return getSpi().pingNode(nodeId); } catch (IgniteException e) { - if (e.hasCause(IgniteClientDisconnectedCheckedException.class)) { + if (e.hasCause(IgniteClientDisconnectedCheckedException.class, IgniteClientDisconnectedException.class)) { IgniteFuture<?> reconnectFut = ctx.cluster().clientReconnectFuture(); throw new IgniteClientDisconnectedCheckedException(reconnectFut, e.getMessage()); } - throw e; + LT.warn(log, "Ping failed with error [node=" + nodeId + ", err=" + e + ']'); + + return true; } finally { busyLock.leaveBusy(); @@ -2025,7 +2035,16 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { Map<Long, Collection<ClusterNode>> snapshots = topHist; - return snapshots.get(topVer); + Collection<ClusterNode> nodes = snapshots.get(topVer); + + if (nodes == null) { + DiscoCache cache = discoCacheHist.get(new AffinityTopologyVersion(topVer, 0)); + + if (cache != null) + nodes = cache.allNodes(); + } + + return nodes; } /** @@ -2158,6 +2177,19 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { } /** + * @param discoCache + * @param node + */ + public void metricsUpdateEvent(DiscoCache discoCache, ClusterNode node) { + 
discoWrk.addEvent(EVT_NODE_METRICS_UPDATED, + discoCache.version(), + node, + discoCache, + discoCache.nodeMap.values(), + null); + } + + /** * Gets first grid node start time, see {@link DiscoverySpi#getGridStartTime()}. * * @return Start time of the first grid node. @@ -2211,8 +2243,9 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { public boolean reconnectSupported() { DiscoverySpi spi = getSpi(); - return ctx.discovery().localNode().isClient() && (spi instanceof TcpDiscoverySpi) && - !(((TcpDiscoverySpi) spi).isClientReconnectDisabled()); + return ctx.discovery().localNode().isClient() && + (spi instanceof IgniteDiscoverySpi) && + ((IgniteDiscoverySpi)spi).clientReconnectSupported(); } /** @@ -2225,7 +2258,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { DiscoverySpi discoverySpi = getSpi(); - ((TcpDiscoverySpi)discoverySpi).reconnect(); + ((IgniteDiscoverySpi)discoverySpi).clientReconnect(); } /** @@ -2379,6 +2412,76 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { cacheNodes.add(rich); } + /** + * @param cfg Configuration. + * @throws IgniteCheckedException If configuration is not valid. 
+ */ + public static void initCommunicationErrorResolveConfiguration(IgniteConfiguration cfg) throws IgniteCheckedException { + CommunicationFailureResolver rslvr = cfg.getCommunicationFailureResolver(); + CommunicationSpi commSpi = cfg.getCommunicationSpi(); + DiscoverySpi discoverySpi = cfg.getDiscoverySpi(); + + if (rslvr != null) { + if (!supportsCommunicationErrorResolve(commSpi)) + throw new IgniteCheckedException("CommunicationFailureResolver is configured, but CommunicationSpi does not support communication" + + "problem resolve: " + commSpi.getClass().getName()); + + if (!supportsCommunicationErrorResolve(discoverySpi)) + throw new IgniteCheckedException("CommunicationFailureResolver is configured, but DiscoverySpi does not support communication" + + "problem resolve: " + discoverySpi.getClass().getName()); + } + else { + if (supportsCommunicationErrorResolve(commSpi) && supportsCommunicationErrorResolve(discoverySpi)) + cfg.setCommunicationFailureResolver(new DefaultCommunicationFailureResolver()); + } + } + + /** + * @param spi Discovery SPI. + * @return {@code True} if SPI supports communication error resolve. + */ + private static boolean supportsCommunicationErrorResolve(DiscoverySpi spi) { + return spi instanceof IgniteDiscoverySpi && ((IgniteDiscoverySpi)spi).supportsCommunicationFailureResolve(); + } + + /** + * @param spi Communication SPI. + * @return {@code True} if SPI supports communication error resolve. + */ + private static boolean supportsCommunicationErrorResolve(CommunicationSpi spi) { + return spi instanceof TcpCommunicationSpi; + } + + /** + * @return {@code True} if communication error resolve is supported. + */ + public boolean communicationErrorResolveSupported() { + return ctx.config().getCommunicationFailureResolver() != null; + } + + /** + * @return {@code True} if configured {@link DiscoverySpi} supports mutable custom messages. 
+ */ + public boolean mutableCustomMessages() { + DiscoverySpiMutableCustomMessageSupport ann = U.getAnnotation(ctx.config().getDiscoverySpi().getClass(), + DiscoverySpiMutableCustomMessageSupport.class); + + return ann != null && ann.value(); + } + + /** + * @param node Problem node. + * @param err Error. + */ + public void resolveCommunicationError(ClusterNode node, Exception err) { + DiscoverySpi spi = getSpi(); + + if (!supportsCommunicationErrorResolve(spi) || !supportsCommunicationErrorResolve(ctx.config().getCommunicationSpi())) + throw new UnsupportedOperationException(); + + ((IgniteDiscoverySpi)spi).resolveCommunicationFailure(node, err); + } + /** Worker for network segment checks. */ private class SegmentCheckWorker extends GridWorker { /** */ @@ -2587,6 +2690,9 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { AffinityTopologyVersion topVer = evt.get2(); + if (type == EVT_NODE_METRICS_UPDATED && topVer.compareTo(discoCache.version()) < 0) + return; + ClusterNode node = evt.get3(); boolean isDaemon = node.isDaemon(); http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/IgniteClusterNode.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/IgniteClusterNode.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/IgniteClusterNode.java new file mode 100644 index 0000000..cbc706a --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/IgniteClusterNode.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.managers.discovery; + +import java.io.Serializable; +import java.util.Map; +import org.apache.ignite.cache.CacheMetrics; +import org.apache.ignite.cluster.ClusterMetrics; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.configuration.IgniteConfiguration; + +/** + * + */ +public interface IgniteClusterNode extends ClusterNode { + /** + * Sets consistent globally unique node ID which survives node restarts. + * + * @param consistentId Consistent globally unique node ID. + */ + public void setConsistentId(Serializable consistentId); + + /** + * Sets node metrics. + * + * @param metrics Node metrics. + */ + public void setMetrics(ClusterMetrics metrics); + + /** + * Gets collections of cache metrics for this node. Note that node cache metrics are constantly updated + * and provide up to date information about caches. + * <p> + * Cache metrics are updated with some delay which is directly related to metrics update + * frequency. For example, by default the update will happen every {@code 2} seconds. + * + * @return Runtime metrics snapshots for this node. + */ + public Map<Integer, CacheMetrics> cacheMetrics(); + + /** + * Sets node cache metrics. + * + * @param cacheMetrics Cache metrics. 
+ */ + public void setCacheMetrics(Map<Integer, CacheMetrics> cacheMetrics); + + /** + * Whether this node is cache client (see {@link IgniteConfiguration#isClientMode()}). + * + * @return {@code True} if client. + */ + public boolean isCacheClient(); +} http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/IgniteDiscoverySpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/IgniteDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/IgniteDiscoverySpi.java new file mode 100644 index 0000000..9aa5d14 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/IgniteDiscoverySpi.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.managers.discovery; + +import java.util.UUID; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.spi.discovery.DiscoverySpi; + +/** + * + */ +public interface IgniteDiscoverySpi extends DiscoverySpi { + /** + * @param nodeId Node ID. 
+ * @return {@code True} if node joining or already joined topology. + */ + public boolean knownNode(UUID nodeId); + + /** + * + * @return {@code True} if SPI supports client reconnect. + */ + public boolean clientReconnectSupported(); + + /** + * + */ + public void clientReconnect(); + + /** + * For TESTING only. + */ + public void simulateNodeFailure(); + + /** + * For TESTING only. + * + * @param lsnr Listener. + */ + public void setInternalListener(IgniteDiscoverySpiInternalListener lsnr); + + /** + * @return {@code True} if supports communication error resolve. + */ + public boolean supportsCommunicationFailureResolve(); + + /** + * @param node Problem node. + * @param err Connection error. + */ + public void resolveCommunicationFailure(ClusterNode node, Exception err); +} http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/IgniteDiscoverySpiInternalListener.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/IgniteDiscoverySpiInternalListener.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/IgniteDiscoverySpiInternalListener.java new file mode 100644 index 0000000..24405f8 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/IgniteDiscoverySpiInternalListener.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.managers.discovery; + +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.spi.discovery.DiscoverySpi; +import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage; + +/** + * For TESTING only. + */ +public interface IgniteDiscoverySpiInternalListener { + /** + * @param locNode Local node. + * @param log Log. + */ + public void beforeJoin(ClusterNode locNode, IgniteLogger log); + + /** + * @param spi SPI instance. + * @param log Logger. + * @param msg Custom message. + * @return {@code False} to cancel event send. 
+ */ + public boolean beforeSendCustomEvent(DiscoverySpi spi, IgniteLogger log, DiscoverySpiCustomMessage msg); +} http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/main/java/org/apache/ignite/internal/processors/authentication/UserAcceptedMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/authentication/UserAcceptedMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/authentication/UserAcceptedMessage.java index ef87a44..2e2aed9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/authentication/UserAcceptedMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/authentication/UserAcceptedMessage.java @@ -72,6 +72,11 @@ public class UserAcceptedMessage implements DiscoveryCustomMessage { } /** {@inheritDoc} */ + @Override public boolean stopProcess() { + return false; + } + + /** {@inheritDoc} */ @Nullable @Override public DiscoCache createDiscoCache(GridDiscoveryManager mgr, AffinityTopologyVersion topVer, DiscoCache discoCache) { throw new UnsupportedOperationException(); http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/main/java/org/apache/ignite/internal/processors/authentication/UserProposedMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/authentication/UserProposedMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/authentication/UserProposedMessage.java index 1a0be8e..19f9e82 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/authentication/UserProposedMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/authentication/UserProposedMessage.java @@ -72,6 +72,11 @@ public class UserProposedMessage implements 
DiscoveryServerOnlyCustomMessage { } /** {@inheritDoc} */ + @Override public boolean stopProcess() { + return false; + } + + /** {@inheritDoc} */ @Nullable @Override public DiscoCache createDiscoCache(GridDiscoveryManager mgr, AffinityTopologyVersion topVer, DiscoCache discoCache) { throw new UnsupportedOperationException(); http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinityChangeMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinityChangeMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinityChangeMessage.java index fe1014c..937a889 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinityChangeMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinityChangeMessage.java @@ -156,6 +156,11 @@ public class CacheAffinityChangeMessage implements DiscoveryCustomMessage { } /** {@inheritDoc} */ + @Override public boolean stopProcess() { + return false; + } + + /** {@inheritDoc} */ @Nullable @Override public DiscoCache createDiscoCache(GridDiscoveryManager mgr, AffinityTopologyVersion topVer, DiscoCache discoCache) { return discoCache.copy(topVer, null); http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java index 6691b13..92b8d3e 100644 --- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java @@ -1310,20 +1310,17 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap * @param fut Current exchange future. * @param msg Message finish message. * @param resTopVer Result topology version. - * @throws IgniteCheckedException If failed. */ public void onLocalJoin(final GridDhtPartitionsExchangeFuture fut, GridDhtPartitionsFullMessage msg, - final AffinityTopologyVersion resTopVer) - throws IgniteCheckedException { + final AffinityTopologyVersion resTopVer) { final Set<Integer> affReq = fut.context().groupsAffinityRequestOnJoin(); final Map<Long, ClusterNode> nodesByOrder = new HashMap<>(); final Map<Integer, CacheGroupAffinityMessage> joinedNodeAff = msg.joinedNodeAffinity(); - assert !F.isEmpty(joinedNodeAff) : msg; - assert joinedNodeAff.size() >= affReq.size(); + assert F.isEmpty(affReq) || (!F.isEmpty(joinedNodeAff) && joinedNodeAff.size() >= affReq.size()) : msg; forAllCacheGroups(false, new IgniteInClosureX<GridAffinityAssignmentCache>() { @Override public void applyx(GridAffinityAssignmentCache aff) throws IgniteCheckedException { @@ -1333,7 +1330,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap assert grp != null; - if (affReq.contains(aff.groupId())) { + if (affReq != null && affReq.contains(aff.groupId())) { assert AffinityTopologyVersion.NONE.equals(aff.lastVersion()); CacheGroupAffinityMessage affMsg = joinedNodeAff.get(aff.groupId()); @@ -2282,6 +2279,23 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap } /** + * @return All registered cache groups. + */ + public Map<Integer, DynamicCacheDescriptor> caches() { + return caches.registeredCaches; + } + + /** + * @param grpId Cache group ID + * @return Cache affinity cache. 
+ */ + @Nullable public GridAffinityAssignmentCache groupAffinity(int grpId) { + CacheGroupHolder grpHolder = grpHolders.get(grpId); + + return grpHolder != null ? grpHolder.affinity() : null; + } + + /** * */ public void dumpDebugInfo() { http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheStatisticsModeChangeMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheStatisticsModeChangeMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheStatisticsModeChangeMessage.java index 40bcfaf..e33256f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheStatisticsModeChangeMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheStatisticsModeChangeMessage.java @@ -101,6 +101,11 @@ public class CacheStatisticsModeChangeMessage implements DiscoveryCustomMessage } /** {@inheritDoc} */ + @Override public boolean stopProcess() { + return false; + } + + /** {@inheritDoc} */ @Override public DiscoCache createDiscoCache(GridDiscoveryManager mgr, AffinityTopologyVersion topVer, DiscoCache discoCache) { throw new UnsupportedOperationException(); http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClientCacheChangeDiscoveryMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClientCacheChangeDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClientCacheChangeDiscoveryMessage.java index e35d80e..ae76c95 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClientCacheChangeDiscoveryMessage.java +++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClientCacheChangeDiscoveryMessage.java @@ -173,6 +173,11 @@ public class ClientCacheChangeDiscoveryMessage implements DiscoveryCustomMessage } /** {@inheritDoc} */ + @Override public boolean stopProcess() { + return false; + } + + /** {@inheritDoc} */ @Nullable @Override public DiscoCache createDiscoCache(GridDiscoveryManager mgr, AffinityTopologyVersion topVer, DiscoCache discoCache) { throw new UnsupportedOperationException(); http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClientCacheChangeDummyDiscoveryMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClientCacheChangeDummyDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClientCacheChangeDummyDiscoveryMessage.java index 6ed3ecc..4ce0c87 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClientCacheChangeDummyDiscoveryMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClientCacheChangeDummyDiscoveryMessage.java @@ -105,6 +105,11 @@ public class ClientCacheChangeDummyDiscoveryMessage implements DiscoveryCustomMe } /** {@inheritDoc} */ + @Override public boolean stopProcess() { + return false; + } + + /** {@inheritDoc} */ @Nullable @Override public DiscoCache createDiscoCache(GridDiscoveryManager mgr, AffinityTopologyVersion topVer, DiscoCache discoCache) { throw new UnsupportedOperationException(); http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java ---------------------------------------------------------------------- diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java index 83459a5..d85e29b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java @@ -77,6 +77,11 @@ public class DynamicCacheChangeBatch implements DiscoveryCustomMessage { } /** {@inheritDoc} */ + @Override public boolean stopProcess() { + return false; + } + + /** {@inheritDoc} */ @Override public DiscoCache createDiscoCache(GridDiscoveryManager mgr, AffinityTopologyVersion topVer, DiscoCache discoCache) { return mgr.createDiscoCacheOnCacheChange(topVer, discoCache); http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index 55357ff..c2d0f42 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -82,6 +82,7 @@ import org.apache.ignite.internal.NodeStoppingException; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException; import org.apache.ignite.internal.cluster.IgniteClusterEx; +import org.apache.ignite.internal.managers.discovery.IgniteClusterNode; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import 
org.apache.ignite.internal.processors.cache.affinity.GridCacheAffinityImpl; import org.apache.ignite.internal.processors.cache.distributed.IgniteExternalizableExpiryPolicy; @@ -3234,7 +3235,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V List<CacheMetrics> metrics = new ArrayList<>(grp.nodes().size()); for (ClusterNode node : grp.nodes()) { - Map<Integer, CacheMetrics> nodeCacheMetrics = ((TcpDiscoveryNode)node).cacheMetrics(); + Map<Integer, CacheMetrics> nodeCacheMetrics = ((IgniteClusterNode)node).cacheMetrics(); if (nodeCacheMetrics != null) { CacheMetrics e = nodeCacheMetrics.get(context().cacheId()); http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index a30a24a..77ffce3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -427,7 +427,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana if (evt.type() != EVT_DISCOVERY_CUSTOM_EVT) { assert evt.type() != EVT_NODE_JOINED || n.isLocal() || n.order() > loc.order() : "Node joined with smaller-than-local " + - "order [newOrder=" + n.order() + ", locOrder=" + loc.order() + ']'; + "order [newOrder=" + n.order() + ", locOrder=" + loc.order() + ", evt=" + evt + ']'; exchId = exchangeId(n.id(), affinityTopologyVersion(evt), evt); @@ -570,12 +570,6 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana for (ClusterNode 
n : cctx.discovery().remoteNodes()) cctx.versions().onReceived(n.id(), n.metrics().getLastDataVersion()); - ClusterNode loc = cctx.localNode(); - - long startTime = loc.metrics().getStartTime(); - - assert startTime > 0; - DiscoveryLocalJoinData locJoin = cctx.discovery().localJoin(); GridDhtPartitionsExchangeFuture fut = null; @@ -758,6 +752,14 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana /** * @param grpId Cache group ID. + * @return Topology. + */ + @Nullable public GridDhtPartitionTopology clientTopologyIfExists(int grpId) { + return clientTops.get(grpId); + } + + /** + * @param grpId Cache group ID. * @param discoCache Discovery data cache. * @return Topology. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index a3f7c94..7edac73 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -3363,7 +3363,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { * @return Validation result or {@code null} in case of success. 
*/ @Nullable private IgniteNodeValidationResult validateHashIdResolvers(ClusterNode node) { - if (!node.isClient()) { + if (!CU.clientNode(node)) { for (DynamicCacheDescriptor desc : cacheDescriptors().values()) { CacheConfiguration cfg = desc.cacheConfiguration(); @@ -3372,7 +3372,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { Object nodeHashObj = aff.resolveNodeHash(node); - for (ClusterNode topNode : ctx.discovery().allNodes()) { + for (ClusterNode topNode : ctx.discovery().aliveServerNodes()) { Object topNodeHashObj = aff.resolveNodeHash(topNode); if (nodeHashObj.hashCode() == topNodeHashObj.hashCode()) { http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java index 83ce2ba..a5169d2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java @@ -65,6 +65,7 @@ import org.apache.ignite.internal.IgniteNodeAttributes; import org.apache.ignite.internal.cluster.ClusterGroupEmptyCheckedException; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException; +import org.apache.ignite.internal.managers.discovery.IgniteClusterNode; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.distributed.GridDistributedLockCancelledException; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter; @@ -97,7 +98,6 @@ import org.apache.ignite.lang.IgnitePredicate; 
import org.apache.ignite.lang.IgniteReducer; import org.apache.ignite.lifecycle.LifecycleAware; import org.apache.ignite.plugin.CachePluginConfiguration; -import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode; import org.apache.ignite.transactions.Transaction; import org.apache.ignite.transactions.TransactionConcurrency; import org.apache.ignite.transactions.TransactionIsolation; @@ -1348,8 +1348,8 @@ public class GridCacheUtils { * @return {@code True} if given node is client node (has flag {@link IgniteConfiguration#isClientMode()} set). */ public static boolean clientNode(ClusterNode node) { - if (node instanceof TcpDiscoveryNode) - return ((TcpDiscoveryNode)node).isCacheClient(); + if (node instanceof IgniteClusterNode) + return ((IgniteClusterNode)node).isCacheClient(); else return clientNodeDirect(node); } http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/WalStateFinishMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/WalStateFinishMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/WalStateFinishMessage.java index 57f25d0..4afa403 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/WalStateFinishMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/WalStateFinishMessage.java @@ -67,6 +67,11 @@ public class WalStateFinishMessage extends WalStateAbstractMessage { } /** {@inheritDoc} */ + @Override public boolean stopProcess() { + return false; + } + + /** {@inheritDoc} */ @Override public String toString() { return S.toString(WalStateFinishMessage.class, this, "super", super.toString()); } http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/WalStateProposeMessage.java 
---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/WalStateProposeMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/WalStateProposeMessage.java index 747fd6a..b9d96fc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/WalStateProposeMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/WalStateProposeMessage.java @@ -98,6 +98,11 @@ public class WalStateProposeMessage extends WalStateAbstractMessage { } /** {@inheritDoc} */ + @Override public boolean stopProcess() { + return false; + } + + /** {@inheritDoc} */ @Override public String toString() { return S.toString(WalStateProposeMessage.class, this, "super", super.toString()); }
