This is an automated email from the ASF dual-hosted git repository. dgovorukhin pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push: new 50dd28d IGNITE-9858 Fix SystemCacheNotConfiguredTest#test flaky fails - Fixes #4977. 50dd28d is described below commit 50dd28d28359930e020cf53d3000066965fc56b7 Author: pereslegin-pa <xxt...@gmail.com> AuthorDate: Tue Dec 25 14:01:50 2018 +0300 IGNITE-9858 Fix SystemCacheNotConfiguredTest#test flaky fails - Fixes #4977. Signed-off-by: Dmitriy Govorukhin <dmitriy.govoruk...@gmail.com> --- .../ignite/spi/discovery/tcp/ClientImpl.java | 2 +- .../tcp/ipfinder/TcpDiscoveryIpFinderAdapter.java | 34 +--- .../multicast/TcpDiscoveryMulticastIpFinder.java | 213 +++++++++++---------- .../sharedfs/TcpDiscoverySharedFsIpFinder.java | 7 + .../tcp/TcpClientDiscoverySpiMulticastTest.java | 8 +- 5 files changed, 122 insertions(+), 142 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java index 7cb153e..56b1a06 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java @@ -294,7 +294,7 @@ class ClientImpl extends TcpDiscoveryImpl { sockReader = new SocketReader(); sockReader.start(); - if (spi.ipFinder.isShared()) + if (spi.ipFinder.isShared() && spi.isForceServerMode()) registerLocalNodeAddress(); msgWorker = new MessageWorker(log); diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/TcpDiscoveryIpFinderAdapter.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/TcpDiscoveryIpFinderAdapter.java index 1cd91f6..c491dc3 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/TcpDiscoveryIpFinderAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/TcpDiscoveryIpFinderAdapter.java @@ -19,15 +19,11 @@ package org.apache.ignite.spi.discovery.tcp.ipfinder; import java.net.InetSocketAddress; import java.util.Collection; -import org.apache.ignite.Ignite; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.typedef.internal.S; -import org.apache.ignite.resources.IgniteInstanceResource; import org.apache.ignite.spi.IgniteSpiConfiguration; import org.apache.ignite.spi.IgniteSpiContext; import org.apache.ignite.spi.IgniteSpiException; -import org.apache.ignite.spi.discovery.DiscoverySpi; -import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; /** * IP finder interface implementation adapter. @@ -40,11 +36,6 @@ public abstract class TcpDiscoveryIpFinderAdapter implements TcpDiscoveryIpFinde @GridToStringExclude private volatile IgniteSpiContext spiCtx; - /** Ignite instance . */ - @IgniteInstanceResource - @GridToStringExclude - protected Ignite ignite; - /** {@inheritDoc} */ @Override public void onSpiContextInitialized(IgniteSpiContext spiCtx) throws IgniteSpiException { this.spiCtx = spiCtx; @@ -57,8 +48,7 @@ public abstract class TcpDiscoveryIpFinderAdapter implements TcpDiscoveryIpFinde /** {@inheritDoc} */ @Override public void initializeLocalAddresses(Collection<InetSocketAddress> addrs) throws IgniteSpiException { - if (!discoveryClientMode()) - registerAddresses(addrs); + registerAddresses(addrs); } /** {@inheritDoc} */ @@ -91,28 +81,6 @@ public abstract class TcpDiscoveryIpFinderAdapter implements TcpDiscoveryIpFinde } /** - * @return {@code True} if TCP discovery works in client mode. - */ - protected boolean discoveryClientMode() { - boolean clientMode; - - Ignite ignite0 = ignite; - - if (ignite0 != null) { // Can be null if used in tests without starting Ignite. - DiscoverySpi discoSpi = ignite0.configuration().getDiscoverySpi(); - - if (!(discoSpi instanceof TcpDiscoverySpi)) - throw new IgniteSpiException("TcpDiscoveryIpFinder should be used with TcpDiscoverySpi: " + discoSpi); - - clientMode = ignite0.configuration().isClientMode() && !((TcpDiscoverySpi)discoSpi).isForceServerMode(); - } - else - clientMode = false; - - return clientMode; - } - - /** * @return SPI context. */ protected IgniteSpiContext spiContext() { diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinder.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinder.java index 0473ab2..246992b 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinder.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinder.java @@ -29,6 +29,7 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashSet; +import java.util.List; import java.util.Set; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; @@ -126,7 +127,7 @@ public class TcpDiscoveryMulticastIpFinder extends TcpDiscoveryVmIpFinder { @GridToStringExclude private InetAddress mcastAddr; - /** */ + /** Interfaces used to send requests. */ @GridToStringExclude private Set<InetAddress> reqItfs; @@ -308,132 +309,66 @@ public class TcpDiscoveryMulticastIpFinder extends TcpDiscoveryVmIpFinder { /** {@inheritDoc} */ @Override public void initializeLocalAddresses(Collection<InetSocketAddress> addrs) throws IgniteSpiException { - // If IGNITE_OVERRIDE_MCAST_GRP system property is set, use its value to override multicast group from - // configuration. Used for testing purposes. - String overrideMcastGrp = System.getProperty(IGNITE_OVERRIDE_MCAST_GRP); - - if (overrideMcastGrp != null) - mcastGrp = overrideMcastGrp; - - if (F.isEmpty(mcastGrp)) - throw new IgniteSpiException("Multicast IP address is not specified."); - - if (mcastPort < 0 || mcastPort > 65535) - throw new IgniteSpiException("Invalid multicast port: " + mcastPort); - - if (resWaitTime <= 0) - throw new IgniteSpiException("Invalid wait time, value greater than zero is expected: " + resWaitTime); - - if (addrReqAttempts <= 0) - throw new IgniteSpiException("Invalid number of address request attempts, " + - "value greater than zero is expected: " + addrReqAttempts); - - if (ttl != -1 && (ttl < 0 || ttl > 255)) - throw new IgniteSpiException("Time-to-live value is out of 0 <= TTL <= 255 range: " + ttl); - - if (F.isEmpty(getRegisteredAddresses())) + if (F.isEmpty(super.getRegisteredAddresses())) U.warn(log, "TcpDiscoveryMulticastIpFinder has no pre-configured addresses " + "(it is recommended in production to specify at least one address in " + "TcpDiscoveryMulticastIpFinder.getAddresses() configuration property)"); - boolean clientMode = discoveryClientMode(); + Collection<InetAddress> locAddrs = resolveLocalAddresses(); - try { - mcastAddr = InetAddress.getByName(mcastGrp); - } - catch (UnknownHostException e) { - throw new IgniteSpiException("Unknown multicast group: " + mcastGrp, e); - } + addrSnds = new ArrayList<>(locAddrs.size()); - if (!mcastAddr.isMulticastAddress()) - throw new IgniteSpiException("Invalid multicast group address: " + mcastAddr); + reqItfs = new HashSet<>(U.capacity(locAddrs.size())); // Interfaces used to send requests. - Collection<String> locAddrs; + for (InetAddress addr : locAddrs) { + try { + addrSnds.add(new AddressSender(mcastAddr, addr, addrs)); - try { - locAddrs = U.resolveLocalAddresses(U.resolveLocalHost(locAddr)).get1(); - } - catch (IOException | IgniteCheckedException e) { - throw new IgniteSpiException("Failed to resolve local addresses [locAddr=" + locAddr + ']', e); + reqItfs.add(addr); + } + catch (IOException e) { + if (log.isDebugEnabled()) + log.debug("Failed to create multicast socket [mcastAddr=" + mcastAddr + + ", mcastGrp=" + mcastGrp + ", mcastPort=" + mcastPort + ", locAddr=" + addr + + ", err=" + e + ']'); + } } - assert locAddrs != null; - - addrSnds = new ArrayList<>(locAddrs.size()); - - reqItfs = new HashSet<>(locAddrs.size()); // Interfaces used to send requests. - - for (String locAddr : locAddrs) { - InetAddress addr; + locNodeAddrs = new HashSet<>(addrs); + if (addrSnds.isEmpty()) { try { - addr = InetAddress.getByName(locAddr); + // Create non-bound socket if local host is loopback or failed to create sockets explicitly + // bound to interfaces. + addrSnds.add(new AddressSender(mcastAddr, null, addrs)); } - catch (UnknownHostException e) { + catch (IOException e) { if (log.isDebugEnabled()) - log.debug("Failed to resolve local address [locAddr=" + locAddr + ", err=" + e + ']'); - - continue; + log.debug("Failed to create multicast socket [mcastAddr=" + mcastAddr + + ", mcastGrp=" + mcastGrp + ", mcastPort=" + mcastPort + ", err=" + e + ']'); } - if (!addr.isLoopbackAddress()) { + if (addrSnds.isEmpty()) { try { - if (!clientMode) - addrSnds.add(new AddressSender(mcastAddr, addr, addrs)); + addrSnds.add(new AddressSender(mcastAddr, mcastAddr, addrs)); - reqItfs.add(addr); + reqItfs.add(mcastAddr); } catch (IOException e) { if (log.isDebugEnabled()) log.debug("Failed to create multicast socket [mcastAddr=" + mcastAddr + - ", mcastGrp=" + mcastGrp + ", mcastPort=" + mcastPort + ", locAddr=" + addr + + ", mcastGrp=" + mcastGrp + ", mcastPort=" + mcastPort + ", locAddr=" + mcastAddr + ", err=" + e + ']'); } } } - if (!clientMode) { - locNodeAddrs = new HashSet<>(addrs); - - if (addrSnds.isEmpty()) { - try { - // Create non-bound socket if local host is loopback or failed to create sockets explicitly - // bound to interfaces. - addrSnds.add(new AddressSender(mcastAddr, null, addrs)); - } - catch (IOException e) { - if (log.isDebugEnabled()) - log.debug("Failed to create multicast socket [mcastAddr=" + mcastAddr + - ", mcastGrp=" + mcastGrp + ", mcastPort=" + mcastPort + ", err=" + e + ']'); - } - - if (addrSnds.isEmpty()) { - try { - addrSnds.add(new AddressSender(mcastAddr, mcastAddr, addrs)); - - reqItfs.add(mcastAddr); - } - catch (IOException e) { - if (log.isDebugEnabled()) - log.debug("Failed to create multicast socket [mcastAddr=" + mcastAddr + - ", mcastGrp=" + mcastGrp + ", mcastPort=" + mcastPort + ", locAddr=" + mcastAddr + - ", err=" + e + ']'); - } - } - } - - if (!addrSnds.isEmpty()) { - for (AddressSender addrSnd : addrSnds) - addrSnd.start(); - } - else - mcastErr = true; - } - else { - assert addrSnds.isEmpty() : addrSnds; - - locNodeAddrs = Collections.emptySet(); + if (!addrSnds.isEmpty()) { + for (AddressSender addrSnd : addrSnds) + addrSnd.start(); } + else + mcastErr = true; } /** {@inheritDoc} */ @@ -445,6 +380,9 @@ public class TcpDiscoveryMulticastIpFinder extends TcpDiscoveryVmIpFinder { /** {@inheritDoc} */ @Override public synchronized Collection<InetSocketAddress> getRegisteredAddresses() { + if (mcastAddr == null) + reqItfs = new HashSet<>(resolveLocalAddresses()); + if (mcastAddr != null && reqItfs != null) { Collection<InetSocketAddress> ret; @@ -460,7 +398,7 @@ public class TcpDiscoveryMulticastIpFinder extends TcpDiscoveryVmIpFinder { if (ret.isEmpty()) { if (mcastErr && firstReq) { - if (getRegisteredAddresses().isEmpty()) { + if (super.getRegisteredAddresses().isEmpty()) { InetSocketAddress addr = new InetSocketAddress("localhost", TcpDiscoverySpi.DFLT_PORT); U.quietAndWarn(log, "TcpDiscoveryMulticastIpFinder failed to initialize multicast, " + @@ -482,6 +420,77 @@ public class TcpDiscoveryMulticastIpFinder extends TcpDiscoveryVmIpFinder { return super.getRegisteredAddresses(); } + /** + * Resolve local addresses. + * + * @return List of non-loopback addresses. + */ + private Collection<InetAddress> resolveLocalAddresses() { + // If IGNITE_OVERRIDE_MCAST_GRP system property is set, use its value to override multicast group from + // configuration. Used for testing purposes. + String overrideMcastGrp = System.getProperty(IGNITE_OVERRIDE_MCAST_GRP); + + if (overrideMcastGrp != null) + mcastGrp = overrideMcastGrp; + + if (F.isEmpty(mcastGrp)) + throw new IgniteSpiException("Multicast IP address is not specified."); + + if (mcastPort < 0 || mcastPort > 65535) + throw new IgniteSpiException("Invalid multicast port: " + mcastPort); + + if (resWaitTime <= 0) + throw new IgniteSpiException("Invalid wait time, value greater than zero is expected: " + resWaitTime); + + if (addrReqAttempts <= 0) + throw new IgniteSpiException("Invalid number of address request attempts, " + + "value greater than zero is expected: " + addrReqAttempts); + + if (ttl != -1 && (ttl < 0 || ttl > 255)) + throw new IgniteSpiException("Time-to-live value is out of 0 <= TTL <= 255 range: " + ttl); + + try { + mcastAddr = InetAddress.getByName(mcastGrp); + } + catch (UnknownHostException e) { + throw new IgniteSpiException("Unknown multicast group: " + mcastGrp, e); + } + + if (!mcastAddr.isMulticastAddress()) + throw new IgniteSpiException("Invalid multicast group address: " + mcastAddr); + + Collection<String> locAddrs; + + try { + locAddrs = U.resolveLocalAddresses(U.resolveLocalHost(locAddr)).get1(); + } + catch (IOException | IgniteCheckedException e) { + throw new IgniteSpiException("Failed to resolve local addresses [locAddr=" + locAddr + ']', e); + } + + assert locAddrs != null; + + List<InetAddress> inetAddrs = new ArrayList<>(locAddrs.size()); + + for (String locAddr : locAddrs) { + InetAddress addr; + + try { + addr = InetAddress.getByName(locAddr); + } + catch (UnknownHostException e) { + if (log.isDebugEnabled()) + log.debug("Failed to resolve local address [locAddr=" + locAddr + ", err=" + e + ']'); + + continue; + } + + if (!addr.isLoopbackAddress()) + inetAddrs.add(addr); + } + + return inetAddrs; + } /** * @param reqItfs Interfaces used to send requests. @@ -770,7 +779,7 @@ public class TcpDiscoveryMulticastIpFinder extends TcpDiscoveryVmIpFinder { * @param sockAddr Optional address multicast socket should be bound to. */ private AddressReceiver(InetAddress mcastAddr, InetAddress sockAddr) { - super(ignite == null ? null : ignite.name(), "tcp-disco-multicast-addr-rcvr", log); + super(null, "tcp-disco-multicast-addr-rcvr", log); this.mcastAddr = mcastAddr; this.sockAddr = sockAddr; } @@ -813,7 +822,7 @@ public class TcpDiscoveryMulticastIpFinder extends TcpDiscoveryVmIpFinder { */ private AddressSender(InetAddress mcastGrp, @Nullable InetAddress sockItf, Collection<InetSocketAddress> addrs) throws IOException { - super(ignite == null ? null : ignite.name(), "tcp-disco-multicast-addr-sender", log); + super(null, "tcp-disco-multicast-addr-sender", log); this.mcastGrp = mcastGrp; this.addrs = addrs; this.sockItf = sockItf; diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/sharedfs/TcpDiscoverySharedFsIpFinder.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/sharedfs/TcpDiscoverySharedFsIpFinder.java index 397af1a..63e1080 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/sharedfs/TcpDiscoverySharedFsIpFinder.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/sharedfs/TcpDiscoverySharedFsIpFinder.java @@ -28,6 +28,7 @@ import java.util.Set; import java.util.StringTokenizer; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.internal.IgniteInterruptedCheckedException; @@ -36,6 +37,7 @@ import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.SB; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.resources.IgniteInstanceResource; import org.apache.ignite.resources.LoggerResource; import org.apache.ignite.spi.IgniteSpiConfiguration; import org.apache.ignite.spi.IgniteSpiException; @@ -69,6 +71,11 @@ public class TcpDiscoverySharedFsIpFinder extends TcpDiscoveryIpFinderAdapter { /** IPv6 colon substitute. */ private static final String COLON_SUBST = "_"; + /** Ignite instance . */ + @IgniteInstanceResource + @GridToStringExclude + private Ignite ignite; + /** Grid logger. */ @LoggerResource private IgniteLogger log; diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiMulticastTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiMulticastTest.java index ec44dee..a73cd8b 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiMulticastTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiMulticastTest.java @@ -24,6 +24,7 @@ import org.apache.ignite.Ignite; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.events.Event; import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.spi.discovery.DiscoverySpi; @@ -218,11 +219,6 @@ public class TcpClientDiscoverySpiMulticastTest extends GridCommonAbstractTest { Collection<Object> addrSnds = GridTestUtils.getFieldValue(spi0.getIpFinder(), "addrSnds"); - assertNotNull(addrSnds); - - if (client) - assertTrue(addrSnds.isEmpty()); // Check client does not send its address. - else - assertFalse(addrSnds.isEmpty()); + assertEquals(client, F.isEmpty(addrSnds)); } }