ignite-3547 Do not try to re-send message using the same client. Remove disconnected client from 'onDisconnected' callback.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a20ca351 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a20ca351 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a20ca351 Branch: refs/heads/ignite-3220-1 Commit: a20ca351b33efb07b83c6f5967fa7a3cef154c83 Parents: 8aa534a Author: sboikov <[email protected]> Authored: Fri Aug 19 10:37:59 2016 +0300 Committer: sboikov <[email protected]> Committed: Fri Aug 19 10:37:59 2016 +0300 ---------------------------------------------------------------------- .../util/nio/GridTcpNioCommunicationClient.java | 5 +- .../communication/tcp/TcpCommunicationSpi.java | 49 +++-- .../CacheSerializableTransactionsTest.java | 5 + .../IgniteCacheConnectionRecoveryTest.java | 205 +++++++++++++++++++ .../IgniteCacheMessageRecoveryAbstractTest.java | 14 +- ...gniteCacheMessageRecoveryIdleConnection.java | 154 -------------- ...eCacheMessageRecoveryIdleConnectionTest.java | 157 ++++++++++++++ .../ignite/testsuites/IgniteCacheTestSuite.java | 6 +- 8 files changed, 407 insertions(+), 188 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/a20ca351/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java index 4022bc5..5fe521d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java @@ -125,8 +125,11 @@ public class GridTcpNioCommunicationClient extends GridAbstractCommunicationClie if (log.isDebugEnabled()) log.debug("Failed to send message [client=" + this + ", err=" + e + ']'); - if (e.getCause() instanceof IOException) + if (e.getCause() instanceof IOException) { + ses.close(); + return true; + } else throw new IgniteCheckedException("Failed to send message [client=" + this + ']', e); } http://git-wip-us.apache.org/repos/asf/ignite/blob/a20ca351/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java index 2c03b2d..d81b9f3 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java @@ -353,6 +353,15 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter UUID id = ses.meta(NODE_ID_META); if (id != null) { + GridCommunicationClient client = clients.get(id); + + if (client instanceof GridTcpNioCommunicationClient && + ((GridTcpNioCommunicationClient) client).session() == ses) { + client.close(); + + clients.remove(id, client); + } + if (!stopping) { boolean reconnect = false; @@ -372,9 +381,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter recoveryData.onNodeLeft(); } - DisconnectedSessionInfo disconnectData = new DisconnectedSessionInfo(id, - ses, - recoveryData, + DisconnectedSessionInfo disconnectData = new DisconnectedSessionInfo(recoveryData, reconnect); commWorker.addProcessDisconnectRequest(disconnectData); @@ -1400,6 +1407,14 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter .append(']').append(U.nl()); } + sb.append("Communication SPI clients: ").append(U.nl()); + + for (Map.Entry<UUID, GridCommunicationClient> entry : clients.entrySet()) { + sb.append(" [node=").append(entry.getKey()) + .append(", client=").append(entry.getValue()) + .append(']').append(U.nl()); + } + U.warn(log, sb.toString()); } @@ -1978,17 +1993,19 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter client.release(); - client = null; - if (!retry) sentMsgsCnt.increment(); else { + clients.remove(node.id(), client); + ClusterNode node0 = getSpiContext().node(node.id()); if (node0 == null) throw new IgniteCheckedException("Failed to send message to remote node " + "(node has left the grid): " + node.id()); } + + client = null; } while (retry); } @@ -3187,12 +3204,6 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter * @param sesInfo Disconnected session information. */ private void processDisconnect(DisconnectedSessionInfo sesInfo) { - GridCommunicationClient client = clients.get(sesInfo.nodeId); - - if (client instanceof GridTcpNioCommunicationClient && - ((GridTcpNioCommunicationClient) client).session() == sesInfo.ses) - clients.remove(sesInfo.nodeId, client); - if (sesInfo.reconnect) { GridNioRecoveryDescriptor recoveryDesc = sesInfo.recoveryDesc; @@ -3205,7 +3216,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter if (log.isDebugEnabled()) log.debug("Recovery reconnect [rmtNode=" + recoveryDesc.node().id() + ']'); - client = reserveClient(node); + GridCommunicationClient client = reserveClient(node); client.release(); } @@ -3756,29 +3767,17 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter */ private static class DisconnectedSessionInfo { /** */ - private final UUID nodeId; - - /** */ - private final GridNioSession ses; - - /** */ private final GridNioRecoveryDescriptor recoveryDesc; /** */ private final boolean reconnect; /** - * @param nodeId Node ID. - * @param ses Session. * @param recoveryDesc Recovery descriptor. * @param reconnect Reconnect flag. */ - public DisconnectedSessionInfo(UUID nodeId, - GridNioSession ses, - @Nullable GridNioRecoveryDescriptor recoveryDesc, + DisconnectedSessionInfo(@Nullable GridNioRecoveryDescriptor recoveryDesc, boolean reconnect) { - this.nodeId = nodeId; - this.ses = ses; this.recoveryDesc = recoveryDesc; this.reconnect = reconnect; } http://git-wip-us.apache.org/repos/asf/ignite/blob/a20ca351/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java index 4baef66..3d4f850 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java @@ -3311,6 +3311,8 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest { * @throws Exception If failed. */ private void getRemoveTx(boolean nearCache, boolean store) throws Exception { + long stopTime = U.currentTimeMillis() + getTestTimeout() - 30_000; + final Ignite ignite0 = ignite(0); CacheConfiguration<Integer, Integer> ccfg = cacheConfiguration(PARTITIONED, FULL_SYNC, 0, store, false); @@ -3330,6 +3332,9 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest { } for (int i = 0; i < 100; i++) { + if (U.currentTimeMillis() > stopTime) + break; + final AtomicInteger cntr = new AtomicInteger(); final Integer key = i; http://git-wip-us.apache.org/repos/asf/ignite/blob/a20ca351/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheConnectionRecoveryTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheConnectionRecoveryTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheConnectionRecoveryTest.java new file mode 100644 index 0000000..7195c37 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheConnectionRecoveryTest.java @@ -0,0 +1,205 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed; + +import java.util.List; +import java.util.Map; +import java.util.TreeMap; +import java.util.concurrent.Callable; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.IgniteKernal; +import org.apache.ignite.internal.IgnitionEx; +import org.apache.ignite.internal.util.typedef.G; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; +import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; +import static org.apache.ignite.cache.CacheMode.REPLICATED; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; + +/** + * + */ +public class IgniteCacheConnectionRecoveryTest extends GridCommonAbstractTest { + /** */ + private static TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** */ + private boolean client; + + /** */ + private static final int SRVS = 5; + + /** */ + private static final int CLIENTS = 5; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER); + + ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setSharedMemoryPort(-1); + + cfg.setClientMode(client); + + cfg.setCacheConfiguration( + cacheConfiguration("cache1", TRANSACTIONAL), + cacheConfiguration("cache2", ATOMIC)); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + super.beforeTestsStarted(); + + startGridsMultiThreaded(SRVS); + + client = true; + + startGridsMultiThreaded(SRVS, CLIENTS); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopAllGrids(); + + super.afterTestsStopped(); + } + + /** + * @throws Exception If failed. + */ + public void testConnectionRecovery() throws Exception { + final Map<Integer, Integer> data = new TreeMap<>(); + + for (int i = 0; i < 500; i++) + data.put(i, i); + + final AtomicInteger idx = new AtomicInteger(); + + final long stopTime = U.currentTimeMillis() + 30_000; + + final AtomicReference<CyclicBarrier> barrierRef = new AtomicReference<>(); + + final int TEST_THREADS = (CLIENTS + SRVS) * 2; + + IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + int idx0 = idx.getAndIncrement(); + Ignite node = ignite(idx0 % (SRVS + CLIENTS)); + + Thread.currentThread().setName("test-thread-" + idx0 + "-" + node.name()); + + IgniteCache cache1 = node.cache("cache1").withAsync(); + IgniteCache cache2 = node.cache("cache2").withAsync(); + + int iter = 0; + + while (U.currentTimeMillis() < stopTime) { + try { + cache1.putAll(data); + cache1.future().get(15, SECONDS); + + cache2.putAll(data); + cache2.future().get(15, SECONDS); + + CyclicBarrier b = barrierRef.get(); + + if (b != null) + b.await(15, SECONDS); + } + catch (Exception e) { + synchronized (IgniteCacheConnectionRecoveryTest.class) { + log.error("Failed to execute update, will dump debug information" + + " [err=" + e+ ", iter=" + iter + ']', e); + + List<Ignite> nodes = IgnitionEx.allGridsx(); + + for (Ignite node0 : nodes) + ((IgniteKernal)node0).dumpDebugInfo(); + + U.dumpThreads(log); + } + + throw e; + } + } + + return null; + } + }, TEST_THREADS, "test-thread"); + + while (System.currentTimeMillis() < stopTime) { + boolean closed = false; + + for (Ignite node : G.allGrids()) { + if (IgniteCacheMessageRecoveryAbstractTest.closeSessions(node)) + closed = true; + } + + if (closed) { + CyclicBarrier b = new CyclicBarrier(TEST_THREADS + 1, new Runnable() { + @Override public void run() { + barrierRef.set(null); + } + }); + + barrierRef.set(b); + + b.await(); + } + + U.sleep(50); + } + + fut.get(); + } + + /** + * @param name Cache name. + * @param atomicityMode Cache atomicity mode. + * @return Configuration. + */ + private CacheConfiguration cacheConfiguration(String name, CacheAtomicityMode atomicityMode) { + CacheConfiguration ccfg = new CacheConfiguration(); + + ccfg.setName(name); + ccfg.setAtomicityMode(atomicityMode); + ccfg.setCacheMode(REPLICATED); + ccfg.setWriteSynchronizationMode(FULL_SYNC); + + return ccfg; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/a20ca351/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageRecoveryAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageRecoveryAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageRecoveryAbstractTest.java index 16d7e5d..0460a8f 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageRecoveryAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageRecoveryAbstractTest.java @@ -150,7 +150,11 @@ public abstract class IgniteCacheMessageRecoveryAbstractTest extends GridCommonA for (int i = 0; i < 30; i++) { Thread.sleep(1000); - closed |= closeSessions(); + Ignite node0 = ignite(ThreadLocalRandom.current().nextInt(0, GRID_CNT)); + + log.info("Close sessions for: " + ignite.name()); + + closed |= closeSessions(node0); } assertTrue(closed); @@ -163,13 +167,11 @@ public abstract class IgniteCacheMessageRecoveryAbstractTest extends GridCommonA } /** + * @param ignite Node. * @throws Exception If failed. + * @return {@code True} if closed at least one session. */ - private boolean closeSessions() throws Exception { - Ignite ignite = ignite(ThreadLocalRandom.current().nextInt(0, GRID_CNT)); - - log.info("Close sessions for: " + ignite.name()); - + static boolean closeSessions(Ignite ignite) throws Exception { TcpCommunicationSpi commSpi = (TcpCommunicationSpi)ignite.configuration().getCommunicationSpi(); Map<UUID, GridCommunicationClient> clients = U.field(commSpi, "clients"); http://git-wip-us.apache.org/repos/asf/ignite/blob/a20ca351/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageRecoveryIdleConnection.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageRecoveryIdleConnection.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageRecoveryIdleConnection.java deleted file mode 100644 index 618fe2a..0000000 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageRecoveryIdleConnection.java +++ /dev/null @@ -1,154 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.cache.distributed; - -import java.util.List; -import java.util.concurrent.ThreadLocalRandom; -import org.apache.ignite.Ignite; -import org.apache.ignite.IgniteCache; -import org.apache.ignite.IgniteException; -import org.apache.ignite.cache.CacheAtomicityMode; -import org.apache.ignite.configuration.CacheConfiguration; -import org.apache.ignite.configuration.IgniteConfiguration; -import org.apache.ignite.internal.IgniteKernal; -import org.apache.ignite.internal.IgnitionEx; -import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.lang.IgniteFuture; -import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; -import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; -import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; -import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; -import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; - -import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; -import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; -import static org.apache.ignite.cache.CacheMode.REPLICATED; -import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; - -/** - * - */ -public class IgniteCacheMessageRecoveryIdleConnection extends GridCommonAbstractTest { - /** */ - private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); - - /** */ - private static final int NODES = 3; - - /** */ - private static final long IDLE_TIMEOUT = 50; - - /** {@inheritDoc} */ - @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { - IgniteConfiguration cfg = super.getConfiguration(gridName); - - ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER); - - TcpCommunicationSpi commSpi = new TcpCommunicationSpi(); - - commSpi.setIdleConnectionTimeout(IDLE_TIMEOUT); - commSpi.setSharedMemoryPort(-1); - - cfg.setCommunicationSpi(commSpi); - - return cfg; - } - - /** {@inheritDoc} */ - @Override protected long getTestTimeout() { - return 2 * 60_000; - } - - /** {@inheritDoc} */ - @Override protected void beforeTestsStarted() throws Exception { - super.beforeTestsStarted(); - - startGridsMultiThreaded(NODES); - } - - /** {@inheritDoc} */ - @Override protected void afterTestsStopped() throws Exception { - super.afterTestsStopped(); - - stopAllGrids(); - } - - /** - * @throws Exception If failed. - */ - public void testCacheOperationsIdleConnectionCloseTx() throws Exception { - cacheOperationsIdleConnectionClose(TRANSACTIONAL); - } - - /** - * @throws Exception If failed. - */ - public void testCacheOperationsIdleConnectionCloseAtomic() throws Exception { - cacheOperationsIdleConnectionClose(ATOMIC); - } - - /** - * @param atomicityMode Cache atomicity mode. - * @throws Exception If failed. - */ - private void cacheOperationsIdleConnectionClose(CacheAtomicityMode atomicityMode) throws Exception { - CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>(); - - ccfg.setAtomicityMode(atomicityMode); - ccfg.setCacheMode(REPLICATED); - ccfg.setWriteSynchronizationMode(FULL_SYNC); - - IgniteCache<Object, Object> cache = ignite(0).createCache(ccfg).withAsync(); - - try { - ThreadLocalRandom rnd = ThreadLocalRandom.current(); - - int iter = 0; - - long stopTime = System.currentTimeMillis() + 90_000; - - while (System.currentTimeMillis() < stopTime) { - if (iter++ % 10 == 0) - log.info("Iteration: " + iter); - - cache.put(iter, 1); - - IgniteFuture<?> fut = cache.future(); - - try { - fut.get(10_000); - } - catch (IgniteException e) { - List<Ignite> nodes = IgnitionEx.allGridsx(); - - for (Ignite node : nodes) - ((IgniteKernal)node).dumpDebugInfo(); - - U.dumpThreads(log); - - throw e; - } - - U.sleep(rnd.nextLong(IDLE_TIMEOUT - 10, IDLE_TIMEOUT + 10)); - } - } - finally { - ignite(0).destroyCache(ccfg.getName()); - } - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/a20ca351/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageRecoveryIdleConnectionTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageRecoveryIdleConnectionTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageRecoveryIdleConnectionTest.java new file mode 100644 index 0000000..b9003cd --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageRecoveryIdleConnectionTest.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed; + +import java.util.List; +import java.util.concurrent.ThreadLocalRandom; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteException; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteKernal; +import org.apache.ignite.internal.IgnitionEx; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteFuture; +import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; +import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; +import static org.apache.ignite.cache.CacheMode.REPLICATED; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; + +/** + * + */ +public class IgniteCacheMessageRecoveryIdleConnectionTest extends GridCommonAbstractTest { + /** */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** */ + private static final int NODES = 3; + + /** */ + private static final long IDLE_TIMEOUT = 50; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER); + + TcpCommunicationSpi commSpi = new TcpCommunicationSpi(); + + commSpi.setIdleConnectionTimeout(IDLE_TIMEOUT); + commSpi.setSharedMemoryPort(-1); + + cfg.setCommunicationSpi(commSpi); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected long getTestTimeout() { + return 2 * 60_000; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + super.beforeTestsStarted(); + + startGridsMultiThreaded(NODES); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + super.afterTestsStopped(); + + stopAllGrids(); + } + + /** + * @throws Exception If failed. + */ + public void testCacheOperationsIdleConnectionCloseTx() throws Exception { + cacheOperationsIdleConnectionClose(TRANSACTIONAL); + } + + /** + * @throws Exception If failed. + */ + public void testCacheOperationsIdleConnectionCloseAtomic() throws Exception { + cacheOperationsIdleConnectionClose(ATOMIC); + } + + /** + * @param atomicityMode Cache atomicity mode. + * @throws Exception If failed. + */ + private void cacheOperationsIdleConnectionClose(CacheAtomicityMode atomicityMode) throws Exception { + CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>(); + + ccfg.setAtomicityMode(atomicityMode); + ccfg.setCacheMode(REPLICATED); + ccfg.setWriteSynchronizationMode(FULL_SYNC); + + IgniteCache<Object, Object> cache = ignite(0).createCache(ccfg).withAsync(); + + try { + ThreadLocalRandom rnd = ThreadLocalRandom.current(); + + int iter = 0; + + long stopTime = System.currentTimeMillis() + 90_000; + + while (System.currentTimeMillis() < stopTime) { + if (iter++ % 50 == 0) + log.info("Iteration: " + iter); + + cache.put(iter, 1); + + IgniteFuture<?> fut = cache.future(); + + try { + fut.get(10_000); + } + catch (IgniteException e) { + log.error("Failed to execute update, will dump debug information" + + " [err=" + e+ ", iter=" + iter + ']', e); + + List<Ignite> nodes = IgnitionEx.allGridsx(); + + for (Ignite node : nodes) + ((IgniteKernal)node).dumpDebugInfo(); + + U.dumpThreads(log); + + throw e; + } + + U.sleep(rnd.nextLong(IDLE_TIMEOUT - 10, IDLE_TIMEOUT + 10)); + } + } + finally { + ignite(0).destroyCache(ccfg.getName()); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/a20ca351/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java index 84e1502..9240ef5 100755 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java @@ -125,7 +125,8 @@ import org.apache.ignite.internal.processors.cache.distributed.CacheAtomicNearUp import org.apache.ignite.internal.processors.cache.distributed.CacheTxNearUpdateTopologyChangeTest; import org.apache.ignite.internal.processors.cache.distributed.GridCacheEntrySetIterationPreloadingSelfTest; import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheAtomicMessageRecoveryTest; -import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheMessageRecoveryIdleConnection; +import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheConnectionRecoveryTest; +import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheMessageRecoveryIdleConnectionTest; import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheMessageWriteTimeoutTest; import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheSystemTransactionsSelfTest; import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheTxMessageRecoveryTest; @@ -283,7 +284,8 @@ public class IgniteCacheTestSuite extends TestSuite { suite.addTestSuite(IgniteCacheAtomicMessageRecoveryTest.class); suite.addTestSuite(IgniteCacheTxMessageRecoveryTest.class); suite.addTestSuite(IgniteCacheMessageWriteTimeoutTest.class); - suite.addTestSuite(IgniteCacheMessageRecoveryIdleConnection.class); + suite.addTestSuite(IgniteCacheMessageRecoveryIdleConnectionTest.class); + suite.addTestSuite(IgniteCacheConnectionRecoveryTest.class); GridTestUtils.addTestIfNeeded(suite, GridCacheOffHeapTieredEvictionAtomicSelfTest.class, ignoredTests); GridTestUtils.addTestIfNeeded(suite, GridCacheOffHeapTieredEvictionSelfTest.class, ignoredTests); GridTestUtils.addTestIfNeeded(suite, GridCacheOffHeapTieredAtomicSelfTest.class, ignoredTests);
