Repository: ignite Updated Branches: refs/heads/ignite-zk 942c70f16 -> f0fa6cfaf
zk Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f0fa6cfa Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f0fa6cfa Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f0fa6cfa Branch: refs/heads/ignite-zk Commit: f0fa6cfaf0f0e96c8c61d3e8de0a6c3e89dae988 Parents: 942c70f Author: sboikov <[email protected]> Authored: Fri Dec 1 16:13:25 2017 +0300 Committer: sboikov <[email protected]> Committed: Fri Dec 1 16:51:55 2017 +0300 ---------------------------------------------------------------------- .../discovery/GridDiscoveryManager.java | 11 +++- .../managers/discovery/IgniteDiscoverySpi.java | 23 ++++++++ .../IgniteDiscoverySpiInternalListener.java | 30 ++++++++++ .../spi/discovery/tcp/TcpDiscoverySpi.java | 15 ++++- .../spi/discovery/zk/ZookeeperDiscoverySpi.java | 11 ++++ .../zk/internal/ZookeeperDiscoveryImpl.java | 20 +++++++ .../internal/DiscoverySpiBlockJoinListener.java | 62 ++++++++++++++++++++ .../IgniteClientReconnectAbstractTest.java | 9 +++ .../IgniteClientReconnectApiExceptionTest.java | 3 +- .../IgniteClientReconnectCacheTest.java | 18 ++++-- ...IgniteClientReconnectDiscoveryStateTest.java | 22 ++++--- .../dht/TxRecoveryStoreEnabledTest.java | 15 +---- .../ZookeeperDiscoverySpiBasicTest.java | 30 ---------- .../testframework/junits/GridAbstractTest.java | 29 +++++++++ 14 files changed, 239 insertions(+), 59 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/f0fa6cfa/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java index a6cc06d..b36a607 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java @@ -1990,7 +1990,16 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { Map<Long, Collection<ClusterNode>> snapshots = topHist; - return snapshots.get(topVer); + Collection<ClusterNode> nodes = snapshots.get(topVer); + + if (nodes == null) { + DiscoCache cache = discoCacheHist.get(new AffinityTopologyVersion(topVer, 0)); + + if (cache != null) + nodes = cache.allNodes(); + } + + return nodes; } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/f0fa6cfa/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/IgniteDiscoverySpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/IgniteDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/IgniteDiscoverySpi.java index 7418352..2752210 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/IgniteDiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/IgniteDiscoverySpi.java @@ -24,9 +24,32 @@ import org.apache.ignite.spi.discovery.DiscoverySpi; * TODO ZK */ public interface IgniteDiscoverySpi extends DiscoverySpi { + /** + * @param nodeId Node ID. + * @return {@code True} if node joining or already joined topology. + */ public boolean knownNode(UUID nodeId); + /** + * + * @return + */ public boolean reconnectSupported(); + /** + * + */ public void reconnect(); + + /** + * + */ + public void simulateNodeFailure(); + + /** + * For TESTING only. + * + * @param lsnr Listener. + */ + public void setInternalListener(IgniteDiscoverySpiInternalListener lsnr); } http://git-wip-us.apache.org/repos/asf/ignite/blob/f0fa6cfa/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/IgniteDiscoverySpiInternalListener.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/IgniteDiscoverySpiInternalListener.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/IgniteDiscoverySpiInternalListener.java new file mode 100644 index 0000000..b655681 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/IgniteDiscoverySpiInternalListener.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.managers.discovery; + +import org.apache.ignite.IgniteLogger; + +/** + * + */ +public interface IgniteDiscoverySpiInternalListener { + /** + * @param log Log. + */ + void beforeJoin(IgniteLogger log); +} http://git-wip-us.apache.org/repos/asf/ignite/blob/f0fa6cfa/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java index 0335885..404868e 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java @@ -56,6 +56,7 @@ import org.apache.ignite.configuration.AddressResolver; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpi; +import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpiInternalListener; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.X; @@ -103,6 +104,7 @@ import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAuthFailedMessag import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCheckFailedMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryDuplicateIdMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryEnsureDelivery; +import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryJoinRequestMessage; import org.jetbrains.annotations.Nullable; import static org.apache.ignite.IgniteSystemProperties.IGNITE_CONSISTENT_ID_BY_HOST_WITHOUT_PORT; @@ -403,6 +405,9 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements IgniteDiscovery /** */ protected IgniteSpiContext spiCtx; + /** */ + private IgniteDiscoverySpiInternalListener internalLsnr; + /** * Gets current SPI state. * @@ -1517,6 +1522,9 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements IgniteDiscovery OutputStream out, TcpDiscoveryAbstractMessage msg, long timeout) throws IOException, IgniteCheckedException { + if (internalLsnr != null && msg instanceof TcpDiscoveryJoinRequestMessage) + internalLsnr.beforeJoin(log); + assert sock != null; assert msg != null; assert out != null; @@ -2113,6 +2121,11 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements IgniteDiscovery sndMsgLsnrs.add(lsnr); } + /** {@inheritDoc} */ + @Override public void setInternalListener(IgniteDiscoverySpiInternalListener lsnr) { + this.internalLsnr = lsnr; + } + /** * <strong>FOR TEST ONLY!!!</strong> */ @@ -2150,7 +2163,7 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements IgniteDiscovery * <p> * This method is intended for test purposes only. */ - protected void simulateNodeFailure() { + public void simulateNodeFailure() { impl.simulateNodeFailure(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/f0fa6cfa/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java index 08f0b26..8fa8f96 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java @@ -24,6 +24,7 @@ import java.util.UUID; import org.apache.ignite.IgniteLogger; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpi; +import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpiInternalListener; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.internal.S; @@ -300,6 +301,16 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery } /** {@inheritDoc} */ + @Override public void setInternalListener(IgniteDiscoverySpiInternalListener lsnr) { + impl.internalLsnr = lsnr; + } + + /** {@inheritDoc} */ + @Override public void simulateNodeFailure() { + impl.simulateNodeFailure(); + } + + /** {@inheritDoc} */ @Override protected void onContextDestroyed0() { if (impl != null) impl.onStop(); http://git-wip-us.apache.org/repos/asf/ignite/blob/f0fa6cfa/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java index 1d398ed..44b5ae2 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java @@ -42,6 +42,7 @@ import org.apache.ignite.internal.ClusterMetricsSnapshot; import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException; import org.apache.ignite.internal.IgniteNodeAttributes; import org.apache.ignite.internal.events.DiscoveryCustomEvent; +import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpiInternalListener; import org.apache.ignite.internal.util.GridSpinBusyLock; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.typedef.internal.U; @@ -138,6 +139,9 @@ public class ZookeeperDiscoveryImpl { /** */ private final Object stateMux = new Object(); + /** */ + public volatile IgniteDiscoverySpiInternalListener internalLsnr; + /** * @param log Logger. * @param zkRootPath Zookeeper base path node all nodes. @@ -466,6 +470,11 @@ public class ZookeeperDiscoveryImpl { * @throws InterruptedException If interrupted. */ private void joinTopology0(boolean prevJoined) throws InterruptedException { + IgniteDiscoverySpiInternalListener internalLsnr = this.internalLsnr; + + if (internalLsnr != null) + internalLsnr.beforeJoin(log); + state = new ZkRuntimeState(prevJoined); DiscoveryDataBag discoDataBag = new DiscoveryDataBag(locNode.id()); @@ -1369,6 +1378,17 @@ public class ZookeeperDiscoveryImpl { } /** + * + */ + public void simulateNodeFailure() { + zkClient().deleteIfExistsAsync(zkPaths.aliveNodesDir); + + zkClient().onCloseStart(); + + zkClient().close(); + } + + /** * @param evtData Event data. * @param msg Custom message. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/f0fa6cfa/modules/core/src/test/java/org/apache/ignite/internal/DiscoverySpiBlockJoinListener.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/DiscoverySpiBlockJoinListener.java b/modules/core/src/test/java/org/apache/ignite/internal/DiscoverySpiBlockJoinListener.java new file mode 100644 index 0000000..f8a387e --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/DiscoverySpiBlockJoinListener.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal; + +import java.util.concurrent.CountDownLatch; +import org.apache.ignite.IgniteException; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpiInternalListener; +import org.apache.ignite.internal.util.typedef.internal.U; + +/** + * + */ +public class DiscoverySpiBlockJoinListener implements IgniteDiscoverySpiInternalListener { + /** */ + private volatile CountDownLatch writeLatch; + + /** + * + */ + void startBlock() { + writeLatch = new CountDownLatch(1); + } + + /** + * + */ + void stopBlock() { + writeLatch.countDown(); + } + + /** {@inheritDoc} */ + @Override public void beforeJoin(IgniteLogger log) { + try { + CountDownLatch writeLatch0 = writeLatch; + + if (writeLatch0 != null) { + log.info("Block join"); + + U.await(writeLatch0); + } + } + catch (Exception e) { + throw new IgniteException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/f0fa6cfa/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java index c9a931e..998118c 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java @@ -38,6 +38,7 @@ import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.events.Event; import org.apache.ignite.internal.managers.communication.GridIoMessage; +import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpi; import org.apache.ignite.internal.util.typedef.G; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteFuture; @@ -143,6 +144,14 @@ public abstract class IgniteClientReconnectAbstractTest extends GridCommonAbstra /** * @param ignite Node. + * @return Discovery SPI. + */ + protected static IgniteDiscoverySpi spi0(Ignite ignite) { + return ((IgniteDiscoverySpi)ignite.configuration().getDiscoverySpi()); + } + + /** + * @param ignite Node. * @return Communication SPI. */ protected BlockTcpCommunicationSpi commSpi(Ignite ignite) { http://git-wip-us.apache.org/repos/asf/ignite/blob/f0fa6cfa/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectApiExceptionTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectApiExceptionTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectApiExceptionTest.java index 07b655d..52c313f 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectApiExceptionTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectApiExceptionTest.java @@ -50,6 +50,7 @@ import org.apache.ignite.lang.IgniteBiPredicate; import org.apache.ignite.lang.IgniteCallable; import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.resources.IgniteInstanceResource; +import org.apache.ignite.spi.discovery.DiscoverySpi; import org.apache.ignite.testframework.GridTestUtils; import static java.util.concurrent.TimeUnit.SECONDS; @@ -772,7 +773,7 @@ public class IgniteClientReconnectApiExceptionTest extends IgniteClientReconnect Ignite srv = clientRouter(client); - TestTcpDiscoverySpi srvSpi = spi(srv); + DiscoverySpi srvSpi = spi(srv); final CountDownLatch disconnectLatch = new CountDownLatch(1); http://git-wip-us.apache.org/repos/asf/ignite/blob/f0fa6cfa/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java index 1c10bf1..b9848a2 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java @@ -49,6 +49,7 @@ import org.apache.ignite.events.DiscoveryEvent; import org.apache.ignite.events.Event; import org.apache.ignite.internal.managers.communication.GridIoMessage; import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager; +import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpi; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateResponse; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetResponse; @@ -67,6 +68,7 @@ import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.resources.LoggerResource; import org.apache.ignite.spi.IgniteSpiException; import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; +import org.apache.ignite.spi.discovery.DiscoverySpi; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.transactions.Transaction; @@ -155,11 +157,11 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac IgniteEx client = startGrid(SRV_CNT); - final TestTcpDiscoverySpi clientSpi = spi(client); + final IgniteDiscoverySpi clientSpi = spi0(client); - Ignite srv = clientRouter(client); + Ignite srv = ignite(0); - TestTcpDiscoverySpi srvSpi = spi(srv); + DiscoverySpi srvSpi = ignite(0).configuration().getDiscoverySpi(); final IgniteCache<Object, Object> cache = client.getOrCreateCache(new CacheConfiguration<>(DEFAULT_CACHE_NAME)); @@ -188,7 +190,11 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac log.info("Block reconnect."); - clientSpi.writeLatch = new CountDownLatch(1); + DiscoverySpiBlockJoinListener lsnr = new DiscoverySpiBlockJoinListener(); + + clientSpi.setInternalListener(lsnr); + + lsnr.startBlock(); final AtomicReference<IgniteInternalFuture> blockPutRef = new AtomicReference<>(); @@ -254,7 +260,7 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac log.info("Allow reconnect."); - clientSpi.writeLatch.countDown(); + lsnr.stopBlock(); assertTrue(reconnectLatch.await(5000, MILLISECONDS)); @@ -319,7 +325,7 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac IgniteEx client = startGrid(SRV_CNT); - Ignite srv = clientRouter(client); + Ignite srv = ignite(0); CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>(DEFAULT_CACHE_NAME); http://git-wip-us.apache.org/repos/asf/ignite/blob/f0fa6cfa/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectDiscoveryStateTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectDiscoveryStateTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectDiscoveryStateTest.java index c071ee2..6e77742 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectDiscoveryStateTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectDiscoveryStateTest.java @@ -27,6 +27,7 @@ import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.events.Event; import org.apache.ignite.lang.IgniteFuture; import org.apache.ignite.lang.IgnitePredicate; +import org.apache.ignite.spi.discovery.DiscoverySpi; import static org.apache.ignite.events.EventType.EVT_CLIENT_NODE_DISCONNECTED; import static org.apache.ignite.events.EventType.EVT_CLIENT_NODE_RECONNECTED; @@ -64,20 +65,23 @@ public class IgniteClientReconnectDiscoveryStateTest extends IgniteClientReconne nodeCnt.put(1, 1); nodeCnt.put(2, 2); nodeCnt.put(3, 3); - nodeCnt.put(4, 4); - for (Map.Entry<Integer, Integer> e : nodeCnt.entrySet()) { - Collection<ClusterNode> nodes = cluster.topology(e.getKey()); + if (tcpDiscovery()) { + nodeCnt.put(4, 4); - assertNotNull("No nodes for topology: " + e.getKey(), nodes); - assertEquals((int)e.getValue(), nodes.size()); + for (Map.Entry<Integer, Integer> e : nodeCnt.entrySet()) { + Collection<ClusterNode> nodes = cluster.topology(e.getKey()); + + assertNotNull("No nodes for topology: " + e.getKey(), nodes); + assertEquals((int)e.getValue(), nodes.size()); + } } ClusterNode locNode = cluster.localNode(); assertEquals(topVer, locNode.order()); - TestTcpDiscoverySpi srvSpi = spi(clientRouter(client)); + DiscoverySpi srvSpi = ignite(0).configuration().getDiscoverySpi(); final CountDownLatch reconnectLatch = new CountDownLatch(1); @@ -112,7 +116,11 @@ public class IgniteClientReconnectDiscoveryStateTest extends IgniteClientReconne assertEquals(topVer, locNode.order()); assertEquals(topVer, cluster.topologyVersion()); - nodeCnt.put(5, 3); + if (tcpDiscovery()) + nodeCnt.put(5, 3); + else + nodeCnt.clear(); + nodeCnt.put(6, 4); for (Map.Entry<Integer, Integer> e : nodeCnt.entrySet()) { http://git-wip-us.apache.org/repos/asf/ignite/blob/f0fa6cfa/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/TxRecoveryStoreEnabledTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/TxRecoveryStoreEnabledTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/TxRecoveryStoreEnabledTest.java index 7b350c8..060af21 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/TxRecoveryStoreEnabledTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/TxRecoveryStoreEnabledTest.java @@ -34,11 +34,11 @@ import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteInterruptedCheckedException; +import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpi; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.spi.IgniteSpiException; import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; -import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.apache.ignite.transactions.Transaction; import org.apache.ignite.transactions.TransactionConcurrency; @@ -65,7 +65,6 @@ public class TxRecoveryStoreEnabledTest extends GridCommonAbstractTest { IgniteConfiguration cfg = super.getConfiguration(gridName); cfg.setCommunicationSpi(new TestCommunicationSpi()); - cfg.setDiscoverySpi(new TestDiscoverySpi()); CacheConfiguration ccfg = defaultCacheConfiguration(); @@ -126,7 +125,7 @@ public class TxRecoveryStoreEnabledTest extends GridCommonAbstractTest { IgniteConfiguration cfg = node0.configuration(); ((TestCommunicationSpi)cfg.getCommunicationSpi()).block(); - ((TestDiscoverySpi)cfg.getDiscoverySpi()).simulateNodeFailure(); + ((IgniteDiscoverySpi)cfg.getDiscoverySpi()).simulateNodeFailure(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); @@ -201,16 +200,6 @@ public class TxRecoveryStoreEnabledTest extends GridCommonAbstractTest { /** * */ - private static class TestDiscoverySpi extends TcpDiscoverySpi { - /** {@inheritDoc} */ - @Override protected void simulateNodeFailure() { - super.simulateNodeFailure(); - } - } - - /** - * - */ private static class TestCommunicationSpi extends TcpCommunicationSpi { /** Block. */ private volatile boolean block; http://git-wip-us.apache.org/repos/asf/ignite/blob/f0fa6cfa/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java index 8eaff07..980acc3 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java @@ -1716,36 +1716,6 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { } /** - * @param expSize Expected nodes number. - * @throws Exception If failed. - */ - private void waitForTopology(final int expSize) throws Exception { - assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicate() { - @Override public boolean apply() { - List<Ignite> nodes = G.allGrids(); - - if (nodes.size() != expSize) { - info("Wait all nodes [size=" + nodes.size() + ", exp=" + expSize + ']'); - - return false; - } - - for (Ignite node: nodes) { - int sizeOnNode = node.cluster().nodes().size(); - - if (sizeOnNode != expSize) { - info("Wait for size on node [node=" + node.name() + ", size=" + sizeOnNode + ", exp=" + expSize + ']'); - - return false; - } - } - - return true; - } - }, 15_000)); - } - - /** * Reconnect client node. * * @param log Logger. http://git-wip-us.apache.org/repos/asf/ignite/blob/f0fa6cfa/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java index bff099d..3f44657 100755 --- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java @@ -2159,6 +2159,35 @@ public abstract class GridAbstractTest extends TestCase { } } } + /** + * @param expSize Expected nodes number. + * @throws Exception If failed. + */ + protected final void waitForTopology(final int expSize) throws Exception { + assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + List<Ignite> nodes = G.allGrids(); + + if (nodes.size() != expSize) { + info("Wait all nodes [size=" + nodes.size() + ", exp=" + expSize + ']'); + + return false; + } + + for (Ignite node: nodes) { + int sizeOnNode = node.cluster().nodes().size(); + + if (sizeOnNode != expSize) { + info("Wait for size on node [node=" + node.name() + ", size=" + sizeOnNode + ", exp=" + expSize + ']'); + + return false; + } + } + + return true; + } + }, 15_000)); + } /** * @param millis Time to sleep.
