Repository: ignite Updated Branches: refs/heads/ignite-1758 1444db19e -> 40c6d35eb
ignite-1758 Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/40c6d35e Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/40c6d35e Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/40c6d35e Branch: refs/heads/ignite-1758 Commit: 40c6d35eba479ade083de62de0ef8aaa81cb33fe Parents: 1444db1 Author: sboikov <[email protected]> Authored: Thu Oct 22 14:03:16 2015 +0300 Committer: sboikov <[email protected]> Committed: Mon Oct 26 09:39:26 2015 +0300 ---------------------------------------------------------------------- .../apache/ignite/IgniteSystemProperties.java | 3 + .../util/nio/GridNioRecoveryDescriptor.java | 11 +- .../communication/tcp/TcpCommunicationSpi.java | 8 +- .../ignite/spi/discovery/tcp/ClientImpl.java | 122 ++++++-- .../ignite/spi/discovery/tcp/ServerImpl.java | 30 +- .../spi/discovery/tcp/TcpDiscoverySpi.java | 2 +- ...teClientReconnectCacheMultiThreadedTest.java | 232 -------------- ...gniteClientReconnectMassiveShutdownTest.java | 306 +++++++++++++++++++ .../testsuites/IgniteClientNodesTestSuite.java | 3 + 9 files changed, 434 insertions(+), 283 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/40c6d35e/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java index 5d3b08b..26a4f3a 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java @@ -353,6 +353,9 @@ public final class IgniteSystemProperties { /** Maximum size for affinity assignment history. */ public static final String IGNITE_AFFINITY_HISTORY_SIZE = "IGNITE_AFFINITY_HISTORY_SIZE"; + /** Maximum size for discovery messages history. */ + public static final String IGNITE_DISCOVERY_HISTORY_SIZE = "IGNITE_DISCOVERY_HISTORY_SIZE"; + /** Number of cache operation retries in case of topology exceptions. */ public static final String IGNITE_CACHE_RETRIES_COUNT = "IGNITE_CACHE_RETRIES_COUNT"; http://git-wip-us.apache.org/repos/asf/ignite/blob/40c6d35e/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java index 88837de..5647239 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java @@ -193,14 +193,19 @@ public class GridNioRecoveryDescriptor { /** * Node left callback. + * + * @return {@code False} if descriptor is reserved. */ - public void onNodeLeft() { + public boolean onNodeLeft() { GridNioFuture<?>[] futs = null; synchronized (this) { nodeLeft = true; - if (!reserved && !msgFuts.isEmpty()) { + if (reserved) + return false; + + if (!msgFuts.isEmpty()) { futs = msgFuts.toArray(new GridNioFuture<?>[msgFuts.size()]); msgFuts.clear(); @@ -209,6 +214,8 @@ public class GridNioRecoveryDescriptor { if (futs != null) completeOnNodeLeft(futs); + + return true; } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/40c6d35e/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java index 5ea2c02..c29943c 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java @@ -3100,10 +3100,12 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter assert !left.isEmpty(); for (ClientKey id : left) { - GridNioRecoveryDescriptor recoverySnd = recoveryDescs.remove(id); + GridNioRecoveryDescriptor recoverySnd = recoveryDescs.get(id); - if (recoverySnd != null) - recoverySnd.onNodeLeft(); + if (recoverySnd != null) { + if (recoverySnd.onNodeLeft()) + recoveryDescs.remove(id); + } } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/40c6d35e/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java index a5d0866..7e5af5a 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java @@ -461,7 +461,8 @@ class ClientImpl extends TcpDiscoveryImpl { * @see TcpDiscoverySpi#joinTimeout */ @SuppressWarnings("BusyWait") - @Nullable private T2<Socket, Boolean> joinTopology(boolean recon, long timeout) throws IgniteSpiException, InterruptedException { + @Nullable private T2<SocketStream, Boolean> joinTopology(boolean recon, long timeout) + throws IgniteSpiException, InterruptedException { Collection<InetSocketAddress> addrs = null; long startTime = U.currentTimeMillis(); @@ -501,7 +502,7 @@ class ClientImpl extends TcpDiscoveryImpl { InetSocketAddress addr = it.next(); - T3<Socket, Integer, Boolean> sockAndRes = sendJoinRequest(recon, addr); + T3<SocketStream, Integer, Boolean> sockAndRes = sendJoinRequest(recon, addr); if (sockAndRes == null) { it.remove(); @@ -511,11 +512,11 @@ class ClientImpl extends TcpDiscoveryImpl { assert sockAndRes.get1() != null && sockAndRes.get2() != null : sockAndRes; - Socket sock = sockAndRes.get1(); + Socket sock = sockAndRes.get1().socket(); switch (sockAndRes.get2()) { case RES_OK: - return new T2<>(sock, sockAndRes.get3()); + return new T2<>(sockAndRes.get1(), sockAndRes.get3()); case RES_CONTINUE_JOIN: case RES_WAIT: @@ -548,7 +549,7 @@ class ClientImpl extends TcpDiscoveryImpl { * @param addr Address. * @return Socket, connect response and client acknowledge support flag. */ - @Nullable private T3<Socket, Integer, Boolean> sendJoinRequest(boolean recon, InetSocketAddress addr) { + @Nullable private T3<SocketStream, Integer, Boolean> sendJoinRequest(boolean recon, InetSocketAddress addr) { assert addr != null; if (log.isDebugEnabled()) @@ -621,7 +622,8 @@ class ClientImpl extends TcpDiscoveryImpl { log.debug("Message has been sent to address [msg=" + msg + ", addr=" + addr + ", rmtNodeId=" + rmtNodeId + ']'); - return new T3<>(sock, spi.readReceipt(sock, timeoutHelper.nextTimeoutChunk(ackTimeout0)), + return new T3<>(new SocketStream(sock), + spi.readReceipt(sock, timeoutHelper.nextTimeoutChunk(ackTimeout0)), res.clientAck()); } catch (IOException | IgniteCheckedException e) { @@ -765,7 +767,10 @@ class ClientImpl extends TcpDiscoveryImpl { /** {@inheritDoc} */ @Override public void brakeConnection() { - U.closeQuiet(msgWorker.currSock); + SocketStream sockStream = msgWorker.currSock; + + if (sockStream != null) + U.closeQuiet(sockStream.socket()); } /** {@inheritDoc} */ @@ -826,7 +831,7 @@ class ClientImpl extends TcpDiscoveryImpl { private final Object mux = new Object(); /** */ - private Socket sock; + private SocketStream sockStream; /** */ private UUID rmtNodeId; @@ -838,12 +843,12 @@ class ClientImpl extends TcpDiscoveryImpl { } /** - * @param sock Socket. + * @param sockStream Socket. * @param rmtNodeId Rmt node id. */ - public void setSocket(Socket sock, UUID rmtNodeId) { + public void setSocket(SocketStream sockStream, UUID rmtNodeId) { synchronized (mux) { - this.sock = sock; + this.sockStream = sockStream; this.rmtNodeId = rmtNodeId; @@ -854,22 +859,24 @@ class ClientImpl extends TcpDiscoveryImpl { /** {@inheritDoc} */ @Override protected void body() throws InterruptedException { while (!isInterrupted()) { - Socket sock; + SocketStream sockStream; UUID rmtNodeId; synchronized (mux) { - if (this.sock == null) { + if (this.sockStream == null) { mux.wait(); continue; } - sock = this.sock; + sockStream = this.sockStream; rmtNodeId = this.rmtNodeId; } + Socket sock = sockStream.socket(); + try { - InputStream in = new BufferedInputStream(sock.getInputStream()); + InputStream in = sockStream.stream(); sock.setKeepAlive(true); sock.setTcpNoDelay(true); @@ -923,7 +930,7 @@ class ClientImpl extends TcpDiscoveryImpl { } } catch (IOException e) { - msgWorker.addMessage(new SocketClosedMessage(sock)); + msgWorker.addMessage(new SocketClosedMessage(sockStream)); if (log.isDebugEnabled()) U.error(log, "Connection failed [sock=" + sock + ", locNodeId=" + getLocalNodeId() + ']', e); @@ -932,8 +939,8 @@ class ClientImpl extends TcpDiscoveryImpl { U.closeQuiet(sock); synchronized (mux) { - if (this.sock == sock) { - this.sock = null; + if (this.sockStream == sockStream) { + this.sockStream = null; this.rmtNodeId = null; } } @@ -1125,7 +1132,7 @@ class ClientImpl extends TcpDiscoveryImpl { */ private class Reconnector extends IgniteSpiThread { /** */ - private volatile Socket sock; + private volatile SocketStream sockStream; /** */ private boolean clientAck; @@ -1148,7 +1155,10 @@ class ClientImpl extends TcpDiscoveryImpl { public void cancel() { interrupt(); - U.closeQuiet(sock); + SocketStream sockStream = this.sockStream; + + if (sockStream != null) + U.closeQuiet(sockStream.socket()); } /** {@inheritDoc} */ @@ -1166,24 +1176,26 @@ class ClientImpl extends TcpDiscoveryImpl { try { while (true) { - T2<Socket, Boolean> joinRes = joinTopology(true, timeout); + T2<SocketStream, Boolean> joinRes = joinTopology(true, timeout); if (joinRes == null) { if (join) { joinError(new IgniteSpiException("Join process timed out, connection failed and " + "failed to reconnect (consider increasing 'joinTimeout' configuration property) " + - "[joinTimeout=" + spi.joinTimeout + ", sock=" + sock + ']')); + "[joinTimeout=" + spi.joinTimeout + ']')); } else U.error(log, "Failed to reconnect to cluster (consider increasing 'networkTimeout'" + - " configuration property) [networkTimeout=" + spi.netTimeout + ", sock=" + sock + ']'); + " configuration property) [networkTimeout=" + spi.netTimeout + ']'); return; } - sock = joinRes.get1(); + sockStream = joinRes.get1(); clientAck = joinRes.get2(); + Socket sock = sockStream.socket(); + if (isInterrupted()) throw new InterruptedException(); @@ -1194,7 +1206,7 @@ class ClientImpl extends TcpDiscoveryImpl { sock.setSoTimeout((int)spi.netTimeout); - InputStream in = new BufferedInputStream(sock.getInputStream()); + InputStream in = sockStream.stream(); sock.setKeepAlive(true); sock.setTcpNoDelay(true); @@ -1270,7 +1282,10 @@ class ClientImpl extends TcpDiscoveryImpl { } finally { if (!success) { - U.closeQuiet(sock); + SocketStream sockStream = this.sockStream; + + if (sockStream != null) + U.closeQuiet(sockStream.socket()); if (join) joinError(new IgniteSpiException("Failed to connect to cluster, connection failed and failed " + @@ -1290,7 +1305,7 @@ class ClientImpl extends TcpDiscoveryImpl { private final BlockingDeque<Object> queue = new LinkedBlockingDeque<>(); /** */ - private Socket currSock; + private SocketStream currSock; /** Indicates that pending messages are currently processed. */ private boolean pending; @@ -1469,7 +1484,10 @@ class ClientImpl extends TcpDiscoveryImpl { } } finally { - U.closeQuiet(currSock); + SocketStream currSock = this.currSock; + + if (currSock != null) + U.closeQuiet(currSock.socket()); if (joinLatch.getCount() > 0) joinError(new IgniteSpiException("Some error in join process.")); // This should not occur. @@ -1492,7 +1510,7 @@ class ClientImpl extends TcpDiscoveryImpl { joinCnt++; - T2<Socket, Boolean> joinRes = joinTopology(false, spi.joinTimeout); + T2<SocketStream, Boolean> joinRes = joinTopology(false, spi.joinTimeout); if (joinRes == null) { if (join) @@ -1508,7 +1526,7 @@ class ClientImpl extends TcpDiscoveryImpl { currSock = joinRes.get1(); - sockWriter.setSocket(joinRes.get1(), joinRes.get2()); + sockWriter.setSocket(joinRes.get1().socket(), joinRes.get2()); if (spi.joinTimeout > 0) { final int joinCnt0 = joinCnt; @@ -1877,9 +1895,9 @@ class ClientImpl extends TcpDiscoveryImpl { if (reconnector != null) { assert msg.success() : msg; - currSock = reconnector.sock; + currSock = reconnector.sockStream; - sockWriter.setSocket(currSock, reconnector.clientAck); + sockWriter.setSocket(currSock.socket(), reconnector.clientAck); sockReader.setSocket(currSock, locNode.clientRouterNodeId()); reconnector = null; @@ -2050,13 +2068,51 @@ class ClientImpl extends TcpDiscoveryImpl { */ private static class SocketClosedMessage { /** */ + private final SocketStream sock; + + /** + * @param sock Socket. + */ + private SocketClosedMessage(SocketStream sock) { + this.sock = sock; + } + } + + /** + * + */ + private static class SocketStream { + /** */ private final Socket sock; + /** */ + private final InputStream in; + /** * @param sock Socket. + * @throws IOException If failed to create stream. */ - private SocketClosedMessage(Socket sock) { + public SocketStream(Socket sock) throws IOException { + assert sock != null; + this.sock = sock; + + this.in = new BufferedInputStream(sock.getInputStream()); + } + + /** + * @return Socket. + */ + Socket socket() { + return sock; + + } + + /** + * @return Socket input stream. + */ + InputStream stream() { + return in; } } http://git-wip-us.apache.org/repos/asf/ignite/blob/40c6d35e/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index d8ee953..6bc0402 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -126,6 +126,8 @@ import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryStatusCheckMessa import org.jetbrains.annotations.Nullable; import org.jsr166.ConcurrentHashMap8; +import static org.apache.ignite.IgniteSystemProperties.IGNITE_DISCOVERY_HISTORY_SIZE; +import static org.apache.ignite.IgniteSystemProperties.getInteger; import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; import static org.apache.ignite.events.EventType.EVT_NODE_JOINED; import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; @@ -152,6 +154,9 @@ import static org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryStatusChe @SuppressWarnings("All") class ServerImpl extends TcpDiscoveryImpl { /** */ + private static final int ENSURED_MSG_HIST_SIZE = getInteger(IGNITE_DISCOVERY_HISTORY_SIZE, 1024 * 10); + + /** */ private final ThreadPoolExecutor utilityPool = new ThreadPoolExecutor(0, 1, 2000, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); @@ -1445,6 +1450,12 @@ class ServerImpl extends TcpDiscoveryImpl { tmp = U.arrayList(readers); } + for (ClientMessageWorker msgWorker : clientMsgWorkers.values()) { + U.interrupt(msgWorker); + + U.join(msgWorker, log); + } + U.interrupt(tmp); U.joinThreads(tmp, log); @@ -1742,22 +1753,17 @@ class ServerImpl extends TcpDiscoveryImpl { * Discovery messages history used for client reconnect. */ private class EnsuredMessageHistory { - /** */ - private static final int MAX = 1024; - /** Pending messages. */ - private final ArrayDeque<TcpDiscoveryAbstractMessage> msgs = new ArrayDeque<>(MAX * 2); + private final GridBoundedLinkedHashSet<TcpDiscoveryAbstractMessage> + msgs = new GridBoundedLinkedHashSet<>(ENSURED_MSG_HIST_SIZE); /** * @param msg Adds message. */ void add(TcpDiscoveryAbstractMessage msg) { - assert spi.ensured(msg) : msg; - - msgs.addLast(msg); + assert spi.ensured(msg) && msg.verified() : msg; - while (msgs.size() > MAX) - msgs.pollFirst(); + msgs.add(msg); } /** @@ -1784,7 +1790,7 @@ class ServerImpl extends TcpDiscoveryImpl { res = new ArrayList<>(msgs.size()); } - if (res != null && msg.verified()) + if (res != null) res.add(prepare(msg, node.id())); } @@ -1810,7 +1816,7 @@ class ServerImpl extends TcpDiscoveryImpl { if (msg.id().equals(lastMsgId)) skip = false; } - else if (msg.verified()) + else cp.add(prepare(msg, node.id())); } @@ -2130,7 +2136,7 @@ class ServerImpl extends TcpDiscoveryImpl { else assert false : "Unknown message type: " + msg.getClass().getSimpleName(); - if (spi.ensured(msg)) + if (msg.verified() && spi.ensured(msg)) msgHist.add(msg); if (msg.senderNodeId() != null && !msg.senderNodeId().equals(getLocalNodeId())) { http://git-wip-us.apache.org/repos/asf/ignite/blob/40c6d35e/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java index 6254605..7383cd5 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java @@ -1956,7 +1956,7 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T * <p> * This method is intended for test purposes only. */ - void simulateNodeFailure() { + public void simulateNodeFailure() { impl.simulateNodeFailure(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/40c6d35e/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheMultiThreadedTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheMultiThreadedTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheMultiThreadedTest.java deleted file mode 100644 index 4321ded..0000000 --- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheMultiThreadedTest.java +++ /dev/null @@ -1,232 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal; - -import java.util.HashMap; -import java.util.Random; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.Callable; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.atomic.AtomicBoolean; -import javax.cache.CacheException; -import org.apache.ignite.Ignite; -import org.apache.ignite.IgniteCache; -import org.apache.ignite.IgniteClientDisconnectedException; -import org.apache.ignite.IgniteTransactions; -import org.apache.ignite.cache.CacheAtomicityMode; -import org.apache.ignite.cache.CacheMemoryMode; -import org.apache.ignite.cache.CacheMode; -import org.apache.ignite.cluster.ClusterTopologyException; -import org.apache.ignite.configuration.CacheConfiguration; -import org.apache.ignite.configuration.IgniteConfiguration; -import org.apache.ignite.internal.util.typedef.X; -import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; -import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; -import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; -import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; -import org.apache.ignite.transactions.Transaction; -import org.apache.ignite.transactions.TransactionConcurrency; -import org.apache.ignite.transactions.TransactionIsolation; - -/** - * Client reconnect test in multi threaded mode while cache operations are in progress. - */ -public class IgniteClientReconnectCacheMultiThreadedTest extends GridCommonAbstractTest { - /** */ - private static final int GRID_CNT = 14; - - /** */ - private static final int CLIENT_GRID_CNT = 14; - - /** */ - private static volatile boolean clientMode; - - /** */ - private TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); - - /** - * @throws Exception If fails. - */ - public IgniteClientReconnectCacheMultiThreadedTest() throws Exception { - super(false); - } - - /** {@inheritDoc} */ - @SuppressWarnings({"IfMayBeConditional"}) - @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { - IgniteConfiguration cfg = super.getConfiguration(gridName); - - if (clientMode) - cfg.setClientMode(true); - - cfg.setDiscoverySpi(new TcpDiscoverySpi().setIpFinder(ipFinder)); - - return cfg; - } - - /** {@inheritDoc} */ - @Override protected void afterTest() throws Exception { - stopAllGrids(); - - super.afterTest(); - } - - /** {@inheritDoc} */ - @Override protected long getTestTimeout() { - return 5 * 60 * 1000; - } - - /** - * @throws Exception If any error occurs. - */ - public void testMassiveServersShutdown() throws Exception { - clientMode = false; - - final int serversToKill = GRID_CNT / 2; - - startGridsMultiThreaded(GRID_CNT); - - clientMode = true; - - startGridsMultiThreaded(GRID_CNT, CLIENT_GRID_CNT); - - final AtomicBoolean done = new AtomicBoolean(); - - // Starting a cache dynamically. - Ignite client = grid(GRID_CNT); - - assertTrue(client.configuration().isClientMode()); - - CacheConfiguration cfg = new CacheConfiguration(); - - cfg.setCacheMode(CacheMode.PARTITIONED); - cfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL); - cfg.setBackups(2); - cfg.setOffHeapMaxMemory(0); - cfg.setMemoryMode(CacheMemoryMode.OFFHEAP_TIERED); - - IgniteCache cache = client.getOrCreateCache(cfg); - - HashMap<String, Integer> put = new HashMap<>(); - - // Preloading the cache with some data. - for (int i = 0; i < 10_000; i++) - put.put(String.valueOf(i), i); - - cache.putAll(put); - - // Preparing client nodes and starting cache operations from them. - final BlockingQueue<Integer> clientIdx = new LinkedBlockingQueue<>(); - - for (int i = GRID_CNT; i < GRID_CNT + CLIENT_GRID_CNT; i++) - clientIdx.add(i); - - IgniteInternalFuture<?> clientsFut = multithreadedAsync( - new Callable<Object>() { - @Override public Object call() throws Exception { - int idx = clientIdx.take(); - - Ignite ignite = grid(idx); - - assertTrue(ignite.configuration().isClientMode()); - - IgniteCache<String, Integer> cache = ignite.cache(null); - - IgniteTransactions txs = ignite.transactions(); - - Random rand = new Random(); - - while (!done.get()) { - Transaction tx = txs.txStart(TransactionConcurrency.PESSIMISTIC, - TransactionIsolation.READ_COMMITTED); - - try { - cache.put(String.valueOf(rand.nextInt(10_000)), rand.nextInt(50_000)); - - tx.commit(); - } - catch (ClusterTopologyException ex) { - ex.retryReadyFuture().get(); - } - catch (CacheException e) { - if (X.hasCause(e, IgniteClientDisconnectedException.class)) { - IgniteClientDisconnectedException cause = X.cause(e, - IgniteClientDisconnectedException.class); - - cause.reconnectFuture().get(); // Wait for reconnect. - } - else if (X.hasCause(e, ClusterTopologyException.class)) { - ClusterTopologyException cause = X.cause(e, ClusterTopologyException.class); - - cause.retryReadyFuture().get(); - } - else - throw e; - } - finally { - tx.close(); - } - } - - return null; - } - }, - CLIENT_GRID_CNT - ); - - // Killing a half of server nodes. - final BlockingQueue<Integer> victims = new LinkedBlockingQueue<>(); - - for (int i = 0; i < serversToKill; i++) - victims.add(i); - - final BlockingQueue<Integer> assassins = new LinkedBlockingQueue<>(); - - for (int i = serversToKill; i < GRID_CNT; i++) - assassins.add(i); - - IgniteInternalFuture<?> serversShutdownFut = multithreadedAsync( - new Callable<Object>() { - @Override public Object call() throws Exception { - Thread.sleep(5_000); - - Ignite assassin = grid(assassins.take()); - - assertFalse(assassin.configuration().isClientMode()); - - Ignite victim = grid(victims.take()); - - assertFalse(victim.configuration().isClientMode()); - - assassin.configuration().getDiscoverySpi().failNode(victim.cluster().localNode().id(), null); - - return null; - } - }, - assassins.size() - ); - - serversShutdownFut.get(); - - Thread.sleep(15_000); - - done.set(true); - - clientsFut.get(); - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/40c6d35e/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectMassiveShutdownTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectMassiveShutdownTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectMassiveShutdownTest.java new file mode 100644 index 0000000..5ae5a48 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectMassiveShutdownTest.java @@ -0,0 +1,306 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal; + +import java.util.HashMap; +import java.util.Random; +import java.util.UUID; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Callable; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicBoolean; +import javax.cache.CacheException; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteClientDisconnectedException; +import org.apache.ignite.IgniteException; +import org.apache.ignite.IgniteTransactions; +import org.apache.ignite.cluster.ClusterTopologyException; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.util.typedef.X; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.apache.ignite.transactions.Transaction; + +import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; +import static org.apache.ignite.cache.CacheMemoryMode.OFFHEAP_TIERED; +import static org.apache.ignite.cache.CacheMode.PARTITIONED; +import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC; +import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ; + +/** + * Client reconnect test in multi threaded mode while cache operations are in progress. + */ +public class IgniteClientReconnectMassiveShutdownTest extends GridCommonAbstractTest { + /** */ + private static final int GRID_CNT = 14; + + /** */ + private static final int CLIENT_GRID_CNT = 14; + + /** */ + private static volatile boolean clientMode; + + /** */ + private TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** + * @throws Exception If fails. + */ + public IgniteClientReconnectMassiveShutdownTest() throws Exception { + super(false); + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + cfg.setClientMode(clientMode); + + cfg.setDiscoverySpi(new TcpDiscoverySpi().setIpFinder(ipFinder)); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + + super.afterTest(); + } + + /** {@inheritDoc} */ + @Override protected long getTestTimeout() { + return 5 * 60 * 1000; + } + + /** + * @throws Exception If any error occurs. + */ + public void _testMassiveServersShutdown1() throws Exception { + massiveServersShutdown(StopType.FAIL_EVENT); + } + + /** + * @throws Exception If any error occurs. + */ + public void testMassiveServersShutdown2() throws Exception { + massiveServersShutdown(StopType.SIMULATE_FAIL); + } + + /** + * @throws Exception If any error occurs. + */ + public void testMassiveServersShutdown3() throws Exception { + massiveServersShutdown(StopType.CLOSE); + } + + /** + * @param stopType How tp stop node. + * @throws Exception If any error occurs. + */ + private void massiveServersShutdown(final StopType stopType) throws Exception { + clientMode = false; + + startGridsMultiThreaded(GRID_CNT); + + clientMode = true; + + startGridsMultiThreaded(GRID_CNT, CLIENT_GRID_CNT); + + final AtomicBoolean done = new AtomicBoolean(); + + // Starting a cache dynamically. + Ignite client = grid(GRID_CNT); + + assertTrue(client.configuration().isClientMode()); + + CacheConfiguration<String, Integer> cfg = new CacheConfiguration<>(); + + cfg.setCacheMode(PARTITIONED); + cfg.setAtomicityMode(TRANSACTIONAL); + cfg.setBackups(2); + cfg.setOffHeapMaxMemory(0); + cfg.setMemoryMode(OFFHEAP_TIERED); + + IgniteCache<String, Integer> cache = client.getOrCreateCache(cfg); + + HashMap<String, Integer> put = new HashMap<>(); + + // Load some data. + for (int i = 0; i < 10_000; i++) + put.put(String.valueOf(i), i); + + cache.putAll(put); + + // Preparing client nodes and starting cache operations from them. + final BlockingQueue<Integer> clientIdx = new LinkedBlockingQueue<>(); + + for (int i = GRID_CNT; i < GRID_CNT + CLIENT_GRID_CNT; i++) + clientIdx.add(i); + + IgniteInternalFuture<?> clientsFut = multithreadedAsync( + new Callable<Object>() { + @Override public Object call() throws Exception { + int idx = clientIdx.take(); + + Ignite ignite = grid(idx); + + Thread.currentThread().setName("client-thread-" + ignite.name()); + + assertTrue(ignite.configuration().isClientMode()); + + IgniteCache<String, Integer> cache = ignite.cache(null); + + IgniteTransactions txs = ignite.transactions(); + + Random rand = new Random(); + + while (!done.get()) { + try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) { + cache.put(String.valueOf(rand.nextInt(10_000)), rand.nextInt(50_000)); + + tx.commit(); + } + catch (ClusterTopologyException ex) { + ex.retryReadyFuture().get(); + } + catch (IgniteException | CacheException e) { + if (X.hasCause(e, IgniteClientDisconnectedException.class)) { + IgniteClientDisconnectedException cause = X.cause(e, + IgniteClientDisconnectedException.class); + + assert cause != null; + + cause.reconnectFuture().get(); + } + else if (X.hasCause(e, ClusterTopologyException.class)) { + ClusterTopologyException cause = X.cause(e, ClusterTopologyException.class); + + assert cause != null; + + cause.retryReadyFuture().get(); + } + else + throw e; + } + } + + return null; + } + }, + CLIENT_GRID_CNT); + + try { + // Killing a half of server nodes. + final int srvsToKill = GRID_CNT / 2; + + final BlockingQueue<Integer> victims = new LinkedBlockingQueue<>(); + + for (int i = 0; i < srvsToKill; i++) + victims.add(i); + + final BlockingQueue<Integer> assassins = new LinkedBlockingQueue<>(); + + for (int i = srvsToKill; i < GRID_CNT; i++) + assassins.add(i); + + IgniteInternalFuture<?> srvsShutdownFut = multithreadedAsync( + new Callable<Object>() { + @Override public Object call() throws Exception { + Thread.sleep(5_000); + + Ignite assassin = grid(assassins.take()); + + assertFalse(assassin.configuration().isClientMode()); + + Ignite victim = grid(victims.take()); + + assertFalse(victim.configuration().isClientMode()); + + log.info("Kill node [node=" + victim.name() + ", from=" + assassin.name() + ']'); + + switch (stopType) { + case CLOSE: + victim.close(); + + break; + + case FAIL_EVENT: + UUID nodeId = victim.cluster().localNode().id(); + + assassin.configuration().getDiscoverySpi().failNode(nodeId, null); + + break; + + case SIMULATE_FAIL: + ((TcpDiscoverySpi)victim.configuration().getDiscoverySpi()).simulateNodeFailure(); + + break; + + default: + fail(); + } + + return null; + } + }, + assassins.size() + ); + + srvsShutdownFut.get(); + + Thread.sleep(15_000); + + done.set(true); + + clientsFut.get(); + + awaitPartitionMapExchange(); + + for (int k = 0; k < 10_000; k++) { + String key = String.valueOf(k); + + Object val = cache.get(key); + + for (int i = srvsToKill; i < GRID_CNT; i++) + assertEquals(val, ignite(i).cache(null).get(key)); + } + } + finally { + done.set(true); + } + } + + /** + * + */ + enum StopType { + /** */ + CLOSE, + + /** */ + SIMULATE_FAIL, + + /** */ + FAIL_EVENT + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/40c6d35e/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteClientNodesTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteClientNodesTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteClientNodesTestSuite.java index c9405fa..689097e 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteClientNodesTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteClientNodesTestSuite.java @@ -18,6 +18,7 @@ package org.apache.ignite.testsuites; import junit.framework.TestSuite; +import org.apache.ignite.internal.IgniteClientReconnectMassiveShutdownTest; import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheClientNodeConcurrentStart; import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheClientReconnectTest; import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheManyClientsTest; @@ -39,6 +40,8 @@ public class IgniteClientNodesTestSuite extends TestSuite { suite.addTestSuite(IgniteCacheClientNodeConcurrentStart.class); suite.addTestSuite(IgniteCacheClientReconnectTest.class); + suite.addTestSuite(IgniteClientReconnectMassiveShutdownTest.class); + return suite; } } \ No newline at end of file
