Repository: ignite Updated Branches: refs/heads/master de30a86e7 -> 625e26107
IGNITE-8829 Annotated configuration properties on TcpCommunicationSpi that lack required annotations - Fixes #4583. Signed-off-by: Alexey Goncharuk <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/625e2610 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/625e2610 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/625e2610 Branch: refs/heads/master Commit: 625e261078f1df215f51d17a0e84daa847f86bef Parents: de30a86 Author: NSAmelchev <[email protected]> Authored: Tue Sep 4 15:08:38 2018 +0300 Committer: Alexey Goncharuk <[email protected]> Committed: Tue Sep 4 15:08:38 2018 +0300 ---------------------------------------------------------------------- .../communication/tcp/TcpCommunicationSpi.java | 71 +++++++++++++------- 1 file changed, 45 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/625e2610/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java index 77e85c1..4ab1dd4 100755 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java @@ -204,16 +204,22 @@ import static org.apache.ignite.spi.communication.tcp.messages.RecoveryLastRecei * <h2 class="header">Optional</h2> * The following configuration parameters are optional: * <ul> + * <li>Address resolver (see {@link #setAddressResolver(AddressResolver)}</li> * <li>Node local IP address (see {@link #setLocalAddress(String)})</li> * <li>Node local port number (see {@link #setLocalPort(int)})</li> * <li>Local port range (see {@link #setLocalPortRange(int)}</li> + * <li>Use paired connections (see {@link #setUsePairedConnections(boolean)}</li> * <li>Connections per node (see {@link #setConnectionsPerNode(int)})</li> + * <li>Shared memory port (see {@link #setSharedMemoryPort(int)}</li> * <li>Idle connection timeout (see {@link #setIdleConnectionTimeout(long)})</li> * <li>Direct or heap buffer allocation (see {@link #setDirectBuffer(boolean)})</li> * <li>Direct or heap buffer allocation for sending (see {@link #setDirectSendBuffer(boolean)})</li> * <li>Count of selectors and selector threads for NIO server (see {@link #setSelectorsCount(int)})</li> + * <li>Selector thread busy-loop iterations (see {@link #setSelectorSpins(long)}</li> * <li>{@code TCP_NODELAY} socket option for sockets (see {@link #setTcpNoDelay(boolean)})</li> + * <li>Filter reachable addresses (see {@link #setFilterReachableAddresses(boolean)} </li> * <li>Message queue limit (see {@link #setMessageQueueLimit(int)})</li> + * <li>Slow client queue limit (see {@link #setSlowClientQueueLimit(int)})</li> * <li>Connect timeout (see {@link #setConnectTimeout(long)})</li> * <li>Maximum connect timeout (see {@link #setMaxConnectTimeout(long)})</li> * <li>Reconnect attempts count (see {@link #setReconnectCount(int)})</li> @@ -382,7 +388,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati private final GridNioServerListener<Message> srvLsnr = new GridNioServerListenerAdapter<Message>() { @Override public void onSessionWriteTimeout(GridNioSession ses) { - LT.warn(log,"Communication SPI session write timed out (consider increasing " + + LT.warn(log, "Communication SPI session write timed out (consider increasing " + "'socketWriteTimeout' " + "configuration property) [remoteAddr=" + ses.remoteAddress() + ", writeTimeout=" + sockWriteTimeout + ']'); @@ -499,7 +505,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati boolean unknownNode = true; if (discoverySpi instanceof TcpDiscoverySpi) { - TcpDiscoverySpi tcpDiscoverySpi = (TcpDiscoverySpi) discoverySpi; + TcpDiscoverySpi tcpDiscoverySpi = (TcpDiscoverySpi)discoverySpi; ClusterNode node0 = tcpDiscoverySpi.getNode0(sndId); @@ -511,7 +517,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati } } else if (discoverySpi instanceof IgniteDiscoverySpi) - unknownNode = !((IgniteDiscoverySpi) discoverySpi).knownNode(sndId); + unknownNode = !((IgniteDiscoverySpi)discoverySpi).knownNode(sndId); if (unknownNode) { U.warn(log, "Close incoming connection, unknown node [nodeId=" + sndId + ", ses=" + ses + ']'); @@ -636,7 +642,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati if (log.isDebugEnabled()) log.debug("Received incoming connection from remote node " + - "[rmtNode=" + rmtNode.id() + ", reserved=" + reserved + + "[rmtNode=" + rmtNode.id() + ", reserved=" + reserved + ", recovery=" + recoveryDesc + ']'); if (reserved) { @@ -655,9 +661,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati if (oldFut instanceof ConnectFuture && locNode.order() < rmtNode.order()) { if (log.isInfoEnabled()) { log.info("Received incoming connection from remote node while " + - "connecting to this node, rejecting [locNode=" + locNode.id() + - ", locNodeOrder=" + locNode.order() + ", rmtNode=" + rmtNode.id() + - ", rmtNodeOrder=" + rmtNode.order() + ']'); + "connecting to this node, rejecting [locNode=" + locNode.id() + + ", locNodeOrder=" + locNode.order() + ", rmtNode=" + rmtNode.id() + + ", rmtNodeOrder=" + rmtNode.order() + ']'); } ses.send(new RecoveryLastReceivedMessage(ALREADY_CONNECTED)); @@ -928,7 +934,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati catch (IgniteCheckedException e) { if (log.isDebugEnabled()) log.debug("Failed to send recovery handshake " + - "[rmtNode=" + rmtNode.id() + ", err=" + e + ']'); + "[rmtNode=" + rmtNode.id() + ", err=" + e + ']'); recoveryDesc.release(); } @@ -1010,14 +1016,14 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati msgFut.get(); GridTcpNioCommunicationClient client = - connected(recoveryDesc, ses, rmtNode, msg.received(), false, createClient); + connected(recoveryDesc, ses, rmtNode, msg.received(), false, createClient); fut.onDone(client); } catch (IgniteCheckedException e) { if (log.isDebugEnabled()) log.debug("Failed to send recovery handshake " + - "[rmtNode=" + rmtNode.id() + ", err=" + e + ']'); + "[rmtNode=" + rmtNode.id() + ", err=" + e + ']'); recoveryDesc.release(); @@ -1210,6 +1216,15 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati } /** + * See {@link #setAddressResolver(AddressResolver)}. + * + * @return Address resolver. + */ + public AddressResolver getAddressResolver() { + return addrRslvr; + } + + /** * Injects resources. * * @param ignite Ignite. @@ -1330,9 +1345,10 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati * Default is {@code false}. * * @param usePairedConnections {@code true} to use paired connections and {@code false} otherwise. - * @see #getConnectionsPerNode() * @return {@code this} for chaining. + * @see #getConnectionsPerNode() */ + @IgniteSpiConfiguration(optional = true) public TcpCommunicationSpi setUsePairedConnections(boolean usePairedConnections) { this.usePairedConnections = usePairedConnections; @@ -1345,9 +1361,10 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati * half for outgoing messages. * * @param maxConnectionsPerNode Number of connections per node. - * @see #isUsePairedConnections() * @return {@code this} for chaining. + * @see #isUsePairedConnections() */ + @IgniteSpiConfiguration(optional = true) public TcpCommunicationSpi setConnectionsPerNode(int maxConnectionsPerNode) { this.connectionsPerNode = maxConnectionsPerNode; @@ -1357,7 +1374,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati /** * See {@link #setConnectionsPerNode(int)}. * - * @return Number of connections per node. + * @return Number of connections per node. */ public int getConnectionsPerNode() { return connectionsPerNode; @@ -1513,7 +1530,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati * See {@link #setConnectTimeout(long)}. * * @return Connect timeout. - */public long getConnectTimeout() { + */ + public long getConnectTimeout() { return connTimeout; } @@ -1671,6 +1689,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati * @param selectorSpins Selector thread busy-loop iterations. * @return {@code this} for chaining. */ + @IgniteSpiConfiguration(optional = true) public TcpCommunicationSpi setSelectorSpins(long selectorSpins) { this.selectorSpins = selectorSpins; @@ -1832,6 +1851,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati * @param slowClientQueueLimit Slow client queue limit. * @return {@code this} for chaining. */ + @IgniteSpiConfiguration(optional = true) public TcpCommunicationSpi setSlowClientQueueLimit(int slowClientQueueLimit) { this.slowClientQueueLimit = slowClientQueueLimit; @@ -2223,7 +2243,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati log.debug(startInfo()); } - /** {@inheritDoc} }*/ + /** {@inheritDoc} } */ @Override public void onContextInitialized0(IgniteSpiContext spiCtx) throws IgniteSpiException { spiCtx.registerPort(boundTcpPort, IgnitePortProtocol.TCP); @@ -2331,12 +2351,12 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati IgniteBiInClosure<GridNioSession, Integer> queueSizeMonitor = !clientMode && slowClientQueueLimit > 0 ? - new CI2<GridNioSession, Integer>() { - @Override public void apply(GridNioSession ses, Integer qSize) { - checkClientQueueSize(ses, qSize); - } - } : - null; + new CI2<GridNioSession, Integer>() { + @Override public void apply(GridNioSession ses, Integer qSize) { + checkClientQueueSize(ses, qSize); + } + } : + null; GridNioFilter[] filters; @@ -3262,7 +3282,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati GridNioSession ses = recoveryDesc.session(); if (ses != null) { - while(ses.closeTime() == 0) + while (ses.closeTime() == 0) ses.close(); } @@ -3485,7 +3505,6 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati * @param node Remote node. * @param addrs Remote node addresses. * @param errs TCP client creation errors. - * * @throws IgniteCheckedException If failed. */ protected void processClientCreationError( @@ -3592,8 +3611,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati * @param timeout Timeout for handshake. * @param sslMeta Session meta. * @param handshakeConnIdx Non null connection index if need send it in handshake. - * @throws IgniteCheckedException If handshake failed or wasn't completed withing timeout. * @return Handshake response. + * @throws IgniteCheckedException If handshake failed or wasn't completed withing timeout. */ @SuppressWarnings("ThrowFromFinallyBlock") private long safeTcpHandshake( @@ -4113,7 +4132,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati /** {@inheritDoc} */ @Override public void onEvent(Event evt) { assert evt instanceof DiscoveryEvent : evt; - assert evt.type() == EVT_NODE_LEFT || evt.type() == EVT_NODE_FAILED ; + assert evt.type() == EVT_NODE_LEFT || evt.type() == EVT_NODE_FAILED; onNodeLeft(((DiscoveryEvent)evt).eventNode().id()); } @@ -4194,7 +4213,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati srvLsnr, writerFactory, new GridNioCodecFilter( - new GridDirectParser(log.getLogger(GridDirectParser.class),msgFactory, readerFactory), + new GridDirectParser(log.getLogger(GridDirectParser.class), msgFactory, readerFactory), log, true), new GridConnectionBytesVerifyFilter(log)
