ignite-6519 Race in SplitAwareTopologyValidator on activator and server node join
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5d90b8fe Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5d90b8fe Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5d90b8fe Branch: refs/heads/ignite-5937 Commit: 5d90b8feb5eb65ce190ca4106d31f386c7be42a3 Parents: 01daee6 Author: Alexandr Kuramshin <[email protected]> Authored: Mon Oct 23 15:28:28 2017 +0300 Committer: sboikov <[email protected]> Committed: Mon Oct 23 15:28:28 2017 +0300 ---------------------------------------------------------------------- .../internal/TestRecordingCommunicationSpi.java | 12 + ...niteTopologyValidatorGridSplitCacheTest.java | 358 +++++++++++++++---- .../IgniteCacheTopologySplitAbstractTest.java | 266 ++++++++++++++ 3 files changed, 564 insertions(+), 72 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/5d90b8fe/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java b/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java index ab61687..cf4f059 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java @@ -27,6 +27,7 @@ import java.util.Set; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteException; import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.managers.communication.GridIoMessage; import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.internal.U; @@ -71,6 +72,12 @@ public 
class TestRecordingCommunicationSpi extends TcpCommunicationSpi { /** {@inheritDoc} */ @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackC) throws IgniteSpiException { + // All ignite code expects that 'send' fails after discovery listener for node fail finished. + if (getSpiContext().node(node.id()) == null) { + throw new IgniteSpiException(new ClusterTopologyCheckedException("Failed to send message" + + " (node left topology): " + node)); + } + if (msg instanceof GridIoMessage) { GridIoMessage ioMsg = (GridIoMessage)msg; @@ -115,6 +122,11 @@ public class TestRecordingCommunicationSpi extends TcpCommunicationSpi { super.sendMessage(node, msg, ackC); } + /** {@inheritDoc} */ + @Override public void sendMessage(ClusterNode node, Message msg) throws IgniteSpiException { + sendMessage(node, msg, null); + } + /** * @param recordP Record predicate. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/5d90b8fe/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTopologyValidatorGridSplitCacheTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTopologyValidatorGridSplitCacheTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTopologyValidatorGridSplitCacheTest.java index 1f3b875..1885e9a 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTopologyValidatorGridSplitCacheTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTopologyValidatorGridSplitCacheTest.java @@ -17,32 +17,43 @@ package org.apache.ignite.internal.processors.cache; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; 
+import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; +import org.apache.ignite.cache.affinity.Affinity; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.configuration.MemoryConfiguration; import org.apache.ignite.configuration.TopologyValidator; import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.processors.cache.distributed.dht.IgniteCacheTopologySplitAbstractTest; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.resources.CacheNameResource; import org.apache.ignite.resources.IgniteInstanceResource; import org.apache.ignite.resources.LoggerResource; -import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; import static org.apache.ignite.cache.CacheMode.PARTITIONED; import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC; +import static org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi.DFLT_PORT; /** * Tests complex scenario with topology validator. Grid is split between to data centers, defined by attribute {@link * #DC_NODE_ATTR}. If only nodes from single DC are left in topology, grid is moved into inoperative state until special * activator node'll enter a topology, enabling grid operations. 
*/ -public class IgniteTopologyValidatorGridSplitCacheTest extends GridCommonAbstractTest { +public class IgniteTopologyValidatorGridSplitCacheTest extends IgniteCacheTopologySplitAbstractTest { + /** */ private static final String DC_NODE_ATTR = "dc"; @@ -50,10 +61,10 @@ public class IgniteTopologyValidatorGridSplitCacheTest extends GridCommonAbstrac private static final String ACTIVATOR_NODE_ATTR = "split.resolved"; /** */ - private static final int GRID_CNT = 8; + private static final int GRID_CNT = 32; /** */ - private static final int CACHES_CNT = 100; + private static final int CACHES_CNT = 50; /** */ private static final int RESOLVER_GRID_IDX = GRID_CNT; @@ -62,7 +73,62 @@ public class IgniteTopologyValidatorGridSplitCacheTest extends GridCommonAbstrac private static final int CONFIGLESS_GRID_IDX = GRID_CNT + 1; /** */ - private boolean useCacheGrp = false; + private static final String STATIC_IP = "127.0.0.1"; + + /** */ + private static final Collection<String> SEG_FINDER_0; + + /** */ + private static final Collection<String> SEG_FINDER_1; + + /** */ + private static final Collection<String> SEG_FINDER_ALL; + + static { + Collection<String> seg0 = new ArrayList<>(); + + Collection<String> seg1 = new ArrayList<>(); + + for (int i = 0; i < GRID_CNT; i += 2) { + seg0.add(STATIC_IP + ':' + (DFLT_PORT + i)); + + seg1.add(STATIC_IP + ':' + (DFLT_PORT + i + 1)); + } + SEG_FINDER_0 = Collections.unmodifiableCollection(seg0); + + SEG_FINDER_1 = Collections.unmodifiableCollection(seg1); + + SEG_FINDER_ALL = F.concat(false, SEG_FINDER_0, SEG_FINDER_1); + } + + /** */ + private boolean useCacheGrp; + + /** */ + private int getDiscoPort(int gridIdx) { + return DFLT_PORT + gridIdx; + } + + /** */ + private boolean isDiscoPort(int port) { + return port >= DFLT_PORT && + port <= (DFLT_PORT + TcpDiscoverySpi.DFLT_PORT_RANGE); + } + + /** {@inheritDoc} */ + @Override protected boolean isBlocked(int locPort, int rmtPort) { + return isDiscoPort(locPort) && 
isDiscoPort(rmtPort) && segment(locPort) != segment(rmtPort); + } + + /** */ + private int segment(int discoPort) { + return (discoPort - DFLT_PORT) % 2; + } + + /** {@inheritDoc} */ + @Override protected int segment(ClusterNode node) { + return node.attribute(DC_NODE_ATTR); + } /** {@inheritDoc} */ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { @@ -70,17 +136,32 @@ public class IgniteTopologyValidatorGridSplitCacheTest extends GridCommonAbstrac int idx = getTestIgniteInstanceIndex(gridName); - cfg.setUserAttributes(F.asMap(DC_NODE_ATTR, idx % 2)); + Map<String, Object> userAttrs = new HashMap<>(4); + + int segment = idx % 2; + + userAttrs.put(DC_NODE_ATTR, segment); + + TcpDiscoverySpi disco = (TcpDiscoverySpi)cfg.getDiscoverySpi(); + + disco.setLocalPort(getDiscoPort(idx)); + + disco.setIpFinder(new TcpDiscoveryVmIpFinder().setAddresses(segmented() ? + (segment == 0 ? SEG_FINDER_0 : SEG_FINDER_1) : SEG_FINDER_ALL)); if (idx != CONFIGLESS_GRID_IDX) { if (idx == RESOLVER_GRID_IDX) { cfg.setClientMode(true); - cfg.setUserAttributes(F.asMap(ACTIVATOR_NODE_ATTR, "true")); + userAttrs.put(ACTIVATOR_NODE_ATTR, "true"); } else cfg.setActiveOnStart(false); } + cfg.setUserAttributes(userAttrs); + + cfg.setMemoryConfiguration(new MemoryConfiguration(). + setDefaultMemoryPolicySize((50L << 20) + (100L << 20) * CACHES_CNT / GRID_CNT)); return cfg; } @@ -129,6 +210,12 @@ public class IgniteTopologyValidatorGridSplitCacheTest extends GridCommonAbstrac stopAllGrids(); } + /** */ + protected void stopGrids(int... grids) { + for (int idx : grids) + stopGrid(idx); + } + /** * Tests topology split scenario. * @@ -149,8 +236,8 @@ public class IgniteTopologyValidatorGridSplitCacheTest extends GridCommonAbstrac /** * Tests topology split scenario. - * @param useCacheGrp Use cache group. * + * @param useCacheGrp Use cache group. * @throws Exception If failed. 
*/ private void testTopologyValidator0(boolean useCacheGrp) throws Exception { @@ -161,31 +248,26 @@ public class IgniteTopologyValidatorGridSplitCacheTest extends GridCommonAbstrac grid.getOrCreateCaches(getCacheConfigurations()); // Init grid index arrays - int[] dc1 = new int[GRID_CNT / 2]; + int[] seg1 = new int[GRID_CNT / 2]; - for (int i = 0; i < dc1.length; ++i) - dc1[i] = i * 2 + 1; + for (int i = 0; i < seg1.length; ++i) + seg1[i] = i * 2 + 1; - int[] dc0 = new int[GRID_CNT - dc1.length]; + int[] seg0 = new int[GRID_CNT - seg1.length]; - for (int i = 0; i < dc0.length; ++i) - dc0[i] = i * 2; + for (int i = 0; i < seg0.length; ++i) + seg0[i] = i * 2; // Tests what each node is able to do puts. - tryPut(dc0); - - tryPut(dc1); + tryPut(seg0, seg1); clearAll(); // Force segmentation. - for (int idx : dc1) - stopGrid(idx); - - awaitPartitionMapExchange(); + splitAndWait(); try { - tryPut(dc0); + tryPut(seg0, seg1); fail(); } @@ -196,24 +278,41 @@ public class IgniteTopologyValidatorGridSplitCacheTest extends GridCommonAbstrac // Repair split by adding activator node in topology. resolveSplit(); - tryPut(dc0); + tryPut(seg0); clearAll(); + try { + tryPut(seg1); + + fail(); + } + catch (Exception e) { + // No-op. + } + + stopGrids(seg1); + // Fix split by adding node from second DC. + unsplit(); + startGrid(CONFIGLESS_GRID_IDX); awaitPartitionMapExchange(); + tryPut(seg0); + tryPut(CONFIGLESS_GRID_IDX); + clearAll(); + // Force split by removing last node from second DC. stopGrid(CONFIGLESS_GRID_IDX); awaitPartitionMapExchange(); try { - tryPut(dc0); + tryPut(seg0); fail(); } @@ -221,10 +320,13 @@ public class IgniteTopologyValidatorGridSplitCacheTest extends GridCommonAbstrac // No-op. } + // Repair split with concurrent server node join race. + resolveSplitWithRace(CONFIGLESS_GRID_IDX); + // Repair split by adding activator node in topology. 
resolveSplit(); - tryPut(dc0); + tryPut(seg0); clearAll(); @@ -233,9 +335,7 @@ public class IgniteTopologyValidatorGridSplitCacheTest extends GridCommonAbstrac awaitPartitionMapExchange(); - for (int i = 0; i < dc0.length; i++) { - int idx = dc0[i]; - + for (int idx : seg0) { if (idx == 0) continue; @@ -249,7 +349,7 @@ public class IgniteTopologyValidatorGridSplitCacheTest extends GridCommonAbstrac awaitPartitionMapExchange(); - assertEquals("Expecting put count", CACHES_CNT * dc0.length, tryPut(dc0)); + assertEquals("Expecting put count", CACHES_CNT * seg0.length, tryPut(seg0)); } /** @@ -277,39 +377,132 @@ public class IgniteTopologyValidatorGridSplitCacheTest extends GridCommonAbstrac } /** - * @param grids Grids to test. + * Resolves split by client node join with server node join race simulation. + * + * @param srvNode server node index to simulate join race + * @throws Exception If failed. */ - private int tryPut(int... grids) { + private void resolveSplitWithRace(int srvNode) throws Exception { + startGrid(RESOLVER_GRID_IDX); + + startGrid(srvNode); + + awaitPartitionMapExchange(); + + tryPut(srvNode); + + clearAll(); + + stopGrid(srvNode); + + awaitPartitionMapExchange(); + + try { + tryPut(0); + + fail(); + } + catch (Exception e) { + // No-op. + } + + stopGrid(RESOLVER_GRID_IDX); + } + + /** + * @param idx Grid to test. + * @return number of successful puts to caches + * @throws IgniteException If all tries to put was failed. + * @throws AssertionError If some of tries to put was failed. 
+ */ + private int tryPut(int idx) { + IgniteEx g = grid(idx); + int putCnt = 0; - for (int i = 0; i < grids.length; i++) { - IgniteEx g = grid(grids[i]); - for (int cnt = 0; cnt < CACHES_CNT; cnt++) { - String cacheName = testCacheName(cnt); + IgniteException ex = null; - for (int k = 0; k < 100; k++) { - if (g.affinity(cacheName).isPrimary(g.localNode(), k)) { - IgniteCache<Object, Object> cache = g.cache(cacheName); + for (int cnt = 0; cnt < CACHES_CNT; cnt++) { + String cacheName = testCacheName(cnt); - try { - cache.put(k, k); - } - catch (Throwable t) { - log.error("Failed to put entry: [cache=" + cacheName + ", key=" + k + ", nodeId=" + - g.name() + ']', t); + int key = -1; - throw t; - } + Affinity<Object> aff = g.affinity(cacheName); + + for (int k = 0; k < aff.partitions(); k++) { + if (aff.isPrimary(g.cluster().localNode(), k)) { + key = k; - assertEquals(1, cache.localSize()); + break; + } + } - putCnt++; + assertTrue("Failed to find affinity key [gridIdx=" + idx +", cache=" + cacheName + ']', + key != -1); - break; - } + IgniteCache<Object, Object> cache = g.cache(cacheName); + + try { + cache.put(key, key); + + assertEquals(1, cache.localSize()); + + if (ex != null) + throw new AssertionError("Successful tryPut after failure [gridIdx=" + idx + + ", cacheName=" + cacheName + ']', ex); + + putCnt++; + } + catch (Throwable t) { + IgniteException e = new IgniteException("Failed to put entry [cache=" + cacheName + ", key=" + + key + ']', t); + + log.error(e.getMessage(), e.getCause()); + + if (ex == null) + ex = new IgniteException("Failed to put entry [node=" + g.name() + ']'); + + ex.addSuppressed(t); + } + } + if (ex != null) + throw ex; + + return putCnt; + } + + /** + * @param grids Grids to test. + * @return number of successful puts to caches + * @throws IgniteException If all tries to put was failed. + * @throws AssertionError If some of tries to put was failed. + */ + private int tryPut(int[]... 
grids) { + int putCnt = 0; + + IgniteException ex = null; + + for (int[] idxs : grids) { + for (int idx : idxs) { + try { + int cnt = tryPut(idx); + + if (ex != null) + throw new AssertionError("Successful tryPut after failure [gridIdx=" + idx + + ", sucessful puts = " + cnt + ']', ex); + + putCnt += cnt; + } + catch (Exception e) { + if (ex == null) + ex = new IgniteException("Failed to put entry"); + + ex.addSuppressed(e); } } } + if (ex != null) + throw ex; return putCnt; } @@ -318,20 +511,21 @@ public class IgniteTopologyValidatorGridSplitCacheTest extends GridCommonAbstrac * Prevents cache from performing any operation if only nodes from single data center are left in topology. */ private static class SplitAwareTopologyValidator implements TopologyValidator { + /** */ private static final long serialVersionUID = 0L; /** */ @CacheNameResource - private String cacheName; + private transient String cacheName; /** */ @IgniteInstanceResource - private Ignite ignite; + private transient Ignite ignite; /** */ @LoggerResource - private IgniteLogger log; + private transient IgniteLogger log; /** State. 
*/ private transient State state; @@ -340,12 +534,13 @@ public class IgniteTopologyValidatorGridSplitCacheTest extends GridCommonAbstrac @Override public boolean validate(Collection<ClusterNode> nodes) { initIfNeeded(nodes); - if (!F.view(nodes, new IgnitePredicate<ClusterNode>() { + for (ClusterNode node : F.view(nodes, new IgnitePredicate<ClusterNode>() { @Override public boolean apply(ClusterNode node) { return !node.isClient() && node.attribute(DC_NODE_ATTR) == null; } - }).isEmpty()) { - log.error("No valid server nodes are detected in topology: [cacheName=" + cacheName + ']'); + })) { + log.error("Not valid server nodes are detected in topology: [cacheName=" + cacheName + ", node=" + + node + ']'); return false; } @@ -353,7 +548,7 @@ public class IgniteTopologyValidatorGridSplitCacheTest extends GridCommonAbstrac boolean segmented = segmented(nodes); if (!segmented) - state = State.VALID; // Also clears possible REPAIRED state. + state = State.VALID; // Also clears possible BEFORE_REPAIRED and REPAIRED states. else { if (state == State.REPAIRED) // Any topology change in segmented grid in repaired mode is valid. return true; @@ -361,23 +556,40 @@ public class IgniteTopologyValidatorGridSplitCacheTest extends GridCommonAbstrac // Find discovery event node. ClusterNode evtNode = evtNode(nodes); - if (activator(evtNode)) { - if (log.isInfoEnabled()) - log.info("Grid segmentation is repaired: [cacheName=" + cacheName + ']'); - - state = State.REPAIRED; - } + if (activator(evtNode)) + state = State.BEFORE_REPARED; else { - if (state == State.VALID) { - if (log.isInfoEnabled()) - log.info("Grid segmentation is detected: [cacheName=" + cacheName + ']'); + if (state == State.BEFORE_REPARED) { + boolean activatorLeft = true; + + // Check if activator is no longer in topology. 
+ for (ClusterNode node : nodes) { + if (node.isClient() && activator(node)) { + activatorLeft = false; + + break; + } + } + + if (activatorLeft) { + if (log.isInfoEnabled()) + log.info("Grid segmentation is repaired: [cacheName=" + cacheName + ']'); + + state = State.REPAIRED; // Switch to REPAIRED state only when activator leaves. + } // Else stay in BEFORE_REPARED state. } + else { + if (state == State.VALID) { + if (log.isInfoEnabled()) + log.info("Grid segmentation is detected: [cacheName=" + cacheName + ']'); + } - state = State.NOTVALID; + state = State.NOTVALID; + } } } - return state != State.NOTVALID; + return state == State.VALID || state == State.REPAIRED; } /** */ @@ -418,7 +630,7 @@ public class IgniteTopologyValidatorGridSplitCacheTest extends GridCommonAbstrac // Search for activator node in history on start. long topVer = evtNode(nodes).order(); - while(topVer > 0) { + while (topVer > 0) { Collection<ClusterNode> top = ignite.cluster().topology(topVer--); // Stop on reaching history limit. @@ -460,11 +672,13 @@ public class IgniteTopologyValidatorGridSplitCacheTest extends GridCommonAbstrac /** States. */ private enum State { - /** Topology valid. */ + /** Topology is valid. 
*/ VALID, - /** Topology not valid */ + /** Topology is not valid */ NOTVALID, - /** Topology repaired (valid) */ + /** Before topology will be repaired (valid) */ + BEFORE_REPARED, + /** Topology is repaired (valid) */ REPAIRED; } } http://git-wip-us.apache.org/repos/asf/ignite/blob/5d90b8fe/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCacheTopologySplitAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCacheTopologySplitAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCacheTopologySplitAbstractTest.java new file mode 100644 index 0000000..196681d --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCacheTopologySplitAbstractTest.java @@ -0,0 +1,266 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.distributed.dht; + +import java.io.IOException; +import java.io.OutputStream; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.net.SocketTimeoutException; +import java.util.Collection; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteKernal; +import org.apache.ignite.internal.TestRecordingCommunicationSpi; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.G; +import org.apache.ignite.lang.IgniteBiPredicate; +import org.apache.ignite.lang.IgnitePredicate; +import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.spi.IgniteSpiOperationTimeoutException; +import org.apache.ignite.spi.IgniteSpiOperationTimeoutHelper; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +/** + * Abstract class for tests over split in two half topology. + */ +public abstract class IgniteCacheTopologySplitAbstractTest extends GridCommonAbstractTest { + + /** Segmentation state. */ + private volatile boolean segmented; + + /** + * {@inheritDoc} + */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + cfg.setFailureDetectionTimeout(3_000L); + + cfg.setDiscoverySpi(new SplitTcpDiscoverySpi()); + + cfg.setCommunicationSpi(new TestRecordingCommunicationSpi()); + + return cfg; + } + + /** + * Trigger segmentation and wait for results. Should be called on stable topology. + * + * @throws InterruptedException If interrupted while waiting. 
+ * @throws IgniteCheckedException On error. + */ + protected void splitAndWait() throws InterruptedException, IgniteCheckedException { + if (log.isInfoEnabled()) + log.info(">>> Simulating split"); + + long topVer = grid(0).cluster().topologyVersion(); + + // Trigger segmentation. + segmented = true; + + for (Ignite ignite : G.allGrids()) { + TestRecordingCommunicationSpi comm = (TestRecordingCommunicationSpi) + ignite.configuration().getCommunicationSpi(); + + comm.blockMessages(new SegmentBlocker(ignite.cluster().localNode())); + } + + Collection<Ignite> seg0 = F.view(G.allGrids(), new IgnitePredicate<Ignite>() { + @Override public boolean apply(Ignite ignite) { + return segment(ignite.cluster().localNode()) == 0; + } + }); + + Collection<Ignite> seg1 = F.view(G.allGrids(), new IgnitePredicate<Ignite>() { + @Override public boolean apply(Ignite ignite) { + return segment(ignite.cluster().localNode()) == 1; + } + }); + + for (Ignite grid : seg0) + ((IgniteKernal)grid).context().discovery().topologyFuture(topVer + seg1.size()).get(); + + for (Ignite grid : seg1) + ((IgniteKernal)grid).context().discovery().topologyFuture(topVer + seg0.size()).get(); + + // awaitPartitionMapExchange won't work because coordinator is wrong for second segment. + for (Ignite grid : G.allGrids()) + ((IgniteKernal)grid).context().cache().context().exchange().lastTopologyFuture().get(); + + if (log.isInfoEnabled()) + log.info(">>> Finished waiting for split"); + } + + /** + * Restore initial state + */ + protected void unsplit() { + if (log.isInfoEnabled()) + log.info(">>> Restoring from split"); + + segmented = false; + + for (Ignite ignite : G.allGrids()) { + TestRecordingCommunicationSpi comm = (TestRecordingCommunicationSpi) + ignite.configuration().getCommunicationSpi(); + + comm.stopBlock(); + } + } + + /** + * @return Segmented status. + */ + protected boolean segmented() { + return segmented; + } + + /** + * Defines split matrix. + * + * @param locPort Local port. 
+ * @param rmtPort Remote port. + * @return {@code true} if link is blocked. + */ + protected abstract boolean isBlocked(int locPort, int rmtPort); + + /** + * Defines instance segment: 0 or 1. + * + * @param node Node. + * @return Index of instance segment. + */ + protected abstract int segment(ClusterNode node); + + /** + * Discovery SPI which can simulate network split. + */ + protected class SplitTcpDiscoverySpi extends TcpDiscoverySpi { + /** + * @param sockAddr Remote socket address. + * @return Segmented status. + */ + protected boolean segmented(InetSocketAddress sockAddr) { + if (!segmented) + return false; + + int rmtPort = sockAddr.getPort(); + + boolean b = isBlocked(getLocalPort(), rmtPort); + + if (b && log.isDebugEnabled()) + log.debug("Block cross-segment communication [locPort=" + getLocalPort() + ", rmtPort=" + rmtPort + ']'); + + return b; + } + + /** + * @param sockAddr Socket address. + * @param timeout Socket timeout. + * @throws SocketTimeoutException If segmented. + */ + protected void checkSegmented(InetSocketAddress sockAddr, long timeout) throws SocketTimeoutException { + if (segmented(sockAddr)) { + if (timeout > 0) { + try { + Thread.sleep(timeout); + } + catch (InterruptedException e) { + // No-op. 
+ } + } + + throw new SocketTimeoutException("Fake socket timeout."); + } + } + + /** {@inheritDoc} */ + @Override protected Socket openSocket(Socket sock, InetSocketAddress remAddr, + IgniteSpiOperationTimeoutHelper timeoutHelper) throws IOException, IgniteSpiOperationTimeoutException { + checkSegmented(remAddr, timeoutHelper.nextTimeoutChunk(getSocketTimeout())); + + return super.openSocket(sock, remAddr, timeoutHelper); + } + + /** {@inheritDoc} */ + @Override protected void writeToSocket( + Socket sock, + TcpDiscoveryAbstractMessage msg, + byte[] data, + long timeout + ) throws IOException { + checkSegmented((InetSocketAddress)sock.getRemoteSocketAddress(), timeout); + + super.writeToSocket(sock, msg, data, timeout); + } + + /** {@inheritDoc} */ + @Override protected void writeToSocket(Socket sock, + OutputStream out, + TcpDiscoveryAbstractMessage msg, + long timeout) throws IOException, IgniteCheckedException { + checkSegmented((InetSocketAddress)sock.getRemoteSocketAddress(), timeout); + + super.writeToSocket(sock, out, msg, timeout); + } + + /** {@inheritDoc} */ + @Override protected void writeToSocket( + Socket sock, + TcpDiscoveryAbstractMessage msg, + long timeout + ) throws IOException, IgniteCheckedException { + checkSegmented((InetSocketAddress)sock.getRemoteSocketAddress(), timeout); + + super.writeToSocket(sock, msg, timeout); + } + + /** {@inheritDoc} */ + @Override protected void writeToSocket(TcpDiscoveryAbstractMessage msg, Socket sock, int res, + long timeout) throws IOException { + checkSegmented((InetSocketAddress)sock.getRemoteSocketAddress(), timeout); + + super.writeToSocket(msg, sock, res, timeout); + } + } + + /** */ + protected class SegmentBlocker implements IgniteBiPredicate<ClusterNode, Message> { + /** */ + private final ClusterNode locNode; + + /** + * @param locNode Local node. 
+ */ + SegmentBlocker(ClusterNode locNode) { + assert locNode != null; + + this.locNode = locNode; + } + + /** {@inheritDoc} */ + @Override public boolean apply(ClusterNode node, Message message) { + return segment(locNode) != segment(node); + } + } +} \ No newline at end of file
