ignite-1.5 Fixed TcpDiscoveryMulticastIpFinder to request addresses on each getRegisteredAddresses call
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/717dab25 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/717dab25 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/717dab25 Branch: refs/heads/ignite-2100 Commit: 717dab259e3f0287046ffcefa28cf9214ab65ff7 Parents: 6c61598 Author: sboikov <[email protected]> Authored: Mon Dec 14 10:00:57 2015 +0300 Committer: sboikov <[email protected]> Committed: Mon Dec 14 10:00:57 2015 +0300 ---------------------------------------------------------------------- .../TcpDiscoveryMulticastIpFinder.java | 106 +++++++++++++------ .../tcp/TcpClientDiscoverySpiMulticastTest.java | 91 +++++++++++++++- .../TcpDiscoveryIpFinderAbstractSelfTest.java | 2 +- .../TcpDiscoveryMulticastIpFinderSelfTest.java | 16 ++- 4 files changed, 174 insertions(+), 41 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/717dab25/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinder.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinder.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinder.java index d19d08b..77bb99d 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinder.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinder.java @@ -122,6 +122,20 @@ public class TcpDiscoveryMulticastIpFinder extends TcpDiscoveryVmIpFinder { @GridToStringExclude private Collection<AddressSender> addrSnds; + /** */ + @GridToStringExclude + private InetAddress mcastAddr; + + /** */ + @GridToStringExclude + private 
Set<InetAddress> reqItfs; + + /** */ + private boolean firstReq; + + /** */ + private boolean mcastErr; + /** * Constructs new IP finder. */ @@ -300,8 +314,6 @@ public class TcpDiscoveryMulticastIpFinder extends TcpDiscoveryVmIpFinder { boolean clientMode = discoveryClientMode(); - InetAddress mcastAddr; - try { mcastAddr = InetAddress.getByName(mcastGrp); } @@ -325,7 +337,7 @@ public class TcpDiscoveryMulticastIpFinder extends TcpDiscoveryVmIpFinder { addrSnds = new ArrayList<>(locAddrs.size()); - Set<InetAddress> reqItfs = new HashSet<>(locAddrs.size()); // Interfaces used to send requests. + reqItfs = new HashSet<>(locAddrs.size()); // Interfaces used to send requests. for (String locAddr : locAddrs) { InetAddress addr; @@ -356,8 +368,6 @@ public class TcpDiscoveryMulticastIpFinder extends TcpDiscoveryVmIpFinder { } } - boolean mcastErr = false; - if (!clientMode) { if (addrSnds.isEmpty()) { try { @@ -395,11 +405,62 @@ public class TcpDiscoveryMulticastIpFinder extends TcpDiscoveryVmIpFinder { } else assert addrSnds.isEmpty() : addrSnds; + } + + /** {@inheritDoc} */ + @Override public void onSpiContextInitialized(IgniteSpiContext spiCtx) throws IgniteSpiException { + super.onSpiContextInitialized(spiCtx); + + spiCtx.registerPort(mcastPort, UDP); + } + + /** {@inheritDoc} */ + @Override public synchronized Collection<InetSocketAddress> getRegisteredAddresses() { + if (mcastAddr != null && reqItfs != null) { + Collection<InetSocketAddress> ret; + + if (reqItfs.size() > 1) + ret = requestAddresses(reqItfs); + else { + T2<Collection<InetSocketAddress>, Boolean> res = requestAddresses(mcastAddr, F.first(reqItfs)); - Collection<InetSocketAddress> ret; + ret = res.get1(); + + mcastErr |= res.get2(); + } + if (ret.isEmpty()) { + if (mcastErr && firstReq) { + if (getRegisteredAddresses().isEmpty()) { + InetSocketAddress addr = new InetSocketAddress("localhost", TcpDiscoverySpi.DFLT_PORT); + + U.quietAndWarn(log, "TcpDiscoveryMulticastIpFinder failed to initialize 
multicast, " + + "will use default address: " + addr); + + registerAddresses(Collections.singleton(addr)); + } + else + U.quietAndWarn(log, "TcpDiscoveryMulticastIpFinder failed to initialize multicast, " + + "will use pre-configured addresses."); + } + } + else + registerAddresses(ret); + + firstReq = false; + } + + return super.getRegisteredAddresses(); + } + + + /** + * @param reqItfs Interfaces used to send requests. + * @return Addresses. + */ + private Collection<InetSocketAddress> requestAddresses(Set<InetAddress> reqItfs) { if (reqItfs.size() > 1) { - ret = new HashSet<>(); + Collection<InetSocketAddress> ret = new HashSet<>(); Collection<AddressReceiver> rcvrs = new ArrayList<>(); @@ -425,39 +486,14 @@ public class TcpDiscoveryMulticastIpFinder extends TcpDiscoveryVmIpFinder { break; } } + + return ret; } else { T2<Collection<InetSocketAddress>, Boolean> res = requestAddresses(mcastAddr, F.first(reqItfs)); - ret = res.get1(); - - mcastErr |= res.get2(); + return res.get1(); } - - if (ret.isEmpty()) { - if (mcastErr) { - if (getRegisteredAddresses().isEmpty()) { - InetSocketAddress addr = new InetSocketAddress("localhost", TcpDiscoverySpi.DFLT_PORT); - - U.quietAndWarn(log, "TcpDiscoveryMulticastIpFinder failed to initialize multicast, " + - "will use default address: " + addr); - - registerAddresses(Collections.singleton(addr)); - } - else - U.quietAndWarn(log, "TcpDiscoveryMulticastIpFinder failed to initialize multicast, " + - "will use pre-configured addresses."); - } - } - else - registerAddresses(ret); - } - - /** {@inheritDoc} */ - @Override public void onSpiContextInitialized(IgniteSpiContext spiCtx) throws IgniteSpiException { - super.onSpiContextInitialized(spiCtx); - - spiCtx.registerPort(mcastPort, UDP); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/717dab25/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiMulticastTest.java ---------------------------------------------------------------------- diff 
--git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiMulticastTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiMulticastTest.java index 79fd954..6611e00 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiMulticastTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiMulticastTest.java @@ -18,13 +18,23 @@ package org.apache.ignite.spi.discovery.tcp; import java.util.Collection; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; import org.apache.ignite.Ignite; import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.events.Event; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.spi.discovery.DiscoverySpi; import org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder; import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.apache.ignite.events.EventType.EVT_CLIENT_NODE_DISCONNECTED; +import static org.apache.ignite.events.EventType.EVT_CLIENT_NODE_RECONNECTED; + /** * */ @@ -32,6 +42,12 @@ public class TcpClientDiscoverySpiMulticastTest extends GridCommonAbstractTest { /** */ private boolean forceSrv; + /** */ + private ThreadLocal<Boolean> client = new ThreadLocal<>(); + + /** */ + private ThreadLocal<Integer> discoPort = new ThreadLocal<>(); + /** {@inheritDoc} */ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { IgniteConfiguration cfg = super.getConfiguration(gridName); @@ -42,11 +58,23 @@ public class TcpClientDiscoverySpiMulticastTest extends GridCommonAbstractTest { 
spi.setIpFinder(new TcpDiscoveryMulticastIpFinder()); - if (getTestGridName(1).equals(gridName)) { + Boolean clientFlag = client.get(); + + client.set(null); + + if (clientFlag != null && clientFlag) { cfg.setClientMode(true); spi.setForceServerMode(forceSrv); } + else { + Integer port = discoPort.get(); + + discoPort.set(null); + + if (port != null) + spi.setLocalPort(port); + } cfg.setDiscoverySpi(spi); @@ -59,6 +87,61 @@ public class TcpClientDiscoverySpiMulticastTest extends GridCommonAbstractTest { stopAllGrids(); } + + /** + * @throws Exception If failed. + */ + public void testClientStartsFirst() throws Exception { + IgniteInternalFuture<Ignite> fut = GridTestUtils.runAsync(new Callable<Ignite>() { + @Override public Ignite call() throws Exception { + client.set(true); + + return startGrid(0); + } + }, "start-client"); + + U.sleep(10_000); + + discoPort.set(TcpDiscoverySpi.DFLT_PORT); + + Ignite srv = startGrid(1); + + Ignite client = fut.get(); + + final CountDownLatch reconnectLatch = new CountDownLatch(1); + + final CountDownLatch disconnectLatch = new CountDownLatch(1); + + client.events().localListen(new IgnitePredicate<Event>() { + @Override public boolean apply(Event evt) { + info("Client event: " + evt); + + if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) { + assertEquals(1, reconnectLatch.getCount()); + + disconnectLatch.countDown(); + } + else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) { + assertEquals(0, disconnectLatch.getCount()); + + reconnectLatch.countDown(); + } + + return true; + } + }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED); + + srv.close(); + + assertTrue(disconnectLatch.await(10, SECONDS)); + + discoPort.set(TcpDiscoverySpi.DFLT_PORT + 100); + + startGrid(1); + + assertTrue(reconnectLatch.await(10, SECONDS)); + } + /** * @throws Exception If failed. 
*/ @@ -83,8 +166,12 @@ public class TcpClientDiscoverySpiMulticastTest extends GridCommonAbstractTest { assertSpi(ignite0, false); + client.set(true); + Ignite ignite1 = startGrid(1); + assertTrue(ignite1.configuration().isClientMode()); + assertSpi(ignite1, !forceSrv); assertTrue(ignite1.configuration().isClientMode()); @@ -92,6 +179,8 @@ public class TcpClientDiscoverySpiMulticastTest extends GridCommonAbstractTest { assertEquals(2, ignite0.cluster().nodes().size()); assertEquals(2, ignite1.cluster().nodes().size()); + client.set(false); + Ignite ignite2 = startGrid(2); assertSpi(ignite2, false); http://git-wip-us.apache.org/repos/asf/ignite/blob/717dab25/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/ipfinder/TcpDiscoveryIpFinderAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/ipfinder/TcpDiscoveryIpFinderAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/ipfinder/TcpDiscoveryIpFinderAbstractSelfTest.java index 03df43c..06aadda 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/ipfinder/TcpDiscoveryIpFinderAbstractSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/ipfinder/TcpDiscoveryIpFinderAbstractSelfTest.java @@ -81,7 +81,7 @@ public abstract class TcpDiscoveryIpFinderAbstractSelfTest<T extends TcpDiscover for (InetSocketAddress addr : initAddrs) assert addrs.contains(addr) : "Address is missing (got inconsistent addrs collection): " + addr; - finder.unregisterAddresses(Collections.singletonList(node1)); + finder.unregisterAddresses(Collections.singletonList(node2)); addrs = finder.getRegisteredAddresses(); http://git-wip-us.apache.org/repos/asf/ignite/blob/717dab25/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinderSelfTest.java 
---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinderSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinderSelfTest.java index 1e710ee..b39be56 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinderSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinderSelfTest.java @@ -84,19 +84,27 @@ public class TcpDiscoveryMulticastIpFinderSelfTest ipFinder3.setLocalAddress(locAddr); ipFinder1.initializeLocalAddresses(Collections.singleton(new InetSocketAddress("host1", 1001))); - ipFinder2.initializeLocalAddresses(Collections.singleton(new InetSocketAddress("host2", 1002))); - ipFinder3.initializeLocalAddresses(Collections.singleton(new InetSocketAddress("host3", 1003))); Collection<InetSocketAddress> addrs1 = ipFinder1.getRegisteredAddresses(); + + ipFinder2.initializeLocalAddresses(Collections.singleton(new InetSocketAddress("host2", 1002))); + Collection<InetSocketAddress> addrs2 = ipFinder2.getRegisteredAddresses(); + + ipFinder3.initializeLocalAddresses(Collections.singleton(new InetSocketAddress("host3", 1003))); + Collection<InetSocketAddress> addrs3 = ipFinder3.getRegisteredAddresses(); info("Addrs1: " + addrs1); info("Addrs2: " + addrs2); info("Addrs2: " + addrs3); - assertEquals(1, ipFinder1.getRegisteredAddresses().size()); - assertEquals(2, ipFinder2.getRegisteredAddresses().size()); + assertEquals(1, addrs1.size()); + assertEquals(2, addrs2.size()); + assertEquals(3, addrs3.size()); + + assertEquals(3, ipFinder1.getRegisteredAddresses().size()); + assertEquals(3, ipFinder2.getRegisteredAddresses().size()); assertEquals(3, ipFinder3.getRegisteredAddresses().size()); } finally {
