ignite-3428 Fixed message recovery handling on reconnect (cherry picked from commit 89d722c)
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e9a508ef Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e9a508ef Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e9a508ef Branch: refs/heads/ignite-1232 Commit: e9a508ef0d86d5114d709bcaa7666bde7e69efd2 Parents: 2e65beb Author: sboikov <[email protected]> Authored: Wed Jul 6 09:31:07 2016 +0300 Committer: sboikov <[email protected]> Committed: Wed Jul 6 10:47:41 2016 +0300 ---------------------------------------------------------------------- .../util/nio/GridNioRecoveryDescriptor.java | 19 ++- .../ignite/internal/util/nio/GridNioServer.java | 47 ++++-- .../util/nio/GridSelectorNioSessionImpl.java | 7 + .../communication/tcp/TcpCommunicationSpi.java | 163 ++++++++++++------- ...gniteCacheMessageRecoveryIdleConnection.java | 154 ++++++++++++++++++ .../ignite/testsuites/IgniteCacheTestSuite.java | 2 + 6 files changed, 321 insertions(+), 71 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/e9a508ef/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java index 409bded..35480ac 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java @@ -74,6 +74,9 @@ public class GridNioRecoveryDescriptor { /** Maximum size of unacknowledged messages queue. */ private final int queueLimit; + /** Number of descriptor reservations (for info purposes). */ + private int reserveCnt; + /** * @param queueLimit Maximum size of unacknowledged messages queue. * @param node Node. @@ -256,9 +259,12 @@ public class GridNioRecoveryDescriptor { while (!connected && reserved) wait(); - if (!connected) + if (!connected) { reserved = true; + reserveCnt++; + } + return !connected; } } @@ -375,12 +381,23 @@ public class GridNioRecoveryDescriptor { else { reserved = true; + reserveCnt++; + return true; } } } /** + * @return Number of descriptor reservations. + */ + public int reserveCount() { + synchronized (this) { + return reserveCnt; + } + } + + /** * @param futs Futures to complete. */ private void completeOnNodeLeft(GridNioFuture<?>[] futs) { http://git-wip-us.apache.org/repos/asf/ignite/blob/e9a508ef/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java index 9fd5e69..24b8fad 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java @@ -1461,11 +1461,7 @@ public class GridNioServer<T> { sb.append(" Connection info [") .append("rmtAddr=").append(ses.remoteAddress()) - .append(", locAddr=").append(ses.localAddress()) - .append(", msgWriter=").append(writer != null ? writer.toString() : "null") - .append(", msgReader=").append(reader != null ? reader.toString() : "null") - .append(", bytesRcvd=").append(ses.bytesReceived()) - .append(", bytesSent=").append(ses.bytesSent()); + .append(", locAddr=").append(ses.localAddress()); GridNioRecoveryDescriptor desc = ses.recoveryDescriptor(); @@ -1478,6 +1474,28 @@ public class GridNioServer<T> { else sb.append(", recoveryDesc=null"); + sb.append(", bytesRcvd=").append(ses.bytesReceived()) + .append(", bytesSent=").append(ses.bytesSent()) + .append(", opQueueSize=").append(ses.writeQueueSize()) + .append(", msgWriter=").append(writer != null ? writer.toString() : "null") + .append(", msgReader=").append(reader != null ? reader.toString() : "null"); + + int cnt = 0; + + for (GridNioFuture<?> fut : ses.writeQueue()) { + if (cnt == 0) + sb.append(",\n opQueue=[").append(fut); + else + sb.append(',').append(fut); + + if (++cnt == 5) { + sb.append(']'); + + break; + } + } + + sb.append("]").append(U.nl()); } @@ -1803,13 +1821,6 @@ public class GridNioServer<T> { if (e != null) filterChain.onExceptionCaught(ses, e); - try { - filterChain.onSessionClosed(ses); - } - catch (IgniteCheckedException e1) { - filterChain.onExceptionCaught(ses, e1); - } - ses.removeMeta(BUF_META_KEY); // Since ses is in closed state, no write requests will be added. @@ -1837,6 +1848,13 @@ public class GridNioServer<T> { fut.connectionClosed(); } + try { + filterChain.onSessionClosed(ses); + } + catch (IgniteCheckedException e1) { + filterChain.onExceptionCaught(ses, e1); + } + return true; } @@ -2062,24 +2080,29 @@ public class GridNioServer<T> { private SocketChannel sockCh; /** Session to perform operation on. */ + @GridToStringExclude private GridSelectorNioSessionImpl ses; /** Is it a close request or a write request. */ private NioOperation op; /** Message. */ + @GridToStringExclude private ByteBuffer msg; /** Direct message. */ private Message commMsg; /** */ + @GridToStringExclude private boolean accepted; /** */ + @GridToStringExclude private Map<Integer, ?> meta; /** */ + @GridToStringExclude private boolean skipRecovery; /** http://git-wip-us.apache.org/repos/asf/ignite/blob/e9a508ef/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java index 1241f99..0ba6af2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java @@ -264,6 +264,13 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl { return queueSize.get(); } + /** + * @return Write requests. + */ + Collection<GridNioFuture<?>> writeQueue() { + return queue; + } + /** {@inheritDoc} */ @Override public void recoveryDescriptor(GridNioRecoveryDescriptor recoveryDesc) { assert recoveryDesc != null; http://git-wip-us.apache.org/repos/asf/ignite/blob/e9a508ef/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java index 875131d..2c03b2d 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java @@ -353,30 +353,31 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter UUID id = ses.meta(NODE_ID_META); if (id != null) { - GridCommunicationClient rmv = clients.get(id); + if (!stopping) { + boolean reconnect = false; - if (rmv instanceof GridTcpNioCommunicationClient && - ((GridTcpNioCommunicationClient)rmv).session() == ses && - clients.remove(id, rmv)) { - rmv.forceClose(); + GridNioRecoveryDescriptor recoveryData = ses.recoveryDescriptor(); - if (!stopping) { - GridNioRecoveryDescriptor recoveryData = ses.recoveryDescriptor(); + if (recoveryData != null) { + if (recoveryData.nodeAlive(getSpiContext().node(id))) { + if (!recoveryData.messagesFutures().isEmpty()) { + reconnect = true; - if (recoveryData != null) { - if (recoveryData.nodeAlive(getSpiContext().node(id))) { - if (!recoveryData.messagesFutures().isEmpty()) { - if (log.isDebugEnabled()) - log.debug("Session was closed but there are unacknowledged messages, " + - "will try to reconnect [rmtNode=" + recoveryData.node().id() + ']'); - - commWorker.addReconnectRequest(recoveryData); - } + if (log.isDebugEnabled()) + log.debug("Session was closed but there are unacknowledged messages, " + + "will try to reconnect [rmtNode=" + recoveryData.node().id() + ']'); } - else - recoveryData.onNodeLeft(); } + else + recoveryData.onNodeLeft(); } + + DisconnectedSessionInfo disconnectData = new DisconnectedSessionInfo(id, + ses, + recoveryData, + reconnect); + + commWorker.addProcessDisconnectRequest(disconnectData); } CommunicationListener<Message> lsnr0 = lsnr; @@ -1393,6 +1394,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter .append(", msgsSent=").append(desc.sent()) .append(", msgsAckedByRmt=").append(desc.acked()) .append(", msgsRcvd=").append(desc.received()) + .append(", lastAcked=").append(desc.lastAcknowledged()) + .append(", reserveCnt=").append(desc.reserveCount()) .append(", descIdHash=").append(System.identityHashCode(desc)) .append(']').append(U.nl()); } @@ -3040,14 +3043,14 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter endpoint.close(); } - /** @{@inheritDoc} */ + /** {@inheritDoc} */ @Override protected void cleanup() { super.cleanup(); endpoint.close(); } - /** @{@inheritDoc} */ + /** {@inheritDoc} */ @Override public String toString() { return S.toString(ShmemWorker.class, this); } @@ -3058,7 +3061,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter */ private class CommunicationWorker extends IgniteSpiThread { /** */ - private final BlockingQueue<GridNioRecoveryDescriptor> q = new LinkedBlockingQueue<>(); + private final BlockingQueue<DisconnectedSessionInfo> q = new LinkedBlockingQueue<>(); /** * @param gridName Grid name. @@ -3073,10 +3076,10 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter log.debug("Tcp communication worker has been started."); while (!isInterrupted()) { - GridNioRecoveryDescriptor recoveryDesc = q.poll(idleConnTimeout, TimeUnit.MILLISECONDS); + DisconnectedSessionInfo disconnectData = q.poll(idleConnTimeout, TimeUnit.MILLISECONDS); - if (recoveryDesc != null) - processRecovery(recoveryDesc); + if (disconnectData != null) + processDisconnect(disconnectData); else processIdle(); } @@ -3181,56 +3184,62 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter } /** - * @param recoveryDesc Recovery descriptor. + * @param sesInfo Disconnected session information. */ - private void processRecovery(GridNioRecoveryDescriptor recoveryDesc) { - ClusterNode node = recoveryDesc.node(); + private void processDisconnect(DisconnectedSessionInfo sesInfo) { + GridCommunicationClient client = clients.get(sesInfo.nodeId); - try { - if (clients.containsKey(node.id()) || - !recoveryDesc.nodeAlive(getSpiContext().node(node.id())) || - !getSpiContext().pingNode(node.id())) - return; - } - catch (IgniteClientDisconnectedException e) { - if (log.isDebugEnabled()) - log.debug("Failed to ping node, client disconnected."); + if (client instanceof GridTcpNioCommunicationClient && + ((GridTcpNioCommunicationClient) client).session() == sesInfo.ses) + clients.remove(sesInfo.nodeId, client); - return; - } + if (sesInfo.reconnect) { + GridNioRecoveryDescriptor recoveryDesc = sesInfo.recoveryDesc; - try { - if (log.isDebugEnabled()) - log.debug("Recovery reconnect [rmtNode=" + recoveryDesc.node().id() + ']'); + ClusterNode node = recoveryDesc.node(); - GridCommunicationClient client = reserveClient(node); + if (!recoveryDesc.nodeAlive(getSpiContext().node(node.id()))) + return; - client.release(); - } - catch (IgniteCheckedException | IgniteException e) { - if (recoveryDesc.nodeAlive(getSpiContext().node(node.id()))) { + try { if (log.isDebugEnabled()) - log.debug("Recovery reconnect failed, will retry " + - "[rmtNode=" + recoveryDesc.node().id() + ", err=" + e + ']'); + log.debug("Recovery reconnect [rmtNode=" + recoveryDesc.node().id() + ']'); + + client = reserveClient(node); - addReconnectRequest(recoveryDesc); + client.release(); } - else { - if (log.isDebugEnabled()) - log.debug("Recovery reconnect failed, " + - "node left [rmtNode=" + recoveryDesc.node().id() + ", err=" + e + ']'); + catch (IgniteCheckedException | IgniteException e) { + try { + if (recoveryDesc.nodeAlive(getSpiContext().node(node.id())) && getSpiContext().pingNode(node.id())) { + if (log.isDebugEnabled()) + log.debug("Recovery reconnect failed, will retry " + + "[rmtNode=" + recoveryDesc.node().id() + ", err=" + e + ']'); + + addProcessDisconnectRequest(sesInfo); + } + else { + if (log.isDebugEnabled()) + log.debug("Recovery reconnect failed, " + + "node left [rmtNode=" + recoveryDesc.node().id() + ", err=" + e + ']'); - onException("Recovery reconnect failed, node left [rmtNode=" + recoveryDesc.node().id() + "]", - e); + onException("Recovery reconnect failed, node left [rmtNode=" + recoveryDesc.node().id() + "]", + e); + } + } + catch (IgniteClientDisconnectedException e0) { + if (log.isDebugEnabled()) + log.debug("Failed to ping node, client disconnected."); + } } } } /** - * @param recoverySnd Recovery send data. + * @param sesInfo Disconnected session information. */ - void addReconnectRequest(GridNioRecoveryDescriptor recoverySnd) { - boolean add = q.add(recoverySnd); + void addProcessDisconnectRequest(DisconnectedSessionInfo sesInfo) { + boolean add = q.add(sesInfo); assert add; } @@ -3741,4 +3750,42 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter lock.readUnlock(); } } + + /** + * + */ + private static class DisconnectedSessionInfo { + /** */ + private final UUID nodeId; + + /** */ + private final GridNioSession ses; + + /** */ + private final GridNioRecoveryDescriptor recoveryDesc; + + /** */ + private final boolean reconnect; + + /** + * @param nodeId Node ID. + * @param ses Session. + * @param recoveryDesc Recovery descriptor. + * @param reconnect Reconnect flag. + */ + public DisconnectedSessionInfo(UUID nodeId, + GridNioSession ses, + @Nullable GridNioRecoveryDescriptor recoveryDesc, + boolean reconnect) { + this.nodeId = nodeId; + this.ses = ses; + this.recoveryDesc = recoveryDesc; + this.reconnect = reconnect; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(DisconnectedSessionInfo.class, this); + } + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/e9a508ef/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageRecoveryIdleConnection.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageRecoveryIdleConnection.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageRecoveryIdleConnection.java new file mode 100644 index 0000000..618fe2a --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageRecoveryIdleConnection.java @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed; + +import java.util.List; +import java.util.concurrent.ThreadLocalRandom; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteException; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteKernal; +import org.apache.ignite.internal.IgnitionEx; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteFuture; +import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; +import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; +import static org.apache.ignite.cache.CacheMode.REPLICATED; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; + +/** + * + */ +public class IgniteCacheMessageRecoveryIdleConnection extends GridCommonAbstractTest { + /** */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** */ + private static final int NODES = 3; + + /** */ + private static final long IDLE_TIMEOUT = 50; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER); + + TcpCommunicationSpi commSpi = new TcpCommunicationSpi(); + + commSpi.setIdleConnectionTimeout(IDLE_TIMEOUT); + commSpi.setSharedMemoryPort(-1); + + cfg.setCommunicationSpi(commSpi); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected long getTestTimeout() { + return 2 * 60_000; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + super.beforeTestsStarted(); + + startGridsMultiThreaded(NODES); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + super.afterTestsStopped(); + + stopAllGrids(); + } + + /** + * @throws Exception If failed. + */ + public void testCacheOperationsIdleConnectionCloseTx() throws Exception { + cacheOperationsIdleConnectionClose(TRANSACTIONAL); + } + + /** + * @throws Exception If failed. + */ + public void testCacheOperationsIdleConnectionCloseAtomic() throws Exception { + cacheOperationsIdleConnectionClose(ATOMIC); + } + + /** + * @param atomicityMode Cache atomicity mode. + * @throws Exception If failed. + */ + private void cacheOperationsIdleConnectionClose(CacheAtomicityMode atomicityMode) throws Exception { + CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>(); + + ccfg.setAtomicityMode(atomicityMode); + ccfg.setCacheMode(REPLICATED); + ccfg.setWriteSynchronizationMode(FULL_SYNC); + + IgniteCache<Object, Object> cache = ignite(0).createCache(ccfg).withAsync(); + + try { + ThreadLocalRandom rnd = ThreadLocalRandom.current(); + + int iter = 0; + + long stopTime = System.currentTimeMillis() + 90_000; + + while (System.currentTimeMillis() < stopTime) { + if (iter++ % 10 == 0) + log.info("Iteration: " + iter); + + cache.put(iter, 1); + + IgniteFuture<?> fut = cache.future(); + + try { + fut.get(10_000); + } + catch (IgniteException e) { + List<Ignite> nodes = IgnitionEx.allGridsx(); + + for (Ignite node : nodes) + ((IgniteKernal)node).dumpDebugInfo(); + + U.dumpThreads(log); + + throw e; + } + + U.sleep(rnd.nextLong(IDLE_TIMEOUT - 10, IDLE_TIMEOUT + 10)); + } + } + finally { + ignite(0).destroyCache(ccfg.getName()); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/e9a508ef/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java index 1ad74a5..2e45faa 100755 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java @@ -125,6 +125,7 @@ import org.apache.ignite.internal.processors.cache.distributed.CacheAtomicNearUp import org.apache.ignite.internal.processors.cache.distributed.CacheTxNearUpdateTopologyChangeTest; import org.apache.ignite.internal.processors.cache.distributed.GridCacheEntrySetIterationPreloadingSelfTest; import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheAtomicMessageRecoveryTest; +import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheMessageRecoveryIdleConnection; import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheMessageWriteTimeoutTest; import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheSystemTransactionsSelfTest; import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheTxMessageRecoveryTest; @@ -278,6 +279,7 @@ public class IgniteCacheTestSuite extends TestSuite { suite.addTestSuite(IgniteCacheAtomicMessageRecoveryTest.class); suite.addTestSuite(IgniteCacheTxMessageRecoveryTest.class); suite.addTestSuite(IgniteCacheMessageWriteTimeoutTest.class); + suite.addTestSuite(IgniteCacheMessageRecoveryIdleConnection.class); GridTestUtils.addTestIfNeeded(suite, GridCacheOffHeapTieredEvictionAtomicSelfTest.class, ignoredTests); GridTestUtils.addTestIfNeeded(suite, GridCacheOffHeapTieredEvictionSelfTest.class, ignoredTests); GridTestUtils.addTestIfNeeded(suite, GridCacheOffHeapTieredAtomicSelfTest.class, ignoredTests);
