Repository: ignite Updated Branches: refs/heads/ignite-zk 8790099eb -> 45bd0a287
zk Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/45bd0a28 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/45bd0a28 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/45bd0a28 Branch: refs/heads/ignite-zk Commit: 45bd0a2876ba943219149302a921fa21279e1737 Parents: 8790099 Author: sboikov <[email protected]> Authored: Mon Dec 4 16:31:56 2017 +0300 Committer: sboikov <[email protected]> Committed: Mon Dec 4 16:53:39 2017 +0300 ---------------------------------------------------------------------- .../IgniteDiscoverySpiInternalListener.java | 10 +- .../spi/discovery/tcp/TcpDiscoverySpi.java | 7 + .../spi/discovery/zk/ZookeeperDiscoverySpi.java | 34 +++++ .../zk/internal/ZookeeperClusterNode.java | 25 ++- .../internal/DiscoverySpiBlockJoinListener.java | 62 -------- .../internal/DiscoverySpiTestListener.java | 152 +++++++++++++++++++ .../IgniteClientReconnectApiExceptionTest.java | 2 +- .../IgniteClientReconnectCacheTest.java | 4 +- .../CacheLateAffinityAssignmentTest.java | 128 ++++------------ .../ZookeeperDiscoverySpiBasicTest.java | 34 ++++- 10 files changed, 282 insertions(+), 176 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/45bd0a28/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/IgniteDiscoverySpiInternalListener.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/IgniteDiscoverySpiInternalListener.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/IgniteDiscoverySpiInternalListener.java index b655681..eab35ce 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/IgniteDiscoverySpiInternalListener.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/IgniteDiscoverySpiInternalListener.java @@ -18,6 +18,8 @@ package org.apache.ignite.internal.managers.discovery; import org.apache.ignite.IgniteLogger; +import org.apache.ignite.spi.discovery.DiscoverySpi; +import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage; /** * @@ -26,5 +28,11 @@ public interface IgniteDiscoverySpiInternalListener { /** * @param log Log. */ - void beforeJoin(IgniteLogger log); + public void beforeJoin(IgniteLogger log); + + /** + * @param log Logger. + * @param msg Custom message. + */ + public boolean beforeSendCustomEvent(DiscoverySpi spi, IgniteLogger log, DiscoverySpiCustomMessage msg); } http://git-wip-us.apache.org/repos/asf/ignite/blob/45bd0a28/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java index 404868e..52b229f 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java @@ -472,6 +472,13 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements IgniteDiscovery /** {@inheritDoc} */ @Override public void sendCustomEvent(DiscoverySpiCustomMessage msg) throws IgniteException { + IgniteDiscoverySpiInternalListener internalLsnr = this.internalLsnr; + + if (internalLsnr != null) { + if (!internalLsnr.beforeSendCustomEvent(this, log, msg)) + return; + } + impl.sendCustomEvent(msg); } http://git-wip-us.apache.org/repos/asf/ignite/blob/45bd0a28/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java index 8fa8f96..8a9693b 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java @@ -17,7 +17,9 @@ package org.apache.ignite.spi.discovery.zk; +import java.io.IOException; import java.io.Serializable; +import java.net.InetAddress; import java.util.Collection; import java.util.Map; import java.util.UUID; @@ -28,6 +30,8 @@ import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpiInternalL import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteBiTuple; import org.apache.ignite.lang.IgniteProductVersion; import org.apache.ignite.resources.LoggerResource; import org.apache.ignite.spi.IgniteSpiAdapter; @@ -253,6 +257,13 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery /** {@inheritDoc} */ @Override public void sendCustomEvent(DiscoverySpiCustomMessage msg) { + IgniteDiscoverySpiInternalListener internalLsnr = impl.internalLsnr; + + if (internalLsnr != null) { + if (!internalLsnr.beforeSendCustomEvent(this, log, msg)) + return; + } + impl.sendCustomMessage(msg); } @@ -347,10 +358,33 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery private ZookeeperClusterNode initLocalNode() { assert ignite != null; + String locHost = ignite.configuration().getLocalHost(); + + InetAddress locAddr; + + try { + locAddr = U.resolveLocalHost(locHost); + } + catch (IOException e) { + throw new IgniteSpiException("Unknown local address: " + locHost, e); + } + + IgniteBiTuple<Collection<String>, Collection<String>> addrs; + + try { + addrs = U.resolveLocalAddresses(locAddr); + } + catch (Exception e) { + throw new IgniteSpiException("Failed to resolve local host to set of external addresses: " + locHost, e); + } + + consistentId = consistentId(); ZookeeperClusterNode locNode = new ZookeeperClusterNode( ignite.configuration().getNodeId(), + addrs.get1(), + addrs.get2(), locNodeVer, locNodeAttrs, consistentId, http://git-wip-us.apache.org/repos/asf/ignite/blob/45bd0a28/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClusterNode.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClusterNode.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClusterNode.java index c2d15cf..a62ee03 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClusterNode.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClusterNode.java @@ -29,7 +29,6 @@ import org.apache.ignite.internal.IgniteNodeAttributes; import org.apache.ignite.internal.managers.discovery.IgniteClusterNode; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.typedef.F; -import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.lang.IgniteProductVersion; import org.apache.ignite.spi.discovery.DiscoveryMetricsProvider; @@ -64,11 +63,15 @@ public class ZookeeperClusterNode implements IgniteClusterNode, Serializable, Co private IgniteProductVersion ver; /** Node attributes. */ - @GridToStringExclude private Map<String, Object> attrs; + /** Internal discovery addresses as strings. */ + private Collection<String> addrs; + + /** Internal discovery host names as strings. */ + private Collection<String> hostNames; + /** Metrics provider. */ - @GridToStringExclude private transient DiscoveryMetricsProvider metricsProvider; /** */ @@ -101,6 +104,8 @@ public class ZookeeperClusterNode implements IgniteClusterNode, Serializable, Co */ public ZookeeperClusterNode( UUID id, + Collection<String> addrs, + Collection<String> hostNames, IgniteProductVersion ver, Map<String, Object> attrs, Serializable consistentId, @@ -112,7 +117,9 @@ public class ZookeeperClusterNode implements IgniteClusterNode, Serializable, Co this.id = id; this.ver = ver; - this.attrs = U.sealMap(attrs); + this.attrs = Collections.unmodifiableMap(attrs); + this.addrs = addrs; + this.hostNames = hostNames; this.consistentId = consistentId; this.metricsProvider = metricsProvider; @@ -207,12 +214,12 @@ public class ZookeeperClusterNode implements IgniteClusterNode, Serializable, Co /** {@inheritDoc} */ @Override public Collection<String> addresses() { - return Collections.emptyList(); + return addrs; } /** {@inheritDoc} */ @Override public Collection<String> hostNames() { - return Collections.emptyList(); + return hostNames; } /** {@inheritDoc} */ @@ -311,6 +318,10 @@ public class ZookeeperClusterNode implements IgniteClusterNode, Serializable, Co /** {@inheritDoc} */ @Override public String toString() { - return "ZookeeperClusterNode [id=" + id + ", order=" + order + ", client=" + isClient() + ']'; + return "ZookeeperClusterNode [id=" + id + + ", addrs=" + addrs + + ", order=" + order + + ", loc=" + loc + + ", client=" + isClient() + ']'; } } http://git-wip-us.apache.org/repos/asf/ignite/blob/45bd0a28/modules/core/src/test/java/org/apache/ignite/internal/DiscoverySpiBlockJoinListener.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/DiscoverySpiBlockJoinListener.java b/modules/core/src/test/java/org/apache/ignite/internal/DiscoverySpiBlockJoinListener.java deleted file mode 100644 index f895fe2..0000000 --- a/modules/core/src/test/java/org/apache/ignite/internal/DiscoverySpiBlockJoinListener.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal; - -import java.util.concurrent.CountDownLatch; -import org.apache.ignite.IgniteException; -import org.apache.ignite.IgniteLogger; -import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpiInternalListener; -import org.apache.ignite.internal.util.typedef.internal.U; - -/** - * - */ -public class DiscoverySpiBlockJoinListener implements IgniteDiscoverySpiInternalListener { - /** */ - private volatile CountDownLatch writeLatch; - - /** - * - */ - public void startBlock() { - writeLatch = new CountDownLatch(1); - } - - /** - * - */ - public void stopBlock() { - writeLatch.countDown(); - } - - /** {@inheritDoc} */ - @Override public void beforeJoin(IgniteLogger log) { - try { - CountDownLatch writeLatch0 = writeLatch; - - if (writeLatch0 != null) { - log.info("Block join"); - - U.await(writeLatch0); - } - } - catch (Exception e) { - throw new IgniteException(e); - } - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/45bd0a28/modules/core/src/test/java/org/apache/ignite/internal/DiscoverySpiTestListener.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/DiscoverySpiTestListener.java b/modules/core/src/test/java/org/apache/ignite/internal/DiscoverySpiTestListener.java new file mode 100644 index 0000000..3e32f52 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/DiscoverySpiTestListener.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import org.apache.ignite.IgniteException; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; +import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpiInternalListener; +import org.apache.ignite.internal.processors.cache.CacheAffinityChangeMessage; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.spi.discovery.DiscoverySpi; +import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage; +import org.apache.ignite.testframework.GridTestUtils; + +/** + * + */ +public class DiscoverySpiTestListener implements IgniteDiscoverySpiInternalListener { + /** */ + private volatile CountDownLatch joinLatch; + + /** */ + private boolean blockCustomEvt; + + /** */ + private final Object mux = new Object(); + + /** */ + private List<DiscoverySpiCustomMessage> blockedMsgs = new ArrayList<>(); + + /** */ + private volatile DiscoverySpi spi; + + /** */ + private volatile IgniteLogger log; + + /** + * + */ + public void startBlock() { + joinLatch = new CountDownLatch(1); + } + + /** + * + */ + public void stopBlock() { + joinLatch.countDown(); + } + + /** {@inheritDoc} */ + @Override public void beforeJoin(IgniteLogger log) { + try { + CountDownLatch writeLatch0 = joinLatch; + + if (writeLatch0 != null) { + log.info("Block join"); + + U.await(writeLatch0); + } + } + catch (Exception e) { + throw new IgniteException(e); + } + } + + /** {@inheritDoc} */ + @Override public boolean beforeSendCustomEvent(DiscoverySpi spi, IgniteLogger log, DiscoverySpiCustomMessage msg) { + this.spi = spi; + this.log = log; + + synchronized (mux) { + if (blockCustomEvt) { + DiscoveryCustomMessage msg0 = GridTestUtils.getFieldValue(msg, "delegate"); + + if (msg0 instanceof CacheAffinityChangeMessage) { + log.info("Block custom message: " + msg0); + + blockedMsgs.add(msg); + + mux.notifyAll(); + + return false; + } + } + } + + return true; + } + /** + * + */ + public void blockCustomEvent() { + synchronized (mux) { + assert blockedMsgs.isEmpty() : blockedMsgs; + + blockCustomEvt = true; + } + } + + /** + * @throws InterruptedException If interrupted. + */ + public void waitCustomEvent() throws InterruptedException { + synchronized (mux) { + while (blockedMsgs.isEmpty()) + mux.wait(); + } + } + + /** + * + */ + public void stopBlockCustomEvents() { + if (spi == null) + return; + + List<DiscoverySpiCustomMessage> msgs; + + synchronized (this) { + msgs = new ArrayList<>(blockedMsgs); + + blockCustomEvt = false; + + blockedMsgs.clear(); + } + + for (DiscoverySpiCustomMessage msg : msgs) { + log.info("Resend blocked message: " + msg); + + spi.sendCustomEvent(msg); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/45bd0a28/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectApiExceptionTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectApiExceptionTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectApiExceptionTest.java index 4dacaba..310f58b 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectApiExceptionTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectApiExceptionTest.java @@ -782,7 +782,7 @@ public class IgniteClientReconnectApiExceptionTest extends IgniteClientReconnect log.info("Block reconnect."); - DiscoverySpiBlockJoinListener lsnr = new DiscoverySpiBlockJoinListener(); + DiscoverySpiTestListener lsnr = new DiscoverySpiTestListener(); clientSpi.setInternalListener(lsnr); lsnr.startBlock(); http://git-wip-us.apache.org/repos/asf/ignite/blob/45bd0a28/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java index 3f4109f..8aad001 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java @@ -190,7 +190,7 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac log.info("Block reconnect."); - DiscoverySpiBlockJoinListener lsnr = new DiscoverySpiBlockJoinListener(); + DiscoverySpiTestListener lsnr = new DiscoverySpiTestListener(); clientSpi.setInternalListener(lsnr); @@ -428,7 +428,7 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac log.info("Block reconnect."); - DiscoverySpiBlockJoinListener lsnr = new DiscoverySpiBlockJoinListener(); + DiscoverySpiTestListener lsnr = new DiscoverySpiTestListener(); clientSpi.setInternalListener(lsnr); http://git-wip-us.apache.org/repos/asf/ignite/blob/45bd0a28/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java index ab07611..8853db8 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java @@ -52,6 +52,7 @@ import org.apache.ignite.configuration.DataRegionConfiguration; import org.apache.ignite.configuration.DataStorageConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.events.DiscoveryEvent; +import org.apache.ignite.internal.DiscoverySpiTestListener; import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.GridNodeOrderComparator; import org.apache.ignite.internal.IgniteInternalFuture; @@ -59,10 +60,9 @@ import org.apache.ignite.internal.IgniteKernal; import org.apache.ignite.internal.IgniteNodeAttributes; import org.apache.ignite.internal.TestRecordingCommunicationSpi; import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException; -import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; +import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpi; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.affinity.GridAffinityFunctionContextImpl; -import org.apache.ignite.internal.processors.cache.CacheAffinityChangeMessage; import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; @@ -87,7 +87,6 @@ import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.resources.IgniteInstanceResource; import org.apache.ignite.services.Service; import org.apache.ignite.services.ServiceContext; -import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; @@ -157,7 +156,7 @@ public class CacheLateAffinityAssignmentTest extends GridCommonAbstractTest { cfg.setCommunicationSpi(commSpi); - TestTcpDiscoverySpi discoSpi = new TestTcpDiscoverySpi(); + TcpDiscoverySpi discoSpi = new TcpDiscoverySpi(); discoSpi.setForceServerMode(forceSrvMode); discoSpi.setIpFinder(ipFinder); @@ -673,9 +672,11 @@ public class CacheLateAffinityAssignmentTest extends GridCommonAbstractTest { checkAffinity(4, topVer(4, 0), true); - TestTcpDiscoverySpi discoSpi = (TestTcpDiscoverySpi)ignite0.configuration().getDiscoverySpi(); + DiscoverySpiTestListener lsnr = new DiscoverySpiTestListener(); - discoSpi.blockCustomEvent(); + ((IgniteDiscoverySpi)ignite0.configuration().getDiscoverySpi()).setInternalListener(lsnr); + + lsnr.blockCustomEvent(); stopGrid(1); @@ -686,7 +687,7 @@ public class CacheLateAffinityAssignmentTest extends GridCommonAbstractTest { for (IgniteInternalFuture<?> fut : futs) assertFalse(fut.isDone()); - discoSpi.stopBlock(); + lsnr.stopBlockCustomEvents(); checkAffinity(3, topVer(5, 0), false); @@ -1408,8 +1409,10 @@ public class CacheLateAffinityAssignmentTest extends GridCommonAbstractTest { public void testDelayAssignmentAffinityChanged() throws Exception { Ignite ignite0 = startServer(0, 1); - TestTcpDiscoverySpi discoSpi0 = - (TestTcpDiscoverySpi)ignite0.configuration().getDiscoverySpi(); + DiscoverySpiTestListener lsnr = new DiscoverySpiTestListener(); + + ((IgniteDiscoverySpi)ignite0.configuration().getDiscoverySpi()).setInternalListener(lsnr); + TestRecordingCommunicationSpi commSpi0 = (TestRecordingCommunicationSpi)ignite0.configuration().getCommunicationSpi(); @@ -1417,19 +1420,19 @@ public class CacheLateAffinityAssignmentTest extends GridCommonAbstractTest { checkAffinity(2, topVer(2, 0), true); - discoSpi0.blockCustomEvent(); + lsnr.blockCustomEvent(); startServer(2, 3); checkAffinity(3, topVer(3, 0), false); - discoSpi0.waitCustomEvent(); + lsnr.waitCustomEvent(); blockSupplySend(commSpi0, CACHE_NAME1); startServer(3, 4); - discoSpi0.stopBlock(); + lsnr.stopBlockCustomEvents(); checkAffinity(4, topVer(4, 0), false); @@ -1451,8 +1454,10 @@ public class CacheLateAffinityAssignmentTest extends GridCommonAbstractTest { try { Ignite ignite0 = startServer(0, 1); - TestTcpDiscoverySpi discoSpi0 = - (TestTcpDiscoverySpi)ignite0.configuration().getDiscoverySpi(); + DiscoverySpiTestListener lsnr = new DiscoverySpiTestListener(); + + ((IgniteDiscoverySpi)ignite0.configuration().getDiscoverySpi()).setInternalListener(lsnr); + TestRecordingCommunicationSpi commSpi0 = (TestRecordingCommunicationSpi)ignite0.configuration().getCommunicationSpi(); @@ -1464,11 +1469,11 @@ public class CacheLateAffinityAssignmentTest extends GridCommonAbstractTest { checkAffinity(3, topVer(3, 1), false); - discoSpi0.blockCustomEvent(); + lsnr.blockCustomEvent(); stopNode(2, 4); - discoSpi0.waitCustomEvent(); + lsnr.waitCustomEvent(); blockSupplySend(commSpi0, CACHE_NAME1); @@ -1482,7 +1487,7 @@ public class CacheLateAffinityAssignmentTest extends GridCommonAbstractTest { Thread.sleep(2_000); - discoSpi0.stopBlock(); + lsnr.stopBlockCustomEvents(); boolean started = GridTestUtils.waitForCondition(new GridAbsPredicate() { @Override public boolean apply() { @@ -1533,14 +1538,16 @@ public class CacheLateAffinityAssignmentTest extends GridCommonAbstractTest { ignite0.createCache(ccfg); - TestTcpDiscoverySpi discoSpi0 = - (TestTcpDiscoverySpi)ignite0.configuration().getDiscoverySpi(); + DiscoverySpiTestListener lsnr = new DiscoverySpiTestListener(); + + ((IgniteDiscoverySpi)ignite0.configuration().getDiscoverySpi()).setInternalListener(lsnr); + TestRecordingCommunicationSpi spi = (TestRecordingCommunicationSpi)ignite0.configuration().getCommunicationSpi(); blockSupplySend(spi, CACHE_NAME2); - discoSpi0.blockCustomEvent(); + lsnr.blockCustomEvent(); startServer(1, 2); @@ -1550,7 +1557,7 @@ public class CacheLateAffinityAssignmentTest extends GridCommonAbstractTest { spi.stopBlock(); - discoSpi0.waitCustomEvent(); + lsnr.waitCustomEvent(); ignite0.destroyCache(CACHE_NAME2); @@ -1560,7 +1567,7 @@ public class CacheLateAffinityAssignmentTest extends GridCommonAbstractTest { ignite0.createCache(ccfg); - discoSpi0.stopBlock(); + lsnr.stopBlockCustomEvents(); checkAffinity(3, topVer(3, 1), false); checkAffinity(3, topVer(3, 2), false); @@ -2966,83 +2973,6 @@ public class CacheLateAffinityAssignmentTest extends GridCommonAbstractTest { /** * */ - static class TestTcpDiscoverySpi extends TcpDiscoverySpi { - /** */ - private boolean blockCustomEvt; - - /** */ - private final Object mux = new Object(); - - /** */ - private List<DiscoverySpiCustomMessage> blockedMsgs = new ArrayList<>(); - - /** {@inheritDoc} */ - @Override public void sendCustomEvent(DiscoverySpiCustomMessage msg) throws IgniteException { - synchronized (mux) { - if (blockCustomEvt) { - DiscoveryCustomMessage msg0 = GridTestUtils.getFieldValue(msg, "delegate"); - - if (msg0 instanceof CacheAffinityChangeMessage) { - log.info("Block custom message: " + msg0); - - blockedMsgs.add(msg); - - mux.notifyAll(); - - return; - } - } - } - - super.sendCustomEvent(msg); - } - - /** - * - */ - public void blockCustomEvent() { - synchronized (mux) { - assert blockedMsgs.isEmpty() : blockedMsgs; - - blockCustomEvt = true; - } - } - - /** - * @throws InterruptedException If interrupted. - */ - public void waitCustomEvent() throws InterruptedException { - synchronized (mux) { - while (blockedMsgs.isEmpty()) - mux.wait(); - } - } - - /** - * - */ - public void stopBlock() { - List<DiscoverySpiCustomMessage> msgs; - - synchronized (this) { - msgs = new ArrayList<>(blockedMsgs); - - blockCustomEvt = false; - - blockedMsgs.clear(); - } - - for (DiscoverySpiCustomMessage msg : msgs) { - log.info("Resend blocked message: " + msg); - - super.sendCustomEvent(msg); - } - } - } - - /** - * - */ static class TestEntryProcessor implements EntryProcessor<Object, Object, Object> { /** */ private Object val; http://git-wip-us.apache.org/repos/asf/ignite/blob/45bd0a28/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java index f88bb77..c95bdf7 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java @@ -41,12 +41,13 @@ import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.cache.CacheWriteSynchronizationMode; +import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.events.DiscoveryEvent; import org.apache.ignite.events.Event; import org.apache.ignite.events.EventType; -import org.apache.ignite.internal.DiscoverySpiBlockJoinListener; +import org.apache.ignite.internal.DiscoverySpiTestListener; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteKernal; import org.apache.ignite.internal.IgnitionEx; @@ -324,6 +325,31 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { /** * @throws Exception If failed. */ + public void testAddresses() throws Exception { + startGridsMultiThreaded(3); + + client = true; + + startGridsMultiThreaded(3, 3); + + waitForTopology(6); + + for (Ignite node : G.allGrids()) { + ClusterNode locNode0 = node.cluster().localNode(); + + assertTrue(locNode0.addresses().size() > 0); + assertTrue(locNode0.hostNames().size() > 0); + + for (ClusterNode node0 : node.cluster().nodes()) { + assertTrue(node0.addresses().size() > 0); + assertTrue(node0.hostNames().size() > 0); + } + } + } + + /** + * @throws Exception If failed. + */ public void testClientNodesStatus() throws Exception { startGrid(0); @@ -1750,7 +1776,7 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { List<String> zkNodes = new ArrayList<>(); - List<DiscoverySpiBlockJoinListener> lsnrs = new ArrayList<>(); + List<DiscoverySpiTestListener> lsnrs = new ArrayList<>(); for (Ignite client : clients) { client.events().localListen(p, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED); @@ -1758,7 +1784,7 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { zkNodes.add(aliveZkNodePath(client)); if (disconnectedC != null) { - DiscoverySpiBlockJoinListener lsnr = new DiscoverySpiBlockJoinListener(); + DiscoverySpiTestListener lsnr = new DiscoverySpiTestListener(); ((IgniteDiscoverySpi)client.configuration().getDiscoverySpi()).setInternalListener(lsnr); @@ -1822,7 +1848,7 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { if (disconnectedC != null) { disconnectedC.run(); - for (DiscoverySpiBlockJoinListener lsnr : lsnrs) + for (DiscoverySpiTestListener lsnr : lsnrs) lsnr.stopBlock(); }
