ignite-comm-opts2
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/8749cfac Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/8749cfac Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/8749cfac Branch: refs/heads/ignite-comm-balance Commit: 8749cfacf3d69e8da5557d9142859585428bf42d Parents: 0b8c2bd Author: sboikov <sboi...@gridgain.com> Authored: Fri Sep 16 11:28:48 2016 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Fri Sep 16 11:28:48 2016 +0300 ---------------------------------------------------------------------- .../util/nio/GridNioRecoveryDescriptor.java | 29 +++++++++++++++++++- .../communication/tcp/TcpCommunicationSpi.java | 22 +++++++++------ 2 files changed, 41 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/8749cfac/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java index 29903d4..4598eef 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java @@ -285,7 +285,7 @@ public class GridNioRecoveryDescriptor { /** * */ - public void connected() { + public void onConnected() { synchronized (this) { assert reserved : this; assert !connected : this; @@ -307,6 +307,33 @@ public class GridNioRecoveryDescriptor { } /** + * @return Connected flag. + */ + public boolean connected() { + synchronized (this) { + return connected; + } + } + + /** + * @return Reserved flag. + */ + public boolean reserved() { + synchronized (this) { + return reserved; + } + } + + /** + * @return Current handshake index. + */ + public Long handshakeIndex() { + synchronized (this) { + return handshakeReq != null ? handshakeReq.get1() : null; + } + } + + /** * */ public void release() { http://git-wip-us.apache.org/repos/asf/ignite/blob/8749cfac/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java index 63afb61..d2c45f9 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java @@ -369,23 +369,23 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter if (!stopping) { boolean reconnect = false; - GridNioRecoveryDescriptor recoveryData = ses.outRecoveryDescriptor(); + GridNioRecoveryDescriptor outDesc = ses.outRecoveryDescriptor(); - if (recoveryData != null) { - if (recoveryData.nodeAlive(getSpiContext().node(id))) { - if (!recoveryData.messagesFutures().isEmpty()) { + if (outDesc != null) { + if (outDesc.nodeAlive(getSpiContext().node(id))) { + if (!outDesc.messagesFutures().isEmpty()) { reconnect = true; if (log.isDebugEnabled()) log.debug("Session was closed but there are unacknowledged messages, " + - "will try to reconnect [rmtNode=" + recoveryData.node().id() + ']'); + "will try to reconnect [rmtNode=" + outDesc.node().id() + ']'); } } else - recoveryData.onNodeLeft(); + outDesc.onNodeLeft(); } - DisconnectedSessionInfo disconnectData = new DisconnectedSessionInfo(recoveryData, + DisconnectedSessionInfo disconnectData = new DisconnectedSessionInfo(outDesc, reconnect); commWorker.addProcessDisconnectRequest(disconnectData); @@ -649,7 +649,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter if (sndRes) nioSrvr.sendSystem(ses, new RecoveryLastReceivedMessage(recovery.received())); - recovery.connected(); + recovery.onConnected(); GridTcpNioCommunicationClient client = null; @@ -679,7 +679,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter if (sndRes) nioSrvr.sendSystem(ses, new RecoveryLastReceivedMessage(recovery.received())); - recovery.connected(); + recovery.onConnected(); } /** @@ -1523,6 +1523,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter .append(", msgsRcvd=").append(desc.received()) .append(", lastAcked=").append(desc.lastAcknowledged()) .append(", reserveCnt=").append(desc.reserveCount()) + .append(", connected=").append(desc.connected()) + .append(", reserved=").append(desc.reserved()) + .append(", handshakeIdx=").append(desc.handshakeIndex()) .append(", descIdHash=").append(System.identityHashCode(desc)) .append(']').append(U.nl()); } @@ -2959,6 +2962,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter } /** + * @param recoveryDescs Descriptors map. * @param node Node. * @return Recovery receive data for given node. */