http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiMBean.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiMBean.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiMBean.java index 482e2ef..c7a1a53 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiMBean.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiMBean.java @@ -17,6 +17,7 @@ package org.apache.ignite.spi.communication.tcp; +import org.apache.ignite.internal.util.nio.GridNioServer; import org.apache.ignite.mxbean.MXBeanDescription; import org.apache.ignite.spi.IgniteSpiManagementMBean; @@ -44,6 +45,35 @@ public interface TcpCommunicationSpiMBean extends IgniteSpiManagementMBean { public int getLocalPort(); /** + * Returns {@code true} if {@code TcpCommunicationSpi} should + * maintain connection for outgoing and incoming messages separately. + * In this case total number of connections between local and some remote node + * is {@link #getConnectionsPerNode()} * 2. + * <p> + * Returns {@code false} if each connection of {@link #getConnectionsPerNode()} + * should be used for outgoing and incoming messages. In this case NIO selectors load + * balancing of {@link GridNioServer} will be disabled. + * <p> + * Default is {@code true}. + * + * @return {@code true} to use paired connections and {@code false} otherwise. + * @see #getConnectionsPerNode() + */ + @MXBeanDescription("Paired connections used.") + public boolean isUsePairedConnections(); + + /** + * Gets number of connections to each remote node. 
If {@link #isUsePairedConnections()} + * is {@code true} then number of connections is doubled and half is used for incoming and + * half for outgoing messages. + * + * @return Number of connections per node. + * @see #isUsePairedConnections() + */ + @MXBeanDescription("Connections per node.") + public int getConnectionsPerNode(); + + /** * Gets local port for shared memory communication. * * @return Port number. @@ -153,6 +183,16 @@ public interface TcpCommunicationSpiMBean extends IgniteSpiManagementMBean { public int getReconnectCount(); /** + * Defines how many non-blocking {@code selector.selectNow()} should be made before + * falling into {@code selector.select(long)} in NIO server. Long value. Default is {@code 0}. + * Can be set to {@code Long.MAX_VALUE} so selector threads will never block. + * + * @return Selector thread busy-loop iterations. + */ + @MXBeanDescription("Selector thread busy-loop iterations.") + public long getSelectorSpins(); + + /** * Gets value for {@code TCP_NODELAY} socket option. * * @return {@code True} if TCP delay is disabled.
http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index 9a36f1a..8a9f1c9 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -3408,7 +3408,7 @@ class ServerImpl extends TcpDiscoveryImpl { if (log.isDebugEnabled()) log.debug("Node validation failed [res=" + err + ", node=" + node + ']'); - utilityPool.submit( + utilityPool.execute( new Runnable() { @Override public void run() { boolean ping = node.id().equals(err.nodeId()) ? pingNode(node) : pingNode(err.nodeId()); @@ -3453,7 +3453,7 @@ class ServerImpl extends TcpDiscoveryImpl { final String rmtMarsh = node.attribute(ATTR_MARSHALLER); if (!F.eq(locMarsh, rmtMarsh)) { - utilityPool.submit( + utilityPool.execute( new Runnable() { @Override public void run() { String errMsg = "Local node's marshaller differs from remote node's marshaller " + @@ -3510,7 +3510,7 @@ class ServerImpl extends TcpDiscoveryImpl { boolean locLateAssignBool = locLateAssign != null ? locLateAssign : false; if (locMarshUseDfltSuidBool != rmtMarshUseDfltSuidBool) { - utilityPool.submit( + utilityPool.execute( new Runnable() { @Override public void run() { String errMsg = "Local node's " + IGNITE_OPTIMIZED_MARSHALLER_USE_DEFAULT_SUID + @@ -3552,7 +3552,7 @@ class ServerImpl extends TcpDiscoveryImpl { final boolean rmtMarshCompactFooterBool = rmtMarshCompactFooter != null ? 
rmtMarshCompactFooter : false; if (locMarshCompactFooterBool != rmtMarshCompactFooterBool) { - utilityPool.submit( + utilityPool.execute( new Runnable() { @Override public void run() { String errMsg = "Local node's binary marshaller \"compactFooter\" property differs from " + @@ -3590,7 +3590,7 @@ class ServerImpl extends TcpDiscoveryImpl { final boolean rmtMarshStrSerialVer2Bool = rmtMarshStrSerialVer2 != null ? rmtMarshStrSerialVer2 : false; if (locMarshStrSerialVer2Bool != rmtMarshStrSerialVer2Bool) { - utilityPool.submit( + utilityPool.execute( new Runnable() { @Override public void run() { String errMsg = "Local node's " + IGNITE_BINARY_MARSHALLER_USE_STRING_SERIALIZATION_VER_2 + @@ -3663,7 +3663,7 @@ class ServerImpl extends TcpDiscoveryImpl { final Boolean rmtSrvcCompatibilityEnabled = node.attribute(ATTR_SERVICES_COMPATIBILITY_MODE); if (!F.eq(locSrvcCompatibilityEnabled, rmtSrvcCompatibilityEnabled)) { - utilityPool.submit( + utilityPool.execute( new Runnable() { @Override public void run() { String errMsg = "Local node's " + IGNITE_SERVICES_COMPATIBILITY_MODE + @@ -3698,7 +3698,7 @@ class ServerImpl extends TcpDiscoveryImpl { } } else if (Boolean.FALSE.equals(locSrvcCompatibilityEnabled)) { - utilityPool.submit( + utilityPool.execute( new Runnable() { @Override public void run() { String errMsg = "Remote node doesn't support lazy services configuration and " + http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java b/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java index d1c8d19..127778b 100644 --- a/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java +++ b/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java @@ -184,6 +184,7 @@ 
public class SocketStreamer<T, K, V> extends StreamAdapter<T, K, V> { try { srv = new GridNioServer.Builder<byte[]>() .address(addr == null ? InetAddress.getLocalHost() : addr) + .serverName("sock-streamer") .port(port) .listener(lsnr) .logger(log) http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/main/java/org/apache/ignite/thread/IgniteThreadFactory.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/thread/IgniteThreadFactory.java b/modules/core/src/main/java/org/apache/ignite/thread/IgniteThreadFactory.java index 55557dd..d173594 100644 --- a/modules/core/src/main/java/org/apache/ignite/thread/IgniteThreadFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/thread/IgniteThreadFactory.java @@ -20,6 +20,7 @@ package org.apache.ignite.thread; import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.ignite.internal.util.typedef.internal.S; import org.jetbrains.annotations.NotNull; /** @@ -62,4 +63,9 @@ public class IgniteThreadFactory implements ThreadFactory { @Override public Thread newThread(@NotNull Runnable r) { return new IgniteThread(gridName, threadName, r, idxGen.incrementAndGet()); } -} \ No newline at end of file + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(IgniteThreadFactory.class, this, super.toString()); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/test/java/org/apache/ignite/internal/IgniteSlowClientDetectionSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteSlowClientDetectionSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteSlowClientDetectionSelfTest.java index 760313b..5721887 100644 --- 
a/modules/core/src/test/java/org/apache/ignite/internal/IgniteSlowClientDetectionSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteSlowClientDetectionSelfTest.java @@ -75,6 +75,7 @@ public class IgniteSlowClientDetectionSelfTest extends GridCommonAbstractTest { commSpi.setSlowClientQueueLimit(50); commSpi.setSharedMemoryPort(-1); commSpi.setIdleConnectionTimeout(300_000); + commSpi.setConnectionsPerNode(1); cfg.setCommunicationSpi(commSpi); http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteCommunicationBalanceMultipleConnectionsTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteCommunicationBalanceMultipleConnectionsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteCommunicationBalanceMultipleConnectionsTest.java new file mode 100644 index 0000000..e95b1ec --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteCommunicationBalanceMultipleConnectionsTest.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.managers.communication; + +/** + * + */ +public class IgniteCommunicationBalanceMultipleConnectionsTest extends IgniteCommunicationBalanceTest { + /** {@inheritDoc} */ + @Override protected int connectionsPerNode() { + return 5; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteCommunicationBalanceTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteCommunicationBalanceTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteCommunicationBalanceTest.java new file mode 100644 index 0000000..e142aef --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteCommunicationBalanceTest.java @@ -0,0 +1,339 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.managers.communication; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteCompute; +import org.apache.ignite.IgniteSystemProperties; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.util.lang.GridAbsPredicate; +import org.apache.ignite.internal.util.lang.GridAbsPredicateX; +import org.apache.ignite.internal.util.nio.GridNioServer; +import org.apache.ignite.internal.util.typedef.G; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteCallable; +import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +/** + * + */ +public class IgniteCommunicationBalanceTest extends GridCommonAbstractTest { + /** */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** */ + private boolean client; + + /** */ + private int selectors; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + TcpCommunicationSpi commSpi = ((TcpCommunicationSpi)cfg.getCommunicationSpi()); + + commSpi.setSharedMemoryPort(-1); + commSpi.setConnectionsPerNode(connectionsPerNode()); + + if (selectors > 0) + 
commSpi.setSelectorsCount(selectors); + + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER); + + cfg.setClientMode(client); + + return cfg; + } + + /** + * @return Connections per node. + */ + protected int connectionsPerNode() { + return 1; + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + + super.afterTest(); + } + + /** + * @throws Exception If failed. + */ + public void testBalance1() throws Exception { + System.setProperty(IgniteSystemProperties.IGNITE_IO_BALANCE_PERIOD, "5000"); + + try { + selectors = 4; + + final int SRVS = 4; + + startGridsMultiThreaded(SRVS); + + client = true; + + final Ignite client = startGrid(SRVS); + + for (int i = 0; i < 4; i++) { + ClusterNode node = client.cluster().node(ignite(i).cluster().localNode().id()); + + client.compute(client.cluster().forNode(node)).call(new DummyCallable(null)); + } + + waitNioBalanceStop(Collections.singletonList(client), 10_000); + + final GridNioServer srv = GridTestUtils.getFieldValue(client.configuration().getCommunicationSpi(), "nioSrvr"); + + ThreadLocalRandom rnd = ThreadLocalRandom.current(); + + long readMoveCnt1 = srv.readerMoveCount(); + long writeMoveCnt1 = srv.writerMoveCount(); + + int prevNodeIdx = -1; + + for (int iter = 0; iter < 10; iter++) { + int nodeIdx = rnd.nextInt(SRVS); + + while (prevNodeIdx == nodeIdx) + nodeIdx = rnd.nextInt(SRVS); + + prevNodeIdx = nodeIdx; + + log.info("Iteration [iter=" + iter + ", node=" + nodeIdx + ']'); + + final long readMoveCnt = readMoveCnt1; + final long writeMoveCnt = writeMoveCnt1; + + final int nodeIdx0 = nodeIdx; + + GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + byte[] data = new byte[100_000]; + + for (int j = 0; j < 10; j++) { + for (int i = 0; i < SRVS; i++) { + ClusterNode node = client.cluster().node(ignite(i).cluster().localNode().id()); + + IgniteCompute compute = client.compute(client.cluster().forNode(node)); + + 
compute.call(new DummyCallable(i == nodeIdx0 ? data : null)); + } + } + + return srv.readerMoveCount() > readMoveCnt && srv.writerMoveCount() > writeMoveCnt; + } + }, 30_000); + + waitNioBalanceStop(Collections.singletonList(client), 30_000); + + long readMoveCnt2 = srv.readerMoveCount(); + long writeMoveCnt2 = srv.writerMoveCount(); + + log.info("Move counts [rc1=" + readMoveCnt1 + + ", wc1=" + writeMoveCnt1 + + ", rc2=" + readMoveCnt2 + + ", wc2=" + writeMoveCnt2 + ']'); + + assertTrue(readMoveCnt2 > readMoveCnt1); + assertTrue(writeMoveCnt2 > writeMoveCnt1); + + readMoveCnt1 = readMoveCnt2; + writeMoveCnt1 = writeMoveCnt2; + } + + waitNioBalanceStop(G.allGrids(), 10_000); + } + finally { + System.setProperty(IgniteSystemProperties.IGNITE_IO_BALANCE_PERIOD, ""); + } + } + + /** + * @throws Exception If failed. + */ + public void testBalance2() throws Exception { + System.setProperty(IgniteSystemProperties.IGNITE_IO_BALANCE_PERIOD, "1000"); + + try { + startGridsMultiThreaded(5); + + client = true; + + startGridsMultiThreaded(5, 5); + + for (int i = 0; i < 5; i++) { + log.info("Iteration: " + i); + + final AtomicInteger idx = new AtomicInteger(); + + GridTestUtils.runMultiThreaded(new Callable<Void>() { + @Override public Void call() throws Exception { + Ignite node = ignite(idx.incrementAndGet() % 10); + + ThreadLocalRandom rnd = ThreadLocalRandom.current(); + + int msgs = rnd.nextInt(500, 600); + + for (int i = 0; i < msgs; i++) { + int sndTo = rnd.nextInt(10); + + ClusterNode sntToNode = node.cluster().node(ignite(sndTo).cluster().localNode().id()); + + IgniteCompute compute = node.compute(node.cluster().forNode(sntToNode)); + + compute.call(new DummyCallable(new byte[rnd.nextInt(rnd.nextInt(256, 1024))])); + } + + return null; + } + }, 30, "test-thread"); + + waitNioBalanceStop(G.allGrids(), 10_000); + } + } + finally { + System.setProperty(IgniteSystemProperties.IGNITE_IO_BALANCE_PERIOD, ""); + } + } + + /** + * @param nodes Node. + * @param timeout Timeout. 
+ * @throws Exception If failed. + */ + private void waitNioBalanceStop(List<Ignite> nodes, long timeout) throws Exception { + final List<GridNioServer> srvs = new ArrayList<>(); + + for (Ignite node : nodes) { + TcpCommunicationSpi spi = (TcpCommunicationSpi) node.configuration().getCommunicationSpi(); + + GridNioServer srv = GridTestUtils.getFieldValue(spi, "nioSrvr"); + + srvs.add(srv); + } + + assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicateX() { + @Override public boolean applyx() throws IgniteCheckedException { + List<Long> rCnts = new ArrayList<>(); + List<Long> wCnts = new ArrayList<>(); + + for (GridNioServer srv : srvs) { + long readerMovCnt1 = srv.readerMoveCount(); + long writerMovCnt1 = srv.writerMoveCount(); + + rCnts.add(readerMovCnt1); + wCnts.add(writerMovCnt1); + } + + U.sleep(2000); + + for (int i = 0; i < srvs.size(); i++) { + GridNioServer srv = srvs.get(i); + + long readerMovCnt1 = rCnts.get(i); + long writerMovCnt1 = wCnts.get(i); + + long readerMovCnt2 = srv.readerMoveCount(); + long writerMovCnt2 = srv.writerMoveCount(); + + if (readerMovCnt1 != readerMovCnt2) { + log.info("Readers balance is in progress [node=" + i + ", cnt1=" + readerMovCnt1 + + ", cnt2=" + readerMovCnt2 + ']'); + + return false; + } + if (writerMovCnt1 != writerMovCnt2) { + log.info("Writers balance is in progress [node=" + i + ", cnt1=" + writerMovCnt1 + + ", cnt2=" + writerMovCnt2 + ']'); + + return false; + } + } + + return true; + } + }, timeout)); + } + + /** + * @throws Exception If failed. 
+ */ + public void testRandomBalance() throws Exception { + System.setProperty(GridNioServer.IGNITE_IO_BALANCE_RANDOM_BALANCE, "true"); + System.setProperty(IgniteSystemProperties.IGNITE_IO_BALANCE_PERIOD, "500"); + + try { + final int NODES = 10; + + startGridsMultiThreaded(NODES); + + final long stopTime = System.currentTimeMillis() + 60_000; + + GridTestUtils.runMultiThreaded(new Callable<Object>() { + @Override public Object call() throws Exception { + ThreadLocalRandom rnd = ThreadLocalRandom.current(); + + while (System.currentTimeMillis() < stopTime) + ignite(rnd.nextInt(NODES)).compute().broadcast(new DummyCallable(null)); + + return null; + } + }, 20, "test-thread"); + } + finally { + System.setProperty(GridNioServer.IGNITE_IO_BALANCE_RANDOM_BALANCE, ""); + System.setProperty(IgniteSystemProperties.IGNITE_IO_BALANCE_PERIOD, ""); + } + } + + /** + * + */ + private static class DummyCallable implements IgniteCallable<Object> { + /** */ + private byte[] data; + + /** + * @param data Data. + */ + DummyCallable(byte[] data) { + this.data = data; + } + + /** {@inheritDoc} */ + @Override public Object call() throws Exception { + return data; + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteIoTestMessagesTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteIoTestMessagesTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteIoTestMessagesTest.java new file mode 100644 index 0000000..b644878 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteIoTestMessagesTest.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.managers.communication; + +import java.util.ArrayList; +import java.util.List; +import org.apache.ignite.Ignite; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteKernal; +import org.apache.ignite.internal.util.typedef.G; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +/** + * + */ +public class IgniteIoTestMessagesTest extends GridCommonAbstractTest { + /** */ + private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** */ + private boolean client; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder); + + cfg.setClientMode(client); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + super.beforeTestsStarted(); + + 
startGrids(3); + + client = true; + + startGrid(3); + + startGrid(4); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopAllGrids(); + + super.afterTestsStopped(); + } + + /** + * @throws Exception If failed. + */ + public void testIoTestMessages() throws Exception { + for (Ignite node : G.allGrids()) { + IgniteKernal ignite = (IgniteKernal)node; + + List<ClusterNode> rmts = new ArrayList<>(ignite.cluster().forRemotes().nodes()); + + assertEquals(4, rmts.size()); + + for (ClusterNode rmt : rmts) { + ignite.sendIoTest(rmt, new byte[1024], false); + + ignite.sendIoTest(rmt, new byte[1024], true); + + ignite.sendIoTest(rmts, new byte[1024], false); + + ignite.sendIoTest(rmts, new byte[1024], true); + } + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteVariousConnectionNumberTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteVariousConnectionNumberTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteVariousConnectionNumberTest.java new file mode 100644 index 0000000..510751e --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteVariousConnectionNumberTest.java @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.managers.communication; + +import java.util.Random; +import java.util.concurrent.Callable; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteRunnable; +import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +import static org.apache.ignite.cache.CacheMode.REPLICATED; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; + +/** + * + */ +public class IgniteVariousConnectionNumberTest extends GridCommonAbstractTest { + /** */ + private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** */ + private static final int NODES = 6; + + /** */ + private static Random rnd = new Random(); + + /** */ + private boolean client; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = 
super.getConfiguration(gridName); + + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder); + + int connections = rnd.nextInt(10) + 1; + + log.info("Node connections [name=" + gridName + ", connections=" + connections + ']'); + + ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setConnectionsPerNode(connections); + ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setUsePairedConnections(rnd.nextBoolean()); + ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setSharedMemoryPort(-1); + + cfg.setClientMode(client); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + super.beforeTestsStarted(); + + long seed = U.currentTimeMillis(); + + rnd.setSeed(seed); + + log.info("Random seed: " + seed); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopAllGrids(); + + super.afterTestsStopped(); + } + + /** + * @throws Exception If failed. + */ + public void testVariousConnectionNumber() throws Exception { + startGridsMultiThreaded(3); + + client = true; + + startGridsMultiThreaded(3, 3); + + CacheConfiguration ccfg = new CacheConfiguration(); + + ccfg.setCacheMode(REPLICATED); + ccfg.setWriteSynchronizationMode(FULL_SYNC); + + ignite(0).createCache(ccfg); + + for (int i = 0; i < 10; i++) { + log.info("Iteration: " + i); + + runOperations(5000); + + awaitPartitionMapExchange(); + + int idx = ThreadLocalRandom.current().nextInt(NODES); + + Ignite node = ignite(idx); + + client = node.configuration().isClientMode(); + + stopGrid(idx); + + startGrid(idx); + } + } + + /** + * @param time Execution time. + * @throws Exception If failed. 
+ */ + private void runOperations(final long time) throws Exception { + final AtomicInteger idx = new AtomicInteger(); + + GridTestUtils.runMultiThreaded(new Callable<Void>() { + @Override public Void call() throws Exception { + Ignite node = ignite(idx.getAndIncrement() % NODES); + + IgniteCache cache = node.cache(null); + + long stopTime = U.currentTimeMillis() + time; + + ThreadLocalRandom rnd = ThreadLocalRandom.current(); + + while (U.currentTimeMillis() < stopTime) { + cache.put(rnd.nextInt(10_000), 0); + + node.compute().broadcast(new DummyJob()); + } + + return null; + } + }, NODES * 10, "test-thread"); + } + + /** + * + */ + private static class DummyJob implements IgniteRunnable { + /** {@inheritDoc} */ + @Override public void run() { + // No-op. + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CrossCacheTxRandomOperationsTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CrossCacheTxRandomOperationsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CrossCacheTxRandomOperationsTest.java index 67ec371..eaa9923 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CrossCacheTxRandomOperationsTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CrossCacheTxRandomOperationsTest.java @@ -86,6 +86,11 @@ public class CrossCacheTxRandomOperationsTest extends GridCommonAbstractTest { } /** {@inheritDoc} */ + @Override protected long getTestTimeout() { + return 6 * 60 * 1000; + } + + /** {@inheritDoc} */ @Override protected void beforeTestsStarted() throws Exception { super.beforeTestsStarted(); @@ -170,9 +175,17 @@ public class CrossCacheTxRandomOperationsTest extends GridCommonAbstractTest { } /** + * @param cacheMode Cache mode. 
+ * @param writeSync Write synchronization mode. + * @param fairAff Fair affinity flag. + * @param ignite Node to use. + * @param name Cache name. */ - protected void createCache(CacheMode cacheMode, CacheWriteSynchronizationMode writeSync, boolean fairAff, - Ignite ignite, String name) { + protected void createCache(CacheMode cacheMode, + CacheWriteSynchronizationMode writeSync, + boolean fairAff, + Ignite ignite, + String name) { ignite.createCache(cacheConfiguration(name, cacheMode, writeSync, fairAff)); } @@ -269,9 +282,18 @@ public class CrossCacheTxRandomOperationsTest extends GridCommonAbstractTest { boolean checkData = fullSync && !optimistic; + long stopTime = System.currentTimeMillis() + 10_000; + for (int i = 0; i < 10_000; i++) { - if (i % 100 == 0) + if (i % 100 == 0) { + if (System.currentTimeMillis() > stopTime) { + log.info("Stop on timeout, iteration: " + i); + + break; + } + log.info("Iteration: " + i); + } boolean rollback = i % 10 == 0; @@ -557,4 +579,4 @@ public class CrossCacheTxRandomOperationsTest extends GridCommonAbstractTest { return old; } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridAbstractCacheInterceptorRebalanceTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridAbstractCacheInterceptorRebalanceTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridAbstractCacheInterceptorRebalanceTest.java index 9405a19..3a2bc81 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridAbstractCacheInterceptorRebalanceTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridAbstractCacheInterceptorRebalanceTest.java @@ -200,7 +200,9 @@ public abstract class GridAbstractCacheInterceptorRebalanceTest extends GridComm 
private void testRebalance(final Operation operation) throws Exception { interceptor = new RebalanceUpdateInterceptor(); - for (int iter = 0; iter < TEST_ITERATIONS; iter++) { + long stopTime = System.currentTimeMillis() + 2 * 60_000; + + for (int iter = 0; iter < TEST_ITERATIONS && System.currentTimeMillis() < stopTime; iter++) { log.info("Iteration: " + iter); failed = false; http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOffHeapMultiThreadedUpdateSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOffHeapMultiThreadedUpdateSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOffHeapMultiThreadedUpdateSelfTest.java index 9458a63..6e2e91f 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOffHeapMultiThreadedUpdateSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOffHeapMultiThreadedUpdateSelfTest.java @@ -115,10 +115,10 @@ public class GridCacheOffHeapMultiThreadedUpdateSelfTest extends GridCacheOffHea if (gridCount() > 1) testPutTx(keyForNode(1), PESSIMISTIC); } - + /** * TODO: IGNITE-592. - * + * * @throws Exception If failed. 
*/ public void testPutTxOptimistic() throws Exception { @@ -227,4 +227,4 @@ public class GridCacheOffHeapMultiThreadedUpdateSelfTest extends GridCacheOffHea assertFalse(failed); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheAtomicMessageRecovery10ConnectionsTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheAtomicMessageRecovery10ConnectionsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheAtomicMessageRecovery10ConnectionsTest.java new file mode 100644 index 0000000..30fc9ef --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheAtomicMessageRecovery10ConnectionsTest.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.distributed; + +/** + * Variant of {@link IgniteCacheAtomicMessageRecoveryTest} with {@code connectionsPerNode()} overridden to return 10. + */ +public class IgniteCacheAtomicMessageRecovery10ConnectionsTest extends IgniteCacheAtomicMessageRecoveryTest { + /** {@inheritDoc} */ + @Override protected int connectionsPerNode() { + return 10; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheAtomicMessageRecoveryNoPairedConnectionsTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheAtomicMessageRecoveryNoPairedConnectionsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheAtomicMessageRecoveryNoPairedConnectionsTest.java new file mode 100644 index 0000000..71772ef --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheAtomicMessageRecoveryNoPairedConnectionsTest.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.distributed; + +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; + +import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; + +/** + * Variant of {@link IgniteCacheAtomicMessageRecoveryTest} with paired connections disabled on the communication SPI. + */ +public class IgniteCacheAtomicMessageRecoveryNoPairedConnectionsTest extends IgniteCacheAtomicMessageRecoveryTest { + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + TcpCommunicationSpi commSpi = (TcpCommunicationSpi)cfg.getCommunicationSpi(); + + assertTrue(commSpi.isUsePairedConnections()); + + commSpi.setUsePairedConnections(false); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected CacheAtomicityMode atomicityMode() { + return ATOMIC; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheConnectionRecovery10ConnectionsTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheConnectionRecovery10ConnectionsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheConnectionRecovery10ConnectionsTest.java new file mode 100644 index 0000000..919aea6 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheConnectionRecovery10ConnectionsTest.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed; + +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; + +/** + * Variant of {@link IgniteCacheConnectionRecoveryTest} with 10 connections per node set on the communication SPI. + */ +public class IgniteCacheConnectionRecovery10ConnectionsTest extends IgniteCacheConnectionRecoveryTest { + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setConnectionsPerNode(10); + + return cfg; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheCreatePutTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheCreatePutTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheCreatePutTest.java index 2f700f3..a91de67 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheCreatePutTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheCreatePutTest.java @@ -107,7 +107,7 @@ public class 
IgniteCacheCreatePutTest extends GridCommonAbstractTest { try { int iter = 0; - while (System.currentTimeMillis() < stopTime) { + while (System.currentTimeMillis() < stopTime && iter < 5) { log.info("Iteration: " + iter++); try { http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageRecoveryAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageRecoveryAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageRecoveryAbstractTest.java index 0460a8f..1bfd727 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageRecoveryAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageRecoveryAbstractTest.java @@ -58,6 +58,7 @@ public abstract class IgniteCacheMessageRecoveryAbstractTest extends GridCommonA commSpi.setSocketWriteTimeout(1000); commSpi.setSharedMemoryPort(-1); + commSpi.setConnectionsPerNode(connectionsPerNode()); cfg.setCommunicationSpi(commSpi); @@ -76,6 +77,13 @@ public abstract class IgniteCacheMessageRecoveryAbstractTest extends GridCommonA } /** + * @return Value for {@link TcpCommunicationSpi#setConnectionsPerNode(int)}. + */ + protected int connectionsPerNode() { + return TcpCommunicationSpi.DFLT_CONN_PER_NODE; + } + + /** * @return Cache atomicity mode. 
*/ protected abstract CacheAtomicityMode atomicityMode(); @@ -174,18 +182,22 @@ public abstract class IgniteCacheMessageRecoveryAbstractTest extends GridCommonA static boolean closeSessions(Ignite ignite) throws Exception { TcpCommunicationSpi commSpi = (TcpCommunicationSpi)ignite.configuration().getCommunicationSpi(); - Map<UUID, GridCommunicationClient> clients = U.field(commSpi, "clients"); + Map<UUID, GridCommunicationClient[]> clients = U.field(commSpi, "clients"); boolean closed = false; - for (GridCommunicationClient client : clients.values()) { - GridTcpNioCommunicationClient client0 = (GridTcpNioCommunicationClient)client; + for (GridCommunicationClient[] clients0 : clients.values()) { + for (GridCommunicationClient client : clients0) { + if (client != null) { + GridTcpNioCommunicationClient client0 = (GridTcpNioCommunicationClient)client; - GridNioSession ses = client0.session(); + GridNioSession ses = client0.session(); - ses.close(); + ses.close(); - closed = true; + closed = true; + } + } } return closed; http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageWriteTimeoutTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageWriteTimeoutTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageWriteTimeoutTest.java index 6256225..0dd4079 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageWriteTimeoutTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageWriteTimeoutTest.java @@ -50,8 +50,8 @@ public class IgniteCacheMessageWriteTimeoutTest extends GridCommonAbstractTest { // Try provoke connection close on socket writeTimeout. 
commSpi.setSharedMemoryPort(-1); commSpi.setMessageQueueLimit(10); - commSpi.setSocketReceiveBuffer(32); - commSpi.setSocketSendBuffer(32); + commSpi.setSocketReceiveBuffer(40); + commSpi.setSocketSendBuffer(40); commSpi.setSocketWriteTimeout(100); commSpi.setUnacknowledgedMessagesBufferSize(1000); commSpi.setConnectTimeout(10_000); @@ -66,15 +66,20 @@ public class IgniteCacheMessageWriteTimeoutTest extends GridCommonAbstractTest { super.afterTest(); } + /** {@inheritDoc} */ + @Override protected long getTestTimeout() { + return 10 * 60_000; + } + /** * @throws Exception If failed. */ public void testMessageQueueLimit() throws Exception { - startGridsMultiThreaded(3); - - for (int i = 0; i < 15; i++) { + for (int i = 0; i < 3; i++) { log.info("Iteration: " + i); + startGridsMultiThreaded(3); + IgniteInternalFuture<?> fut1 = startJobThreads(50); U.sleep(100); @@ -83,6 +88,8 @@ public class IgniteCacheMessageWriteTimeoutTest extends GridCommonAbstractTest { fut1.get(); fut2.get(); + + stopAllGrids(); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCacheMultiTxLockSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCacheMultiTxLockSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCacheMultiTxLockSelfTest.java index 3fca826..322690c 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCacheMultiTxLockSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCacheMultiTxLockSelfTest.java @@ -86,7 +86,6 @@ public class IgniteCacheMultiTxLockSelfTest extends GridCommonAbstractTest { plc.setMaxSize(100000); ccfg.setEvictionPolicy(plc); - 
ccfg.setEvictSynchronized(true); c.setCacheConfiguration(ccfg); @@ -95,6 +94,11 @@ public class IgniteCacheMultiTxLockSelfTest extends GridCommonAbstractTest { return c; } + /** {@inheritDoc} */ + @Override protected long getTestTimeout() { + return 60_000; + } + /** * @throws Exception If failed. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheAtomicPrimaryWriteOrderNoStripedPoolMultiNodeFullApiSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheAtomicPrimaryWriteOrderNoStripedPoolMultiNodeFullApiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheAtomicPrimaryWriteOrderNoStripedPoolMultiNodeFullApiSelfTest.java new file mode 100644 index 0000000..e8175e5 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheAtomicPrimaryWriteOrderNoStripedPoolMultiNodeFullApiSelfTest.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.distributed.near; + +import org.apache.ignite.configuration.IgniteConfiguration; + +/** + * Variant of {@link GridCacheAtomicPrimaryWriteOrderMultiNodeFullApiSelfTest} with striped pool size set to -1. + */ +public class GridCacheAtomicPrimaryWriteOrderNoStripedPoolMultiNodeFullApiSelfTest extends + GridCacheAtomicPrimaryWriteOrderMultiNodeFullApiSelfTest { + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + cfg.setStripedPoolSize(-1); + + return cfg; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedNoStripedPoolMultiNodeFullApiSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedNoStripedPoolMultiNodeFullApiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedNoStripedPoolMultiNodeFullApiSelfTest.java new file mode 100644 index 0000000..05fe85f --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedNoStripedPoolMultiNodeFullApiSelfTest.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.near; + +import org.apache.ignite.configuration.IgniteConfiguration; + +/** + * Variant of {@link GridCachePartitionedMultiNodeFullApiSelfTest} with striped pool size set to -1. + */ +public class GridCachePartitionedNoStripedPoolMultiNodeFullApiSelfTest extends + GridCachePartitionedMultiNodeFullApiSelfTest { + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + cfg.setStripedPoolSize(-1); + + return cfg; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlockDetectionNoHangsTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlockDetectionNoHangsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlockDetectionNoHangsTest.java index c9d18eb..e9d74ff 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlockDetectionNoHangsTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlockDetectionNoHangsTest.java @@ -211,7 +211,7 @@ public class TxDeadlockDetectionNoHangsTest extends GridCommonAbstractTest { tx.commit(); } catch (Exception e) { - e.printStackTrace(); + log.info("Ignore error: " + e); } } }, NODES_CNT * 3, "tx-thread"); 
http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxOptimisticDeadlockDetectionTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxOptimisticDeadlockDetectionTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxOptimisticDeadlockDetectionTest.java index aa240aa..f6a06c2 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxOptimisticDeadlockDetectionTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxOptimisticDeadlockDetectionTest.java @@ -111,6 +111,9 @@ public class TxOptimisticDeadlockDetectionTest extends GridCommonAbstractTest { cfg.setClientMode(client); + // Test spi blocks message send, this can cause hang with striped pool. 
+ cfg.setStripedPoolSize(-1); + return cfg; } @@ -274,8 +277,8 @@ public class TxOptimisticDeadlockDetectionTest extends GridCommonAbstractTest { Object k; - log.info(">>> Performs put [node=" + ((IgniteKernal)ignite).localNode() + - ", tx=" + tx + ", key=" + transformer.apply(key) + ']'); + log.info(">>> Performs put [node=" + ((IgniteKernal)ignite).localNode().id() + + ", tx=" + tx.xid() + ", key=" + transformer.apply(key) + ']'); cache.put(transformer.apply(key), 0); @@ -309,23 +312,27 @@ public class TxOptimisticDeadlockDetectionTest extends GridCommonAbstractTest { entries.put(k, 2); } - log.info(">>> Performs put [node=" + ((IgniteKernal)ignite).localNode() + - ", tx=" + tx + ", entries=" + entries + ']'); + log.info(">>> Performs put [node=" + ((IgniteKernal)ignite).localNode().id() + + ", tx=" + tx.xid() + ", entries=" + entries + ']'); cache.putAll(entries); tx.commit(); } catch (Throwable e) { - U.error(log, "Expected exception: ", e); + log.info("Expected exception: " + e); + + e.printStackTrace(System.out); // At least one stack trace should contain TransactionDeadlockException. 
if (hasCause(e, TransactionTimeoutException.class) && - hasCause(e, TransactionDeadlockException.class) - ) { - if (deadlockErr.compareAndSet(null, cause(e, TransactionDeadlockException.class))) - U.error(log, "At least one stack trace should contain " + - TransactionDeadlockException.class.getSimpleName(), e); + hasCause(e, TransactionDeadlockException.class)) { + if (deadlockErr.compareAndSet(null, cause(e, TransactionDeadlockException.class))) { + log.info("At least one stack trace should contain " + + TransactionDeadlockException.class.getSimpleName()); + + e.printStackTrace(System.out); + } } } } @@ -344,7 +351,7 @@ public class TxOptimisticDeadlockDetectionTest extends GridCommonAbstractTest { TransactionDeadlockException deadlockE = deadlockErr.get(); - assertNotNull(deadlockE); + assertNotNull("Failed to detect deadlock", deadlockE); boolean fail = false; http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorProxySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorProxySelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorProxySelfTest.java index 6fc7e02..7b5abf5 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorProxySelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorProxySelfTest.java @@ -372,4 +372,4 @@ public class GridServiceProcessorProxySelfTest extends GridServiceProcessorAbstr X.println("Executing cache service: " + ctx.name()); } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/test/java/org/apache/ignite/internal/util/future/GridFutureAdapterSelfTest.java 
---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/future/GridFutureAdapterSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/future/GridFutureAdapterSelfTest.java index adcd144..4bc9f01 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/util/future/GridFutureAdapterSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/util/future/GridFutureAdapterSelfTest.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.util.future; import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; @@ -227,87 +228,98 @@ public class GridFutureAdapterSelfTest extends GridCommonAbstractTest { * * @throws Exception In case of any exception. */ - @SuppressWarnings("ErrorNotRethrown") public void testChaining() throws Exception { + checkChaining(null); + + ExecutorService exec = Executors.newFixedThreadPool(1); + + try { + checkChaining(exec); + + GridFinishedFuture<Integer> fut = new GridFinishedFuture<>(1); + + IgniteInternalFuture<Object> chain = fut.chain(new CX1<IgniteInternalFuture<Integer>, Object>() { + @Override public Object applyx(IgniteInternalFuture<Integer> fut) throws IgniteCheckedException { + return fut.get() + 1; + } + }, exec); + + assertEquals(2, chain.get()); + } + finally { + exec.shutdown(); + } + } + + /** + * @param exec Executor for chain callback. + * @throws Exception If failed. 
+ */ + @SuppressWarnings("ErrorNotRethrown") + private void checkChaining(ExecutorService exec) throws Exception { final CX1<IgniteInternalFuture<Object>, Object> passThrough = new CX1<IgniteInternalFuture<Object>, Object>() { @Override public Object applyx(IgniteInternalFuture<Object> f) throws IgniteCheckedException { return f.get(); } }; - final GridTestKernalContext ctx = new GridTestKernalContext(log); - - ctx.setExecutorService(Executors.newFixedThreadPool(1)); - ctx.setSystemExecutorService(Executors.newFixedThreadPool(1)); - - ctx.add(new PoolProcessor(ctx)); - ctx.add(new GridClosureProcessor(ctx)); + GridFutureAdapter<Object> fut = new GridFutureAdapter<>(); + IgniteInternalFuture<Object> chain = exec != null ? fut.chain(passThrough, exec) : fut.chain(passThrough); - ctx.start(); + assertFalse(fut.isDone()); + assertFalse(chain.isDone()); try { - // Test result returned. - - GridFutureAdapter<Object> fut = new GridFutureAdapter<>(); - IgniteInternalFuture<Object> chain = fut.chain(passThrough); + chain.get(20); - assertFalse(fut.isDone()); - assertFalse(chain.isDone()); - - try { - chain.get(20); - - fail("Expects timeout exception."); - } - catch (IgniteFutureTimeoutCheckedException e) { - info("Expected timeout exception: " + e.getMessage()); - } + fail("Expects timeout exception."); + } + catch (IgniteFutureTimeoutCheckedException e) { + info("Expected timeout exception: " + e.getMessage()); + } - fut.onDone("result"); + fut.onDone("result"); - assertEquals("result", chain.get(1)); + assertEquals("result", chain.get(1)); - // Test exception re-thrown. + // Test exception re-thrown. - fut = new GridFutureAdapter<>(); - chain = fut.chain(passThrough); + fut = new GridFutureAdapter<>(); + chain = exec != null ? 
fut.chain(passThrough, exec) : fut.chain(passThrough); - fut.onDone(new ClusterGroupEmptyCheckedException("test exception")); + fut.onDone(new ClusterGroupEmptyCheckedException("test exception")); - try { - chain.get(); + try { + chain.get(); - fail("Expects failed with exception."); - } - catch (ClusterGroupEmptyCheckedException e) { - info("Expected exception: " + e.getMessage()); - } + fail("Expects failed with exception."); + } + catch (ClusterGroupEmptyCheckedException e) { + info("Expected exception: " + e.getMessage()); + } - // Test error re-thrown. + // Test error re-thrown. - fut = new GridFutureAdapter<>(); - chain = fut.chain(passThrough); + fut = new GridFutureAdapter<>(); + chain = exec != null ? fut.chain(passThrough, exec) : fut.chain(passThrough); - try { - fut.onDone(new StackOverflowError("test error")); + try { + fut.onDone(new StackOverflowError("test error")); + if (exec == null) fail("Expects failed with error."); - } - catch (StackOverflowError e) { - info("Expected error: " + e.getMessage()); - } + } + catch (StackOverflowError e) { + info("Expected error: " + e.getMessage()); + } - try { - chain.get(); + try { + chain.get(); - fail("Expects failed with error."); - } - catch (StackOverflowError e) { - info("Expected error: " + e.getMessage()); - } + fail("Expects failed with error."); } - finally { - ctx.stop(false); + catch (StackOverflowError e) { + info("Expected error: " + e.getMessage()); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/test/java/org/apache/ignite/internal/util/nio/impl/GridNioFilterChainSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/nio/impl/GridNioFilterChainSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/nio/impl/GridNioFilterChainSelfTest.java index 201fd27..d403784 100644 --- 
a/modules/core/src/test/java/org/apache/ignite/internal/util/nio/impl/GridNioFilterChainSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/util/nio/impl/GridNioFilterChainSelfTest.java @@ -114,7 +114,7 @@ public class GridNioFilterChainSelfTest extends GridCommonAbstractTest { proceedExceptionCaught(ses, ex); } - @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object msg) { + @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object msg, boolean fut) { sndEvt.compareAndSet(null, ses.<String>meta(MESSAGE_WRITE_META_NAME)); sndMsgObj.compareAndSet(null, msg); @@ -155,7 +155,7 @@ public class GridNioFilterChainSelfTest extends GridCommonAbstractTest { chain.onSessionIdleTimeout(ses); chain.onSessionWriteTimeout(ses); assertNull(chain.onSessionClose(ses)); - assertNull(chain.onSessionWrite(ses, snd)); + assertNull(chain.onSessionWrite(ses, snd, true)); assertEquals("DCBA", connectedEvt.get()); assertEquals("DCBA", disconnectedEvt.get()); @@ -210,10 +210,10 @@ public class GridNioFilterChainSelfTest extends GridCommonAbstractTest { } /** {@inheritDoc} */ - @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object msg) throws IgniteCheckedException { + @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object msg, boolean fut) throws IgniteCheckedException { chainMeta(ses, MESSAGE_WRITE_META_NAME); - return proceedSessionWrite(ses, msg); + return proceedSessionWrite(ses, msg, fut); } /** {@inheritDoc} */ @@ -349,6 +349,11 @@ public class GridNioFilterChainSelfTest extends GridCommonAbstractTest { } /** {@inheritDoc} */ + @Override public void sendNoFuture(Object msg) { + // No-op. 
+ } + + /** {@inheritDoc} */ @Override public GridNioFuture<Object> resumeReads() { return null; } @@ -369,13 +374,28 @@ public class GridNioFilterChainSelfTest extends GridCommonAbstractTest { } /** {@inheritDoc} */ - @Override public void recoveryDescriptor(GridNioRecoveryDescriptor recoveryDesc) { + @Override public void outRecoveryDescriptor(GridNioRecoveryDescriptor recoveryDesc) { + // No-op. + } + + /** {@inheritDoc} */ + @Nullable @Override public GridNioRecoveryDescriptor outRecoveryDescriptor() { + return null; + } + + /** {@inheritDoc} */ + @Override public void inRecoveryDescriptor(GridNioRecoveryDescriptor recoveryDesc) { // No-op. } /** {@inheritDoc} */ - @Nullable @Override public GridNioRecoveryDescriptor recoveryDescriptor() { + @Nullable @Override public GridNioRecoveryDescriptor inRecoveryDescriptor() { return null; } + + /** {@inheritDoc} */ + @Override public void systemMessage(Object msg) { + // No-op. + } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/test/java/org/apache/ignite/loadtests/nio/GridNioBenchmarkClient.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/nio/GridNioBenchmarkClient.java b/modules/core/src/test/java/org/apache/ignite/loadtests/nio/GridNioBenchmarkClient.java index 61a13b1..25dd780 100644 --- a/modules/core/src/test/java/org/apache/ignite/loadtests/nio/GridNioBenchmarkClient.java +++ b/modules/core/src/test/java/org/apache/ignite/loadtests/nio/GridNioBenchmarkClient.java @@ -83,7 +83,7 @@ public class GridNioBenchmarkClient { */ public void run() throws IOException, InterruptedException { for (int i = 0; i < connCnt; i++) - exec.submit(new ClientThread()); + exec.execute(new ClientThread()); Thread.sleep(5*60*1000); @@ -167,4 +167,4 @@ public class GridNioBenchmarkClient { return read; } } -} \ No newline at end of file +} 
http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/test/java/org/apache/ignite/p2p/GridP2PRecursionTaskSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/p2p/GridP2PRecursionTaskSelfTest.java b/modules/core/src/test/java/org/apache/ignite/p2p/GridP2PRecursionTaskSelfTest.java index f21f31b..a18ef32 100644 --- a/modules/core/src/test/java/org/apache/ignite/p2p/GridP2PRecursionTaskSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/p2p/GridP2PRecursionTaskSelfTest.java @@ -196,4 +196,4 @@ public class GridP2PRecursionTaskSelfTest extends GridCommonAbstractTest { return ignite.compute().execute(FactorialTask.class, arg); } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/test/java/org/apache/ignite/spi/GridTcpSpiForwardingSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/GridTcpSpiForwardingSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/GridTcpSpiForwardingSelfTest.java index 652e47f..5ca8f26 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/GridTcpSpiForwardingSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/GridTcpSpiForwardingSelfTest.java @@ -69,7 +69,7 @@ public class GridTcpSpiForwardingSelfTest extends GridCommonAbstractTest { private static final int commExtPort2 = 20100; /** */ - private AddressResolver resolver; + private AddressResolver rslvr; /** */ private boolean ipFinderUseLocPorts; @@ -111,14 +111,15 @@ public class GridTcpSpiForwardingSelfTest extends GridCommonAbstractTest { cfg.setConnectorConfiguration(null); TcpCommunicationSpi commSpi = new TcpCommunicationSpi() { - @Override protected GridCommunicationClient createTcpClient(ClusterNode node) throws IgniteCheckedException { + @Override protected 
GridCommunicationClient createTcpClient(ClusterNode node, int connIdx) + throws IgniteCheckedException { Map<String, Object> attrs = new HashMap<>(node.attributes()); attrs.remove(createSpiAttributeName(ATTR_PORT)); ((TcpDiscoveryNode)node).setAttributes(attrs); - return super.createTcpClient(node); + return super.createTcpClient(node, connIdx); } }; @@ -126,12 +127,13 @@ public class GridTcpSpiForwardingSelfTest extends GridCommonAbstractTest { commSpi.setLocalPort(commLocPort); commSpi.setLocalPortRange(1); commSpi.setSharedMemoryPort(-1); + commSpi.setConnectionsPerNode(1); cfg.setCommunicationSpi(commSpi); - assert resolver != null; + assert rslvr != null; - cfg.setAddressResolver(resolver); + cfg.setAddressResolver(rslvr); return cfg; } @@ -147,7 +149,7 @@ public class GridTcpSpiForwardingSelfTest extends GridCommonAbstractTest { map.put(new InetSocketAddress("127.0.0.1", locPort2), F.asList(new InetSocketAddress("127.0.0.1", extPort2))); map.put(new InetSocketAddress("127.0.0.1", commLocPort2), F.asList(new InetSocketAddress("127.0.0.1", commExtPort2))); - resolver = new AddressResolver() { + rslvr = new AddressResolver() { @Override public Collection<InetSocketAddress> getExternalAddresses(InetSocketAddress addr) { return map.get(addr); } @@ -167,7 +169,7 @@ public class GridTcpSpiForwardingSelfTest extends GridCommonAbstractTest { map.put("127.0.0.1:" + locPort2, "127.0.0.1:" + extPort2); map.put("127.0.0.1:" + commLocPort2, "127.0.0.1:" + commExtPort2); - resolver = new BasicAddressResolver(map); + rslvr = new BasicAddressResolver(map); doTestForward(); } @@ -180,7 +182,7 @@ public class GridTcpSpiForwardingSelfTest extends GridCommonAbstractTest { map.put("127.0.0.1", "127.0.0.1"); - resolver = new BasicAddressResolver(map); + rslvr = new BasicAddressResolver(map); ipFinderUseLocPorts = true; 
http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java index 076724d..3c4fea0 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java @@ -90,16 +90,36 @@ abstract class GridTcpCommunicationSpiAbstractTest extends GridAbstractCommunica super.afterTest(); for (CommunicationSpi spi : spis.values()) { - ConcurrentMap<UUID, GridCommunicationClient> clients = U.field(spi, "clients"); + ConcurrentMap<UUID, GridCommunicationClient[]> clients = U.field(spi, "clients"); + + for (int i = 0; i < 20; i++) { + GridCommunicationClient client0 = null; + + for (GridCommunicationClient[] clients0 : clients.values()) { + for (GridCommunicationClient client : clients0) { + if (client != null) { + client0 = client; + + break; + } + } + + if (client0 != null) + break; + } + + if (client0 == null) + return; - for (int i = 0; i < 20 && !clients.isEmpty(); i++) { info("Check failed for SPI [grid=" + - GridTestUtils.getFieldValue(spi, IgniteSpiAdapter.class, "gridName") + ", spi=" + spi + ']'); + GridTestUtils.getFieldValue(spi, IgniteSpiAdapter.class, "gridName") + + ", client=" + client0 + + ", spi=" + spi + ']'); U.sleep(1000); } - assert clients.isEmpty() : "Clients: " + clients; + fail("Failed to wait when clients are closed."); } } } \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java index 8635d94..a649130 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java @@ -31,6 +31,7 @@ import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.IgniteConfiguration; @@ -83,6 +84,12 @@ public class GridTcpCommunicationSpiConcurrentConnectSelfTest<T extends Communic /** Use ssl. */ protected boolean useSsl; + /** */ + private int connectionsPerNode = 1; + + /** */ + private boolean pairedConnections = true; + /** * */ @@ -163,6 +170,34 @@ public class GridTcpCommunicationSpiConcurrentConnectSelfTest<T extends Communic /** * @throws Exception If failed. */ + public void testMultithreaded_10Connections() throws Exception { + connectionsPerNode = 10; + + testMultithreaded(); + } + + /** + * @throws Exception If failed. + */ + public void testMultithreaded_NoPairedConnections() throws Exception { + pairedConnections = false; + + testMultithreaded(); + } + + /** + * @throws Exception If failed. 
+ */ + public void testMultithreaded_10ConnectionsNoPaired() throws Exception { + pairedConnections = false; + connectionsPerNode = 10; + + testMultithreaded(); + } + + /** + * @throws Exception If failed. + */ public void testWithLoad() throws Exception { int threads = Runtime.getRuntime().availableProcessors() * 5; @@ -244,7 +279,7 @@ public class GridTcpCommunicationSpiConcurrentConnectSelfTest<T extends Communic final AtomicInteger idx = new AtomicInteger(); try { - GridTestUtils.runMultiThreaded(new Callable<Void>() { + final Callable<Void> c = new Callable<Void>() { @Override public Void call() throws Exception { int idx0 = idx.getAndIncrement(); @@ -270,7 +305,40 @@ public class GridTcpCommunicationSpiConcurrentConnectSelfTest<T extends Communic return null; } - }, threads, "test"); + }; + + List<Thread> threadsList = new ArrayList<>(); + + final AtomicBoolean fail = new AtomicBoolean(); + + final AtomicLong tId = new AtomicLong(); + + for (int t = 0; t < threads; t++) { + Thread t0 = new Thread(new Runnable() { + @Override public void run() { + try { + c.call(); + } + catch (Throwable e) { + log.error("Unexpected error: " + e, e); + + fail.set(true); + } + } + }) { + @Override public long getId() { + // Override getId to use all connections. + return tId.getAndIncrement(); + } + }; + + threadsList.add(t0); + + t0.start(); + } + + for (Thread t0 : threadsList) + t0.join(); assertTrue(latch.await(10, TimeUnit.SECONDS)); @@ -281,17 +349,19 @@ public class GridTcpCommunicationSpiConcurrentConnectSelfTest<T extends Communic final GridNioServer srv = U.field(spi, "nioSrvr"); + final int conns = pairedConnections ? 
2 : 1; + GridTestUtils.waitForCondition(new GridAbsPredicate() { @Override public boolean apply() { Collection sessions = U.field(srv, "sessions"); - return sessions.size() == 1; + return sessions.size() == conns * connectionsPerNode; } }, 5000); Collection sessions = U.field(srv, "sessions"); - assertEquals(1, sessions.size()); + assertEquals(conns * connectionsPerNode, sessions.size()); } assertEquals(expMsgs, lsnr.cntr.get()); @@ -320,6 +390,8 @@ public class GridTcpCommunicationSpiConcurrentConnectSelfTest<T extends Communic spi.setIdleConnectionTimeout(60_000); spi.setConnectTimeout(10_000); spi.setSharedMemoryPort(-1); + spi.setConnectionsPerNode(connectionsPerNode); + spi.setUsePairedConnections(pairedConnections); return spi; } @@ -434,4 +506,4 @@ public class GridTcpCommunicationSpiConcurrentConnectSelfTest<T extends Communic rsrcs.stopThreads(); } -} \ No newline at end of file +}
