http://git-wip-us.apache.org/repos/asf/ignite/blob/cfc88028/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java index 42879b7..f13f1f2 100755 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java @@ -106,7 +106,6 @@ import org.apache.ignite.lang.IgniteBiTuple; import org.apache.ignite.lang.IgniteFuture; import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.lang.IgnitePredicate; -import org.apache.ignite.lang.IgniteProductVersion; import org.apache.ignite.lang.IgniteRunnable; import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.plugin.extensions.communication.Message; @@ -122,6 +121,7 @@ import org.apache.ignite.spi.IgniteSpiConfiguration; import org.apache.ignite.spi.IgniteSpiConsistencyChecked; import org.apache.ignite.spi.IgniteSpiContext; import org.apache.ignite.spi.IgniteSpiException; +import org.apache.ignite.spi.IgniteSpiMBeanAdapter; import org.apache.ignite.spi.IgniteSpiMultipleInstancesSupport; import org.apache.ignite.spi.IgniteSpiOperationTimeoutException; import org.apache.ignite.spi.IgniteSpiOperationTimeoutHelper; @@ -241,8 +241,7 @@ import static org.apache.ignite.internal.util.nio.GridNioSessionMetaKey.SSL_META */ @IgniteSpiMultipleInstancesSupport(true) @IgniteSpiConsistencyChecked(optional = false) -public class TcpCommunicationSpi extends IgniteSpiAdapter - implements CommunicationSpi<Message>, TcpCommunicationSpiMBean { +public class TcpCommunicationSpi extends IgniteSpiAdapter implements CommunicationSpi<Message> { /** IPC error message. 
*/ public static final String OUT_OF_RESOURCES_TCP_MSG = "Failed to allocate shared memory segment " + "(switching to TCP, may be slower)."; @@ -1100,12 +1099,15 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter * Sets address resolver. * * @param addrRslvr Address resolver. + * @return {@code this} for chaining. */ @IgniteSpiConfiguration(optional = true) - public void setAddressResolver(AddressResolver addrRslvr) { + public TcpCommunicationSpi setAddressResolver(AddressResolver addrRslvr) { // Injection should not override value already set by Spring or user. if (this.addrRslvr == null) this.addrRslvr = addrRslvr; + + return this; } /** @@ -1130,16 +1132,23 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter * * @param locAddr IP address. Default value is any available local * IP address. + * @return {@code this} for chaining. */ @IgniteSpiConfiguration(optional = true) - public void setLocalAddress(String locAddr) { + public TcpCommunicationSpi setLocalAddress(String locAddr) { // Injection should not override value already set by Spring or user. if (this.locAddr == null) this.locAddr = locAddr; + + return this; } - /** {@inheritDoc} */ - @Override public String getLocalAddress() { + /** + * See {@link #setLocalAddress(String)}. + * + * @return Grid node IP address. + */ + public String getLocalAddress() { return locAddr; } @@ -1149,14 +1158,21 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter * If not provided, default value is {@link #DFLT_PORT}. * * @param locPort Port number. + * @return {@code this} for chaining. */ @IgniteSpiConfiguration(optional = true) - public void setLocalPort(int locPort) { + public TcpCommunicationSpi setLocalPort(int locPort) { this.locPort = locPort; + + return this; } - /** {@inheritDoc} */ - @Override public int getLocalPort() { + /** + * See {@link #setLocalPort(int)}. + * + * @return Port number. 
+ */ + public int getLocalPort() { return locPort; } @@ -1175,19 +1191,30 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter * If not provided, default value is {@link #DFLT_PORT_RANGE}. * * @param locPortRange New local port range. + * @return {@code this} for chaining. */ @IgniteSpiConfiguration(optional = true) - public void setLocalPortRange(int locPortRange) { + public TcpCommunicationSpi setLocalPortRange(int locPortRange) { this.locPortRange = locPortRange; + + return this; } - /** {@inheritDoc} */ - @Override public int getLocalPortRange() { + /** + * See {@link #setLocalPortRange(int)}. + * + * @return Local Port range. + */ + public int getLocalPortRange() { return locPortRange; } - /** {@inheritDoc} */ - @Override public boolean isUsePairedConnections() { + /** + * See {@link #setUsePairedConnections(boolean)}. + * + * @return {@code true} to use paired connections and {@code false} otherwise. + */ + public boolean isUsePairedConnections() { return usePairedConnections; } @@ -1205,9 +1232,12 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter * * @param usePairedConnections {@code true} to use paired connections and {@code false} otherwise. * @see #getConnectionsPerNode() + * @return {@code this} for chaining. */ - public void setUsePairedConnections(boolean usePairedConnections) { + public TcpCommunicationSpi setUsePairedConnections(boolean usePairedConnections) { this.usePairedConnections = usePairedConnections; + + return this; } /** @@ -1217,13 +1247,20 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter * * @param maxConnectionsPerNode Number of connections per node. * @see #isUsePairedConnections() + * @return {@code this} for chaining. 
*/ - public void setConnectionsPerNode(int maxConnectionsPerNode) { + public TcpCommunicationSpi setConnectionsPerNode(int maxConnectionsPerNode) { this.connectionsPerNode = maxConnectionsPerNode; + + return this; } - /** {@inheritDoc} */ - @Override public int getConnectionsPerNode() { + /** + * See {@link #setConnectionsPerNode(int)}. + * + * @return Number of connections per node. + */ + public int getConnectionsPerNode() { return connectionsPerNode; } @@ -1235,14 +1272,21 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter * If not provided, default value is {@link #DFLT_SHMEM_PORT}. * * @param shmemPort Port number. + * @return {@code this} for chaining. */ @IgniteSpiConfiguration(optional = true) - public void setSharedMemoryPort(int shmemPort) { + public TcpCommunicationSpi setSharedMemoryPort(int shmemPort) { this.shmemPort = shmemPort; + + return this; } - /** {@inheritDoc} */ - @Override public int getSharedMemoryPort() { + /** + * See {@link #setSharedMemoryPort(int)}. + * + * @return Port number. + */ + public int getSharedMemoryPort() { return shmemPort; } @@ -1253,19 +1297,30 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter * If not provided, default value is {@link #DFLT_IDLE_CONN_TIMEOUT}. * * @param idleConnTimeout Maximum idle connection time. + * @return {@code this} for chaining. */ @IgniteSpiConfiguration(optional = true) - public void setIdleConnectionTimeout(long idleConnTimeout) { + public TcpCommunicationSpi setIdleConnectionTimeout(long idleConnTimeout) { this.idleConnTimeout = idleConnTimeout; + + return this; } - /** {@inheritDoc} */ - @Override public long getIdleConnectionTimeout() { + /** + * See {@link #setIdleConnectionTimeout(long)}. + * + * @return Maximum idle connection time. + */ + public long getIdleConnectionTimeout() { return idleConnTimeout; } - /** {@inheritDoc} */ - @Override public long getSocketWriteTimeout() { + /** + * See {@link #setSocketWriteTimeout(long)}. 
+ * + * @return Socket write timeout for TCP connections. + */ + public long getSocketWriteTimeout() { return sockWriteTimeout; } @@ -1276,14 +1331,21 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter * Default to {@link #DFLT_SOCK_WRITE_TIMEOUT}. * * @param sockWriteTimeout Socket write timeout for TCP connection. + * @return {@code this} for chaining. */ @IgniteSpiConfiguration(optional = true) - public void setSocketWriteTimeout(long sockWriteTimeout) { + public TcpCommunicationSpi setSocketWriteTimeout(long sockWriteTimeout) { this.sockWriteTimeout = sockWriteTimeout; + + return this; } - /** {@inheritDoc} */ - @Override public int getAckSendThreshold() { + /** + * See {@link #setAckSendThreshold(int)}. + * + * @return Number of received messages after which acknowledgment is sent. + */ + public int getAckSendThreshold() { return ackSndThreshold; } @@ -1293,14 +1355,21 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter * Default to {@link #DFLT_ACK_SND_THRESHOLD}. * * @param ackSndThreshold Number of received messages after which acknowledgment is sent. + * @return {@code this} for chaining. */ @IgniteSpiConfiguration(optional = true) - public void setAckSendThreshold(int ackSndThreshold) { + public TcpCommunicationSpi setAckSendThreshold(int ackSndThreshold) { this.ackSndThreshold = ackSndThreshold; + + return this; } - /** {@inheritDoc} */ - @Override public int getUnacknowledgedMessagesBufferSize() { + /** + * See {@link #setUnacknowledgedMessagesBufferSize(int)}. + * + * @return Maximum number of unacknowledged messages. + */ + public int getUnacknowledgedMessagesBufferSize() { return unackedMsgsBufSize; } @@ -1310,17 +1379,19 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter * closed and reconnect is attempted. * * @param unackedMsgsBufSize Maximum number of unacknowledged messages. + * @return {@code this} for chaining. 
*/ @IgniteSpiConfiguration(optional = true) - public void setUnacknowledgedMessagesBufferSize(int unackedMsgsBufSize) { + public TcpCommunicationSpi setUnacknowledgedMessagesBufferSize(int unackedMsgsBufSize) { this.unackedMsgsBufSize = unackedMsgsBufSize; + + return this; } /** * Sets connection buffer size. If set to {@code 0} connection buffer is disabled. * * @param connBufSize Connection buffer size. - * @see #setConnectionBufferFlushFrequency(long) * @deprecated Not used any more. */ @Deprecated @@ -1329,22 +1400,48 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter // No-op. } - /** {@inheritDoc} */ + /** + * Gets connection buffer size. + * <p> + * If set to {@code 0} connection buffer is disabled. + * + * @return Connection buffer size. + * @deprecated Not used anymore. + */ @Deprecated - @Override public int getConnectionBufferSize() { + public int getConnectionBufferSize() { return 0; } - /** {@inheritDoc} */ + /** + * Sets connection buffer flush frequency. + * <p> + * Client connections to other nodes in topology use buffered output. + * This frequency defines how often system will advise to flush + * connection buffer. + * <p> + * This property is used only if {@link #getConnectionBufferSize()} is greater than {@code 0}. + * + * @param connBufFlushFreq Flush frequency. + * @see #getConnectionBufferSize() + * @deprecated Not used anymore. + */ @Deprecated @IgniteSpiConfiguration(optional = true) - @Override public void setConnectionBufferFlushFrequency(long connBufFlushFreq) { + public void setConnectionBufferFlushFrequency(long connBufFlushFreq) { // No-op. } - /** {@inheritDoc} */ + /** + * Gets connection buffer flush frequency. + * <p> + * See {@link #setConnectionBufferFlushFrequency(long)}. + * + * @return Connection buffer flush frequency. + * @deprecated Not used anymore. 
+ */ @Deprecated - @Override public long getConnectionBufferFlushFrequency() { + public long getConnectionBufferFlushFrequency() { return 0; } @@ -1359,16 +1456,22 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter * When this property is explicitly set {@link IgniteConfiguration#getFailureDetectionTimeout()} is ignored. * * @param connTimeout Connect timeout. + * @return {@code this} for chaining. */ @IgniteSpiConfiguration(optional = true) - public void setConnectTimeout(long connTimeout) { + public TcpCommunicationSpi setConnectTimeout(long connTimeout) { this.connTimeout = connTimeout; failureDetectionTimeoutEnabled(false); + + return this; } - /** {@inheritDoc} */ - @Override public long getConnectTimeout() { + /** + * See {@link #setConnectTimeout(long)}. + * + * @return Connect timeout. + */public long getConnectTimeout() { return connTimeout; } @@ -1385,16 +1488,23 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter * When this property is explicitly set {@link IgniteConfiguration#getFailureDetectionTimeout()} is ignored. * * @param maxConnTimeout Maximum connect timeout. + * @return {@code this} for chaining. */ @IgniteSpiConfiguration(optional = true) - public void setMaxConnectTimeout(long maxConnTimeout) { + public TcpCommunicationSpi setMaxConnectTimeout(long maxConnTimeout) { this.maxConnTimeout = maxConnTimeout; failureDetectionTimeoutEnabled(false); + + return this; } - /** {@inheritDoc} */ - @Override public long getMaxConnectTimeout() { + /** + * Gets maximum connect timeout. + * + * @return Maximum connect timeout. + */ + public long getMaxConnectTimeout() { return maxConnTimeout; } @@ -1407,16 +1517,24 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter * When this property is explicitly set {@link IgniteConfiguration#getFailureDetectionTimeout()} is ignored. * * @param reconCnt Maximum number of reconnection attempts. + * @return {@code this} for chaining. 
*/ @IgniteSpiConfiguration(optional = true) - public void setReconnectCount(int reconCnt) { + public TcpCommunicationSpi setReconnectCount(int reconCnt) { this.reconCnt = reconCnt; failureDetectionTimeoutEnabled(false); + + return this; } - /** {@inheritDoc} */ - @Override public int getReconnectCount() { + /** + * Gets maximum number of reconnect attempts used when establishing connection + * with remote nodes. + * + * @return Reconnects count. + */ + public int getReconnectCount() { return reconCnt; } @@ -1428,32 +1546,46 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter * If not provided, default value is {@code true}. * * @param directBuf Flag indicates to allocate direct or heap buffer in SPI. + * @return {@code this} for chaining. */ @IgniteSpiConfiguration(optional = true) - public void setDirectBuffer(boolean directBuf) { + public TcpCommunicationSpi setDirectBuffer(boolean directBuf) { this.directBuf = directBuf; + + return this; } - /** {@inheritDoc} */ - @Override public boolean isDirectBuffer() { + /** + * Gets flag that indicates whether direct or heap allocated buffer is used. + * + * @return Flag that indicates whether direct or heap allocated buffer is used. + */ + public boolean isDirectBuffer() { return directBuf; } - /** {@inheritDoc} */ - @Override public boolean isDirectSendBuffer() { + /** + * Gets flag defining whether direct send buffer should be used. + * + * @return {@code True} if direct buffers should be used. + */ + public boolean isDirectSendBuffer() { return directSndBuf; } /** * Sets whether to use direct buffer for sending. - * <p> + * * If not provided default is {@code false}. * * @param directSndBuf {@code True} to use direct buffers for send. + * @return {@code this} for chaining. 
*/ @IgniteSpiConfiguration(optional = true) - public void setDirectSendBuffer(boolean directSndBuf) { + public TcpCommunicationSpi setDirectSendBuffer(boolean directSndBuf) { this.directSndBuf = directSndBuf; + + return this; } /** @@ -1462,19 +1594,30 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter * If not provided, default value is {@link #DFLT_SELECTORS_CNT}. * * @param selectorsCnt Selectors count. + * @return {@code this} for chaining. */ @IgniteSpiConfiguration(optional = true) - public void setSelectorsCount(int selectorsCnt) { + public TcpCommunicationSpi setSelectorsCount(int selectorsCnt) { this.selectorsCnt = selectorsCnt; + + return this; } - /** {@inheritDoc} */ - @Override public int getSelectorsCount() { + /** + * See {@link #setSelectorsCount(int)}. + * + * @return Count of selectors in TCP server. + */ + public int getSelectorsCount() { return selectorsCnt; } - /** {@inheritDoc} */ - @Override public long getSelectorSpins() { + /** + * See {@link #setSelectorSpins(long)}. + * + * @return Selector thread busy-loop iterations. + */ + public long getSelectorSpins() { return selectorSpins; } @@ -1484,9 +1627,12 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter * Can be set to {@code Long.MAX_VALUE} so selector threads will never block. * * @param selectorSpins Selector thread busy-loop iterations. + * @return {@code this} for chaining. */ - public void setSelectorSpins(long selectorSpins) { + public TcpCommunicationSpi setSelectorSpins(long selectorSpins) { this.selectorSpins = selectorSpins; + + return this; } /** @@ -1502,14 +1648,21 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter * If not provided, default value is {@link #DFLT_TCP_NODELAY}. * * @param tcpNoDelay {@code True} to disable TCP delay. + * @return {@code this} for chaining. 
*/ @IgniteSpiConfiguration(optional = true) - public void setTcpNoDelay(boolean tcpNoDelay) { + public TcpCommunicationSpi setTcpNoDelay(boolean tcpNoDelay) { this.tcpNoDelay = tcpNoDelay; + + return this; } - /** {@inheritDoc} */ - @Override public boolean isTcpNoDelay() { + /** + * Gets value for {@code TCP_NODELAY} socket option. + * + * @return {@code True} if TCP delay is disabled. + */ + public boolean isTcpNoDelay() { return tcpNoDelay; } @@ -1519,14 +1672,21 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter * If not provided, default is {@link #DFLT_SOCK_BUF_SIZE}. * * @param sockRcvBuf Socket receive buffer size. + * @return {@code this} for chaining. */ @IgniteSpiConfiguration(optional = true) - public void setSocketReceiveBuffer(int sockRcvBuf) { + public TcpCommunicationSpi setSocketReceiveBuffer(int sockRcvBuf) { this.sockRcvBuf = sockRcvBuf; + + return this; } - /** {@inheritDoc} */ - @Override public int getSocketReceiveBuffer() { + /** + * See {@link #setSocketReceiveBuffer(int)}. + * + * @return Socket receive buffer size. + */ + public int getSocketReceiveBuffer() { return sockRcvBuf; } @@ -1536,14 +1696,21 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter * If not provided, default is {@link #DFLT_SOCK_BUF_SIZE}. * * @param sockSndBuf Socket send buffer size. + * @return {@code this} for chaining. */ @IgniteSpiConfiguration(optional = true) - public void setSocketSendBuffer(int sockSndBuf) { + public TcpCommunicationSpi setSocketSendBuffer(int sockSndBuf) { this.sockSndBuf = sockSndBuf; + + return this; } - /** {@inheritDoc} */ - @Override public int getSocketSendBuffer() { + /** + * See {@link #setSocketSendBuffer(int)}. + * + * @return Socket send buffer size. + */ + public int getSocketSendBuffer() { return sockSndBuf; } @@ -1556,19 +1723,30 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter * If not provided, default is {@link #DFLT_MSG_QUEUE_LIMIT}. * * @param msgQueueLimit Send queue size limit. 
+ * @return {@code this} for chaining. */ @IgniteSpiConfiguration(optional = true) - public void setMessageQueueLimit(int msgQueueLimit) { + public TcpCommunicationSpi setMessageQueueLimit(int msgQueueLimit) { this.msgQueueLimit = msgQueueLimit; + + return this; } - /** {@inheritDoc} */ - @Override public int getMessageQueueLimit() { + /** + * Gets message queue limit for incoming and outgoing messages. + * + * @return Send queue size limit. + */ + public int getMessageQueueLimit() { return msgQueueLimit; } - /** {@inheritDoc} */ - @Override public int getSlowClientQueueLimit() { + /** + * See {@link #setSlowClientQueueLimit(int)}. + * + * @return Slow client queue limit. + */ + public int getSlowClientQueueLimit() { return slowClientQueueLimit; } @@ -1583,9 +1761,12 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter * which means {@code unlimited}. * * @param slowClientQueueLimit Slow client queue limit. + * @return {@code this} for chaining. */ - public void setSlowClientQueueLimit(int slowClientQueueLimit) { + public TcpCommunicationSpi setSlowClientQueueLimit(int slowClientQueueLimit) { this.slowClientQueueLimit = slowClientQueueLimit; + + return this; } /** @@ -1601,9 +1782,15 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter // No-op. } - /** {@inheritDoc} */ + /** + * Gets the minimum number of messages for this SPI, that are buffered + * prior to sending. + * + * @return Minimum buffered message count. + * @deprecated Not used anymore. + */ @Deprecated - @Override public int getMinimumBufferedMessageCount() { + public int getMinimumBufferedMessageCount() { return 0; } @@ -1656,8 +1843,10 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter rcvdBytesCnt.add(-rcvdBytesCnt.sum()); } - /** {@inheritDoc} */ - @Override public void dumpStats() { + /** + * Dumps SPI per-connection stats to logs. 
+ */ + public void dumpStats() { IgniteLogger log = this.log; if (log != null) { @@ -1879,7 +2068,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter "potential OOMEs when running cache operations in FULL_ASYNC or PRIMARY_SYNC modes " + "due to message queues growth on sender and receiver sides."); - registerMBean(igniteInstanceName, this, TcpCommunicationSpiMBean.class); + registerMBean(igniteInstanceName, new TcpCommunicationSpiMBeanImpl(this), TcpCommunicationSpiMBean.class); connectGate = new ConnectGateway(); @@ -3424,6 +3613,13 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter } /** {@inheritDoc} */ + @Override public TcpCommunicationSpi setName(String name) { + super.setName(name); + + return this; + } + + /** {@inheritDoc} */ @Override public String toString() { return S.toString(TcpCommunicationSpi.class, this); } @@ -4562,4 +4758,178 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter */ int connectionIndex(); } + + /** + * MBean implementation for TcpCommunicationSpi. 
+ */ + private class TcpCommunicationSpiMBeanImpl extends IgniteSpiMBeanAdapter implements TcpCommunicationSpiMBean { + /** {@inheritDoc} */ + TcpCommunicationSpiMBeanImpl(IgniteSpiAdapter spiAdapter) { + super(spiAdapter); + } + + /** {@inheritDoc} */ + @Override public String getLocalAddress() { + return TcpCommunicationSpi.this.getLocalAddress(); + } + + /** {@inheritDoc} */ + @Override public int getLocalPort() { + return TcpCommunicationSpi.this.getLocalPort(); + } + + /** {@inheritDoc} */ + @Override public int getLocalPortRange() { + return TcpCommunicationSpi.this.getLocalPortRange(); + } + + /** {@inheritDoc} */ + @Override public boolean isUsePairedConnections() { + return TcpCommunicationSpi.this.isUsePairedConnections(); + } + + /** {@inheritDoc} */ + @Override public int getConnectionsPerNode() { + return TcpCommunicationSpi.this.getConnectionsPerNode(); + } + + /** {@inheritDoc} */ + @Override public int getSharedMemoryPort() { + return TcpCommunicationSpi.this.getSharedMemoryPort(); + } + + /** {@inheritDoc} */ + @Override public long getIdleConnectionTimeout() { + return TcpCommunicationSpi.this.getIdleConnectionTimeout(); + } + + /** {@inheritDoc} */ + @Override public long getSocketWriteTimeout() { + return TcpCommunicationSpi.this.getSocketWriteTimeout(); + } + + /** {@inheritDoc} */ + @Override public int getAckSendThreshold() { + return TcpCommunicationSpi.this.getAckSendThreshold(); + } + + /** {@inheritDoc} */ + @Override public int getUnacknowledgedMessagesBufferSize() { + return TcpCommunicationSpi.this.getUnacknowledgedMessagesBufferSize(); + } + + /** {@inheritDoc} */ + @Override public long getConnectTimeout() { + return TcpCommunicationSpi.this.getConnectTimeout(); + } + + /** {@inheritDoc} */ + @Override public long getMaxConnectTimeout() { + return TcpCommunicationSpi.this.getMaxConnectTimeout(); + } + + /** {@inheritDoc} */ + @Override public int getReconnectCount() { + return TcpCommunicationSpi.this.getReconnectCount(); + } + + /** 
{@inheritDoc} */ + @Deprecated + @Override public int getConnectionBufferSize() { + return TcpCommunicationSpi.this.getConnectionBufferSize(); + } + + /** {@inheritDoc} */ + @Deprecated + @Override public void setConnectionBufferFlushFrequency(long connBufFlushFreq) { + TcpCommunicationSpi.this.setConnectionBufferFlushFrequency(connBufFlushFreq); + } + + /** {@inheritDoc} */ + @Deprecated + @Override public long getConnectionBufferFlushFrequency() { + return TcpCommunicationSpi.this.getConnectionBufferFlushFrequency(); + } + + /** {@inheritDoc} */ + @Override public boolean isDirectBuffer() { + return TcpCommunicationSpi.this.isDirectBuffer(); + } + + /** {@inheritDoc} */ + @Override public boolean isDirectSendBuffer() { + return TcpCommunicationSpi.this.isDirectSendBuffer(); + } + + /** {@inheritDoc} */ + @Override public int getSelectorsCount() { + return TcpCommunicationSpi.this.getSelectorsCount(); + } + + /** {@inheritDoc} */ + @Override public long getSelectorSpins() { + return TcpCommunicationSpi.this.getSelectorSpins(); + } + + /** {@inheritDoc} */ + @Override public boolean isTcpNoDelay() { + return TcpCommunicationSpi.this.isTcpNoDelay(); + } + + /** {@inheritDoc} */ + @Override public int getSocketReceiveBuffer() { + return TcpCommunicationSpi.this.getSocketReceiveBuffer(); + } + + /** {@inheritDoc} */ + @Override public int getSocketSendBuffer() { + return TcpCommunicationSpi.this.getSocketSendBuffer(); + } + + /** {@inheritDoc} */ + @Override public int getMessageQueueLimit() { + return TcpCommunicationSpi.this.getMessageQueueLimit(); + } + + /** {@inheritDoc} */ + @Override public int getSlowClientQueueLimit() { + return TcpCommunicationSpi.this.getSlowClientQueueLimit(); + } + + /** {@inheritDoc} */ + @Deprecated + @Override public int getMinimumBufferedMessageCount() { + return TcpCommunicationSpi.this.getMinimumBufferedMessageCount(); + } + + /** {@inheritDoc} */ + @Override public void dumpStats() { + TcpCommunicationSpi.this.dumpStats(); + } + + 
/** {@inheritDoc} */ + @Override public int getSentMessagesCount() { + return TcpCommunicationSpi.this.getSentMessagesCount(); + } + + /** {@inheritDoc} */ + @Override public long getSentBytesCount() { + return TcpCommunicationSpi.this.getSentBytesCount(); + } + + /** {@inheritDoc} */ + @Override public int getReceivedMessagesCount() { + return TcpCommunicationSpi.this.getReceivedMessagesCount(); + } + + /** {@inheritDoc} */ + @Override public long getReceivedBytesCount() { + return TcpCommunicationSpi.this.getReceivedBytesCount(); + } + + /** {@inheritDoc} */ + @Override public int getOutboundMessagesQueueSize() { + return TcpCommunicationSpi.this.getOutboundMessagesQueueSize(); + } + } }
http://git-wip-us.apache.org/repos/asf/ignite/blob/cfc88028/modules/core/src/main/java/org/apache/ignite/spi/deployment/local/LocalDeploymentSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/deployment/local/LocalDeploymentSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/deployment/local/LocalDeploymentSpi.java index 66b715a..9d46737 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/deployment/local/LocalDeploymentSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/deployment/local/LocalDeploymentSpi.java @@ -35,6 +35,7 @@ import org.apache.ignite.resources.LoggerResource; import org.apache.ignite.spi.IgniteSpiAdapter; import org.apache.ignite.spi.IgniteSpiConsistencyChecked; import org.apache.ignite.spi.IgniteSpiException; +import org.apache.ignite.spi.IgniteSpiMBeanAdapter; import org.apache.ignite.spi.IgniteSpiMultipleInstancesSupport; import org.apache.ignite.spi.deployment.DeploymentListener; import org.apache.ignite.spi.deployment.DeploymentResource; @@ -66,7 +67,7 @@ import org.jsr166.ConcurrentLinkedHashMap; @IgniteSpiMultipleInstancesSupport(true) @IgniteSpiConsistencyChecked(optional = false) @IgnoreIfPeerClassLoadingDisabled -public class LocalDeploymentSpi extends IgniteSpiAdapter implements DeploymentSpi, LocalDeploymentSpiMBean { +public class LocalDeploymentSpi extends IgniteSpiAdapter implements DeploymentSpi { /** */ @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"}) @LoggerResource @@ -76,7 +77,7 @@ public class LocalDeploymentSpi extends IgniteSpiAdapter implements DeploymentSp private ConcurrentLinkedHashMap<ClassLoader, ConcurrentMap<String, String>> ldrRsrcs = new ConcurrentLinkedHashMap<>(16, 0.75f, 64); - /** Deployment SPI listener. */ + /** Deployment SPI listener. 
*/ private volatile DeploymentListener lsnr; /** {@inheritDoc} */ @@ -84,7 +85,7 @@ public class LocalDeploymentSpi extends IgniteSpiAdapter implements DeploymentSp // Start SPI start stopwatch. startStopwatch(); - registerMBean(igniteInstanceName, this, LocalDeploymentSpiMBean.class); + registerMBean(igniteInstanceName, new LocalDeploymentSpiMBeanImpl(this), LocalDeploymentSpiMBean.class); if (log.isDebugEnabled()) log.debug(startInfo()); @@ -395,7 +396,24 @@ public class LocalDeploymentSpi extends IgniteSpiAdapter implements DeploymentSp } /** {@inheritDoc} */ + @Override public LocalDeploymentSpi setName(String name) { + super.setName(name); + + return this; + } + + /** {@inheritDoc} */ @Override public String toString() { return S.toString(LocalDeploymentSpi.class, this); } + + /** + * MBean implementation for LocalDeploymentSpi. + */ + private class LocalDeploymentSpiMBeanImpl extends IgniteSpiMBeanAdapter implements LocalDeploymentSpiMBean { + /** {@inheritDoc} */ + LocalDeploymentSpiMBeanImpl(IgniteSpiAdapter spiAdapter) { + super(spiAdapter); + } + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/cfc88028/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java index e8b937a..19244dd 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java @@ -70,6 +70,7 @@ import org.apache.ignite.spi.IgniteSpiAdapter; import org.apache.ignite.spi.IgniteSpiConfiguration; import org.apache.ignite.spi.IgniteSpiContext; import org.apache.ignite.spi.IgniteSpiException; +import org.apache.ignite.spi.IgniteSpiMBeanAdapter; import 
org.apache.ignite.spi.IgniteSpiMultipleInstancesSupport; import org.apache.ignite.spi.IgniteSpiOperationTimeoutException; import org.apache.ignite.spi.IgniteSpiOperationTimeoutHelper; @@ -217,7 +218,7 @@ import org.jetbrains.annotations.Nullable; @IgniteSpiMultipleInstancesSupport(true) @DiscoverySpiOrderSupport(true) @DiscoverySpiHistorySupport(true) -public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, TcpDiscoverySpiMBean { +public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi { /** Node attribute that is mapped to node's external addresses (value is <tt>disc.tcp.ext-addrs</tt>). */ public static final String ATTR_EXT_ADDRS = "disc.tcp.ext-addrs"; @@ -403,18 +404,30 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T /** */ protected IgniteSpiContext spiCtx; - /** {@inheritDoc} */ - @Override public String getSpiState() { + /** + * Gets current SPI state. + * + * @return Current SPI state. + */ + public String getSpiState() { return impl.getSpiState(); } - /** {@inheritDoc} */ - @Override public int getMessageWorkerQueueSize() { + /** + * Gets message worker queue current size. + * + * @return Message worker queue current size. + */ + public int getMessageWorkerQueueSize() { return impl.getMessageWorkerQueueSize(); } - /** {@inheritDoc} */ - @Nullable @Override public UUID getCoordinator() { + /** + * Gets current coordinator. + * + * @return Current coordinator. + */ + public UUID getCoordinator() { return impl.getCoordinator(); } @@ -453,8 +466,10 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T impl.failNode(nodeId, warning); } - /** {@inheritDoc} */ - @Override public void dumpDebugInfo() { + /** + * Dumps debug info using configured logger. 
+ */ + public void dumpDebugInfo() { impl.dumpDebugInfo(log); } @@ -580,8 +595,12 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T return addrRslvr; } - /** {@inheritDoc} */ - @Override public int getReconnectCount() { + /** + * Gets number of connection attempts. + * + * @return Number of connection attempts. + */ + public int getReconnectCount() { return reconCnt; } @@ -608,8 +627,12 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T return this; } - /** {@inheritDoc} */ - @Override public long getMaxAckTimeout() { + /** + * Gets maximum message acknowledgement timeout. + * + * @return Maximum message acknowledgement timeout. + */ + public long getMaxAckTimeout() { return maxAckTimeout; } @@ -639,8 +662,12 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T return this; } - /** {@inheritDoc} */ - @Override public int getLocalPort() { + /** + * Gets local TCP port SPI listens to. + * + * @return Local port, or {@code 0} if local node is not set. + */ + public int getLocalPort() { TcpDiscoveryNode locNode0 = locNode; return locNode0 != null ? locNode0.discoveryPort() : 0; @@ -663,8 +690,12 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T return this; } - /** {@inheritDoc} */ - @Override public int getLocalPortRange() { + /** + * Gets local TCP port range. + * + * @return Local port range. + */ + public int getLocalPortRange() { return locPortRange; } @@ -689,8 +720,12 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T return this; } - /** {@inheritDoc} */ - @Override public int getMaxMissedHeartbeats() { + /** + * Gets max heartbeats count node can miss without initiating status check. + * + * @return Max missed heartbeats. 
+ */ + public int getMaxMissedHeartbeats() { return maxMissedHbs; } @@ -711,8 +746,12 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T return this; } - /** {@inheritDoc} */ - @Override public int getMaxMissedClientHeartbeats() { + /** + * Gets max heartbeats count node can miss without failing client node. + * + * @return Max missed client heartbeats. + */ + public int getMaxMissedClientHeartbeats() { return maxMissedClientHbs; } @@ -731,8 +770,12 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T return this; } - /** {@inheritDoc} */ - @Override public long getStatisticsPrintFrequency() { + /** + * Gets statistics print frequency. + * + * @return Statistics print frequency in milliseconds. + */ + public long getStatisticsPrintFrequency() { return statsPrintFreq; } @@ -755,8 +798,12 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T return this; } - /** {@inheritDoc} */ - @Override public long getIpFinderCleanFrequency() { + /** + * Gets IP finder clean frequency. + * + * @return IP finder clean frequency. + */ + public long getIpFinderCleanFrequency() { return ipFinderCleanFreq; } @@ -862,8 +909,12 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T return this; } - /** {@inheritDoc} */ - @Override public long getJoinTimeout() { + /** + * Gets join timeout. + * + * @return Join timeout. + */ + public long getJoinTimeout() { return joinTimeout; } @@ -964,6 +1015,15 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T } /** + * Gets ID of the local node. + * + * @return ID of the local node. + */ + public UUID getLocalNodeId() { + return ignite.cluster().localNode().id(); + } + + /** * @param srvPort Server port. * @param addExtAddrAttr If {@code true} adds {@link #ATTR_EXT_ADDRS} attribute. 
*/ @@ -1061,93 +1121,164 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T return F.<Object>asList(ipFinder); } - /** {@inheritDoc} */ - @Override public long getSocketTimeout() { + /** + * Gets socket timeout. + * + * @return Socket timeout. + */ + public long getSocketTimeout() { return sockTimeout; } - /** {@inheritDoc} */ - @Override public long getAckTimeout() { + /** + * Gets message acknowledgement timeout. + * + * @return Message acknowledgement timeout. + */ + public long getAckTimeout() { return ackTimeout; } - /** {@inheritDoc} */ - @Override public long getNetworkTimeout() { + /** + * Gets network timeout. + * + * @return Network timeout. + */ + public long getNetworkTimeout() { return netTimeout; } - /** {@inheritDoc} */ - @Override public int getThreadPriority() { + /** + * Gets thread priority. All threads within SPI will be started with it. + * + * @return Thread priority. + */ + public int getThreadPriority() { return threadPri; } - /** {@inheritDoc} */ - @Override public long getHeartbeatFrequency() { + /** + * Gets delay between heartbeat messages sent by coordinator. + * + * @return Time period in milliseconds. + */ + public long getHeartbeatFrequency() { return hbFreq; } - /** {@inheritDoc} */ - @Override public String getIpFinderFormatted() { + /** + * Gets {@link org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder} (string representation). + * + * @return IPFinder (string representation). + */public String getIpFinderFormatted() { return ipFinder.toString(); } - /** {@inheritDoc} */ - @Override public long getNodesJoined() { + /** + * Gets joined nodes count. + * + * @return Nodes joined count. + */ + public long getNodesJoined() { return stats.joinedNodesCount(); } - /** {@inheritDoc} */ - @Override public long getNodesLeft() { + /** + * Gets left nodes count. + * + * @return Left nodes count. 
+ */ + public long getNodesLeft() { return stats.leftNodesCount(); } - /** {@inheritDoc} */ - @Override public long getNodesFailed() { + /** + * Gets failed nodes count. + * + * @return Failed nodes count. + */ + public long getNodesFailed() { return stats.failedNodesCount(); } - /** {@inheritDoc} */ - @Override public long getPendingMessagesRegistered() { + /** + * Gets pending messages registered count. + * + * @return Pending messages registered count. + */ + public long getPendingMessagesRegistered() { return stats.pendingMessagesRegistered(); } - /** {@inheritDoc} */ - @Override public long getPendingMessagesDiscarded() { + /** + * Gets pending messages discarded count. + * + * @return Pending messages discarded count. + */ + public long getPendingMessagesDiscarded() { return stats.pendingMessagesDiscarded(); } - /** {@inheritDoc} */ - @Override public long getAvgMessageProcessingTime() { + /** + * Gets avg message processing time. + * + * @return Avg message processing time. + */ + public long getAvgMessageProcessingTime() { return stats.avgMessageProcessingTime(); } - /** {@inheritDoc} */ - @Override public long getMaxMessageProcessingTime() { + /** + * Gets max message processing time. + * + * @return Max message processing time. + */ + public long getMaxMessageProcessingTime() { return stats.maxMessageProcessingTime(); } - /** {@inheritDoc} */ - @Override public int getTotalReceivedMessages() { + /** + * Gets total received messages count. + * + * @return Total received messages count. + */ + public int getTotalReceivedMessages() { return stats.totalReceivedMessages(); } - /** {@inheritDoc} */ - @Override public Map<String, Integer> getReceivedMessages() { + /** + * Gets received messages counts (grouped by type). + * + * @return Map containing message types and respective counts.
+ */ + public Map<String, Integer> getReceivedMessages() { return stats.receivedMessages(); } - /** {@inheritDoc} */ - @Override public int getTotalProcessedMessages() { + /** + * Gets total processed messages count. + * + * @return Total processed messages count. + */ + public int getTotalProcessedMessages() { return stats.totalProcessedMessages(); } - /** {@inheritDoc} */ - @Override public Map<String, Integer> getProcessedMessages() { + /** + * Gets processed messages counts (grouped by type). + * + * @return Map containing message types and respective counts. + */ + public Map<String, Integer> getProcessedMessages() { return stats.processedMessages(); } - /** {@inheritDoc} */ - @Override public long getCoordinatorSinceTimestamp() { + /** + * Gets time local node has been coordinator since. + * + * @return Time local node is coordinator since. + */ + public long getCoordinatorSinceTimestamp() { return stats.coordinatorSinceTimestamp(); } @@ -1815,7 +1946,7 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T if (netTimeout < 3000) U.warn(log, "Network timeout is too low (at least 3000 ms recommended): " + netTimeout); - registerMBean(igniteInstanceName, this, TcpDiscoverySpiMBean.class); + registerMBean(igniteInstanceName, new TcpDiscoverySpiMBeanImpl(this), TcpDiscoverySpiMBean.class); if (ipFinder instanceof TcpDiscoveryMulticastIpFinder) { TcpDiscoveryMulticastIpFinder mcastIpFinder = ((TcpDiscoveryMulticastIpFinder)ipFinder); @@ -2000,6 +2131,13 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T } /** {@inheritDoc} */ + @Override public TcpDiscoverySpi setName(String name) { + super.setName(name); + + return this; + } + + /** {@inheritDoc} */ @Override public String toString() { return S.toString(TcpDiscoverySpi.class, this); } @@ -2070,4 +2208,175 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T return S.toString(SocketTimeoutObject.class, this); } } + + /** + 
* MBean implementation for TcpDiscoverySpiMBean. + */ + private class TcpDiscoverySpiMBeanImpl extends IgniteSpiMBeanAdapter implements TcpDiscoverySpiMBean { + /** {@inheritDoc} */ + TcpDiscoverySpiMBeanImpl(IgniteSpiAdapter spiAdapter) { + super(spiAdapter); + } + + /** {@inheritDoc} */ + @Override public String getSpiState() { + return impl.getSpiState(); + } + + /** {@inheritDoc} */ + @Override public int getMessageWorkerQueueSize() { + return impl.getMessageWorkerQueueSize(); + } + + /** {@inheritDoc} */ + @Nullable @Override public UUID getCoordinator() { + return impl.getCoordinator(); + } + + /** {@inheritDoc} */ + @Override public void dumpDebugInfo() { + impl.dumpDebugInfo(log); + } + + /** {@inheritDoc} */ + @Override public long getSocketTimeout() { + return TcpDiscoverySpi.this.getSocketTimeout(); + } + + /** {@inheritDoc} */ + @Override public long getMaxAckTimeout() { + return TcpDiscoverySpi.this.getMaxAckTimeout(); + } + + /** {@inheritDoc} */ + @Override public long getAckTimeout() { + return TcpDiscoverySpi.this.getAckTimeout(); + } + + /** {@inheritDoc} */ + @Override public long getNetworkTimeout() { + return TcpDiscoverySpi.this.getNetworkTimeout(); + } + + /** {@inheritDoc} */ + @Override public long getJoinTimeout() { + return TcpDiscoverySpi.this.getJoinTimeout(); + } + + /** {@inheritDoc} */ + @Override public int getLocalPort() { + return TcpDiscoverySpi.this.getLocalPort(); + } + + /** {@inheritDoc} */ + @Override public int getLocalPortRange() { + return TcpDiscoverySpi.this.getLocalPortRange(); + } + + /** {@inheritDoc} */ + @Override public long getIpFinderCleanFrequency() { + return TcpDiscoverySpi.this.getIpFinderCleanFrequency(); + } + + /** {@inheritDoc} */ + @Override public int getThreadPriority() { + return TcpDiscoverySpi.this.getThreadPriority(); + } + + /** {@inheritDoc} */ + @Override public long getHeartbeatFrequency() { + return TcpDiscoverySpi.this.getHeartbeatFrequency(); + } + + /** {@inheritDoc} */ + @Override public 
int getMaxMissedHeartbeats() { + return TcpDiscoverySpi.this.getMaxMissedHeartbeats(); + } + + /** {@inheritDoc} */ + @Override public int getMaxMissedClientHeartbeats() { + return TcpDiscoverySpi.this.getMaxMissedClientHeartbeats(); + } + + /** {@inheritDoc} */ + @Override public long getStatisticsPrintFrequency() { + return TcpDiscoverySpi.this.getStatisticsPrintFrequency(); + } + + /** {@inheritDoc} */ + @Override public String getIpFinderFormatted() { + return TcpDiscoverySpi.this.getIpFinderFormatted(); + } + + /** {@inheritDoc} */ + @Override public int getReconnectCount() { + return TcpDiscoverySpi.this.getReconnectCount(); + } + + /** {@inheritDoc} */ + @Override public boolean isClientMode() { + return TcpDiscoverySpi.this.isClientMode(); + } + + /** {@inheritDoc} */ + @Override public long getNodesJoined() { + return TcpDiscoverySpi.this.getNodesJoined(); + } + + /** {@inheritDoc} */ + @Override public long getNodesLeft() { + return TcpDiscoverySpi.this.getNodesLeft(); + } + + /** {@inheritDoc} */ + @Override public long getNodesFailed() { + return TcpDiscoverySpi.this.getNodesFailed(); + } + + /** {@inheritDoc} */ + @Override public long getPendingMessagesRegistered() { + return TcpDiscoverySpi.this.getPendingMessagesRegistered(); + } + + /** {@inheritDoc} */ + @Override public long getPendingMessagesDiscarded() { + return stats.pendingMessagesDiscarded(); + + } + + /** {@inheritDoc} */ + @Override public long getAvgMessageProcessingTime() { + return TcpDiscoverySpi.this.getAvgMessageProcessingTime(); + } + + /** {@inheritDoc} */ + @Override public long getMaxMessageProcessingTime() { + return TcpDiscoverySpi.this.getMaxMessageProcessingTime(); + } + + /** {@inheritDoc} */ + @Override public int getTotalReceivedMessages() { + return TcpDiscoverySpi.this.getTotalReceivedMessages(); + } + + /** {@inheritDoc} */ + @Override public Map<String, Integer> getReceivedMessages() { + return TcpDiscoverySpi.this.getReceivedMessages(); + } + + /** {@inheritDoc} */ + 
@Override public int getTotalProcessedMessages() { + return TcpDiscoverySpi.this.getTotalProcessedMessages(); + } + + /** {@inheritDoc} */ + @Override public Map<String, Integer> getProcessedMessages() { + return TcpDiscoverySpi.this.getProcessedMessages(); + } + + /** {@inheritDoc} */ + @Override public long getCoordinatorSinceTimestamp() { + return TcpDiscoverySpi.this.getCoordinatorSinceTimestamp(); + } + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/cfc88028/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/TcpDiscoveryIpFinderAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/TcpDiscoveryIpFinderAdapter.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/TcpDiscoveryIpFinderAdapter.java index 0e0aed5..1cd91f6 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/TcpDiscoveryIpFinderAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/TcpDiscoveryIpFinderAdapter.java @@ -71,10 +71,13 @@ public abstract class TcpDiscoveryIpFinderAdapter implements TcpDiscoveryIpFinde * with IP finder will be seen by IP finders on all other nodes. * * @param shared {@code true} if this IP finder is shared. + * @return {@code this} for chaining. 
*/ @IgniteSpiConfiguration(optional = true) - public void setShared(boolean shared) { + public TcpDiscoveryIpFinderAdapter setShared(boolean shared) { this.shared = shared; + + return this; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/cfc88028/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/jdbc/TcpDiscoveryJdbcIpFinder.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/jdbc/TcpDiscoveryJdbcIpFinder.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/jdbc/TcpDiscoveryJdbcIpFinder.java index a16f238..fbbda07 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/jdbc/TcpDiscoveryJdbcIpFinder.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/jdbc/TcpDiscoveryJdbcIpFinder.java @@ -263,10 +263,13 @@ public class TcpDiscoveryJdbcIpFinder extends TcpDiscoveryIpFinderAdapter { * Data source should be fully configured and ready-to-use. * * @param dataSrc Data source. + * @return {@code this} for chaining. */ @IgniteSpiConfiguration(optional = false) - public void setDataSource(DataSource dataSrc) { + public TcpDiscoveryJdbcIpFinder setDataSource(DataSource dataSrc) { this.dataSrc = dataSrc; + + return this; } /** @@ -275,10 +278,13 @@ public class TcpDiscoveryJdbcIpFinder extends TcpDiscoveryIpFinderAdapter { * * @param initSchema {@code True} if DB schema should be initialized by Ignite (default behaviour), * {@code false} if schema was explicitly created by user. + * @return {@code this} for chaining.
*/ @IgniteSpiConfiguration(optional = true) - public void setInitSchema(boolean initSchema) { + public TcpDiscoveryJdbcIpFinder setInitSchema(boolean initSchema) { this.initSchema = initSchema; + + return this; } /** @@ -404,6 +410,13 @@ public class TcpDiscoveryJdbcIpFinder extends TcpDiscoveryIpFinderAdapter { } /** {@inheritDoc} */ + @Override public TcpDiscoveryJdbcIpFinder setShared(boolean shared) { + super.setShared(shared); + + return this; + } + + /** {@inheritDoc} */ @Override public String toString() { return S.toString(TcpDiscoveryJdbcIpFinder.class, this); } http://git-wip-us.apache.org/repos/asf/ignite/blob/cfc88028/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinder.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinder.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinder.java index 8fe8a65..6c47014 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinder.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinder.java @@ -153,10 +153,13 @@ public class TcpDiscoveryMulticastIpFinder extends TcpDiscoveryVmIpFinder { * If not provided, default value is {@link #DFLT_MCAST_GROUP}. * * @param mcastGrp Multicast IP address. + * @return {@code this} for chaining. */ @IgniteSpiConfiguration(optional = true) - public void setMulticastGroup(String mcastGrp) { + public TcpDiscoveryMulticastIpFinder setMulticastGroup(String mcastGrp) { this.mcastGrp = mcastGrp; + + return this; } /** @@ -174,10 +177,13 @@ public class TcpDiscoveryMulticastIpFinder extends TcpDiscoveryVmIpFinder { * If not provided, default value is {@link #DFLT_MCAST_PORT}. 
* * @param mcastPort Multicast port number. + * @return {@code this} for chaining. */ @IgniteSpiConfiguration(optional = true) - public void setMulticastPort(int mcastPort) { + public TcpDiscoveryMulticastIpFinder setMulticastPort(int mcastPort) { this.mcastPort = mcastPort; + + return this; } /** @@ -196,10 +202,13 @@ public class TcpDiscoveryMulticastIpFinder extends TcpDiscoveryVmIpFinder { * If not provided, default value is {@link #DFLT_RES_WAIT_TIME}. * * @param resWaitTime Time IP finder waits for reply to multicast address request. + * @return {@code this} for chaining. */ @IgniteSpiConfiguration(optional = true) - public void setResponseWaitTime(int resWaitTime) { + public TcpDiscoveryMulticastIpFinder setResponseWaitTime(int resWaitTime) { this.resWaitTime = resWaitTime; + + return this; } /** @@ -219,10 +228,13 @@ public class TcpDiscoveryMulticastIpFinder extends TcpDiscoveryVmIpFinder { * If not provided, default value is {@link #DFLT_ADDR_REQ_ATTEMPTS}. * * @param addrReqAttempts Number of attempts to send multicast address request. + * @return {@code this} for chaining. */ @IgniteSpiConfiguration(optional = true) - public void setAddressRequestAttempts(int addrReqAttempts) { + public TcpDiscoveryMulticastIpFinder setAddressRequestAttempts(int addrReqAttempts) { this.addrReqAttempts = addrReqAttempts; + + return this; } /** @@ -245,10 +257,13 @@ public class TcpDiscoveryMulticastIpFinder extends TcpDiscoveryVmIpFinder { * * @param locAddr Local host address. * @see org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi#setLocalAddress(String) + * @return {@code this} for chaining. 
*/ @IgniteSpiConfiguration(optional = true) - public void setLocalAddress(String locAddr) { + public TcpDiscoveryMulticastIpFinder setLocalAddress(String locAddr) { this.locAddr = locAddr; + + return this; } /** @@ -272,10 +287,13 @@ public class TcpDiscoveryMulticastIpFinder extends TcpDiscoveryVmIpFinder { * Default value is {@code -1} which corresponds to system default value. * * @param ttl Time to live. + * @return {@code this} for chaining. */ @IgniteSpiConfiguration(optional = true) - public void setTimeToLive(int ttl) { + public TcpDiscoveryMulticastIpFinder setTimeToLive(int ttl) { this.ttl = ttl; + + return this; } /** @@ -650,11 +668,6 @@ public class TcpDiscoveryMulticastIpFinder extends TcpDiscoveryVmIpFinder { } } - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(TcpDiscoveryMulticastIpFinder.class, this, "super", super.toString()); - } - /** * @param e Network error to handle. * @return {@code True} if this error is recoverable and the operation can be retried. @@ -670,6 +683,18 @@ public class TcpDiscoveryMulticastIpFinder extends TcpDiscoveryVmIpFinder { return true; } + /** {@inheritDoc} */ + @Override public TcpDiscoveryMulticastIpFinder setShared(boolean shared) { + super.setShared(shared); + + return this; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(TcpDiscoveryMulticastIpFinder.class, this, "super", super.toString()); + } + /** * Response to multicast address request. 
*/ http://git-wip-us.apache.org/repos/asf/ignite/blob/cfc88028/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/sharedfs/TcpDiscoverySharedFsIpFinder.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/sharedfs/TcpDiscoverySharedFsIpFinder.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/sharedfs/TcpDiscoverySharedFsIpFinder.java index d4e93d2..a30309c 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/sharedfs/TcpDiscoverySharedFsIpFinder.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/sharedfs/TcpDiscoverySharedFsIpFinder.java @@ -112,10 +112,13 @@ public class TcpDiscoverySharedFsIpFinder extends TcpDiscoveryIpFinderAdapter { * Sets path. * * @param path Shared path. + * @return {@code this} for chaining. */ @IgniteSpiConfiguration(optional = true) - public void setPath(String path) { + public TcpDiscoverySharedFsIpFinder setPath(String path) { this.path = path; + + return this; } /** @@ -302,6 +305,13 @@ public class TcpDiscoverySharedFsIpFinder extends TcpDiscoveryIpFinderAdapter { } /** {@inheritDoc} */ + @Override public TcpDiscoverySharedFsIpFinder setShared(boolean shared) { + super.setShared(shared); + + return this; + } + + /** {@inheritDoc} */ @Override public String toString() { return S.toString(TcpDiscoverySharedFsIpFinder.class, this); } http://git-wip-us.apache.org/repos/asf/ignite/blob/cfc88028/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/vm/TcpDiscoveryVmIpFinder.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/vm/TcpDiscoveryVmIpFinder.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/vm/TcpDiscoveryVmIpFinder.java index 94c237f..e2239b7 100644 --- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/vm/TcpDiscoveryVmIpFinder.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/vm/TcpDiscoveryVmIpFinder.java @@ -129,11 +129,12 @@ public class TcpDiscoveryVmIpFinder extends TcpDiscoveryIpFinderAdapter { * * @param addrs Known nodes addresses. * @throws IgniteSpiException If any error occurs. + * @return {@code this} for chaining. */ @IgniteSpiConfiguration(optional = true) - public synchronized void setAddresses(Collection<String> addrs) throws IgniteSpiException { + public synchronized TcpDiscoveryVmIpFinder setAddresses(Collection<String> addrs) throws IgniteSpiException { if (F.isEmpty(addrs)) - return; + return this; Collection<InetSocketAddress> newAddrs = new LinkedHashSet<>(); @@ -141,6 +142,8 @@ public class TcpDiscoveryVmIpFinder extends TcpDiscoveryIpFinderAdapter { newAddrs.addAll(address(ipStr)); this.addrs = newAddrs; + + return this; } /** @@ -261,6 +264,13 @@ public class TcpDiscoveryVmIpFinder extends TcpDiscoveryIpFinderAdapter { } /** {@inheritDoc} */ + @Override public TcpDiscoveryVmIpFinder setShared(boolean shared) { + super.setShared(shared); + + return this; + } + + /** {@inheritDoc} */ @Override public String toString() { return S.toString(TcpDiscoveryVmIpFinder.class, this, "super", super.toString()); } http://git-wip-us.apache.org/repos/asf/ignite/blob/cfc88028/modules/core/src/main/java/org/apache/ignite/spi/eventstorage/memory/MemoryEventStorageSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/eventstorage/memory/MemoryEventStorageSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/eventstorage/memory/MemoryEventStorageSpi.java index dcfbde1..a61c236 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/eventstorage/memory/MemoryEventStorageSpi.java +++ 
b/modules/core/src/main/java/org/apache/ignite/spi/eventstorage/memory/MemoryEventStorageSpi.java @@ -29,6 +29,7 @@ import org.apache.ignite.resources.LoggerResource; import org.apache.ignite.spi.IgniteSpiAdapter; import org.apache.ignite.spi.IgniteSpiConfiguration; import org.apache.ignite.spi.IgniteSpiException; +import org.apache.ignite.spi.IgniteSpiMBeanAdapter; import org.apache.ignite.spi.IgniteSpiMultipleInstancesSupport; import org.apache.ignite.spi.eventstorage.EventStorageSpi; import org.jsr166.ConcurrentLinkedDeque8; @@ -94,8 +95,7 @@ import static org.apache.ignite.events.EventType.EVT_NODE_METRICS_UPDATED; * @see org.apache.ignite.spi.eventstorage.EventStorageSpi */ @IgniteSpiMultipleInstancesSupport(true) -public class MemoryEventStorageSpi extends IgniteSpiAdapter implements EventStorageSpi, - MemoryEventStorageSpiMBean { +public class MemoryEventStorageSpi extends IgniteSpiAdapter implements EventStorageSpi { /** Default event time to live value in milliseconds (value is {@link Long#MAX_VALUE}). */ public static final long DFLT_EXPIRE_AGE_MS = Long.MAX_VALUE; @@ -131,10 +131,13 @@ public class MemoryEventStorageSpi extends IgniteSpiAdapter implements EventStor * Sets filter for events to be recorded. * * @param filter Filter to use. + * @return {@code this} for chaining. */ @IgniteSpiConfiguration(optional = true) - public void setFilter(IgnitePredicate<Event> filter) { + public MemoryEventStorageSpi setFilter(IgnitePredicate<Event> filter) { this.filter = filter; + + return this; } /** {@inheritDoc} */ @@ -151,7 +154,7 @@ public class MemoryEventStorageSpi extends IgniteSpiAdapter implements EventStor log.debug(configInfo("expireCnt", expireCnt)); } - registerMBean(igniteInstanceName, this, MemoryEventStorageSpiMBean.class); + registerMBean(igniteInstanceName, new MemoryEventStorageSpiMBeanImpl(this), MemoryEventStorageSpiMBean.class); // Ack ok start. 
if (log.isDebugEnabled()) @@ -171,16 +174,37 @@ public class MemoryEventStorageSpi extends IgniteSpiAdapter implements EventStor } /** + * See {@link #setExpireAgeMs(long)} + * + * @return Event time-to-live. + */ + public long getExpireAgeMs() { + return expireAgeMs; + } + + /** * Sets events expiration time. All events that exceed this value * will be removed from the queue when next event comes. * <p> * If not provided, default value is {@link #DFLT_EXPIRE_AGE_MS}. * * @param expireAgeMs Expiration time in milliseconds. + * @return {@code this} for chaining. */ @IgniteSpiConfiguration(optional = true) - public void setExpireAgeMs(long expireAgeMs) { + public MemoryEventStorageSpi setExpireAgeMs(long expireAgeMs) { this.expireAgeMs = expireAgeMs; + + return this; + } + + /** + * See {@link #setExpireCount(long)} + * + * @return Maximum event queue size. + */ + public long getExpireCount() { + return expireCnt; } /** @@ -189,29 +213,28 @@ public class MemoryEventStorageSpi extends IgniteSpiAdapter implements EventStor * If not provided, default value {@link #DFLT_EXPIRE_COUNT} will be used. * * @param expireCnt Maximum queue size. + * @return {@code this} for chaining. */ @IgniteSpiConfiguration(optional = true) - public void setExpireCount(long expireCnt) { + public MemoryEventStorageSpi setExpireCount(long expireCnt) { this.expireCnt = expireCnt; - } - /** {@inheritDoc} */ - @Override public long getExpireAgeMs() { - return expireAgeMs; + return this; } - /** {@inheritDoc} */ - @Override public long getExpireCount() { - return expireCnt; - } - - /** {@inheritDoc} */ - @Override public long getQueueSize() { + /** + * Gets current queue size of the event queue. + * + * @return Current queue size of the event queue. + */ + public long getQueueSize() { return evts.sizex(); } - /** {@inheritDoc} */ - @Override public void clearAll() { + /** + * Removes all events from the event queue. 
+ */ + public void clearAll() { evts.clear(); } @@ -278,7 +301,44 @@ public class MemoryEventStorageSpi extends IgniteSpiAdapter implements EventStor } /** {@inheritDoc} */ + @Override public MemoryEventStorageSpi setName(String name) { + super.setName(name); + + return this; + } + + /** {@inheritDoc} */ @Override public String toString() { return S.toString(MemoryEventStorageSpi.class, this); } + + /** + * MBean implementation for MemoryEventStorageSpi. + */ + private class MemoryEventStorageSpiMBeanImpl extends IgniteSpiMBeanAdapter implements MemoryEventStorageSpiMBean { + /** {@inheritDoc} */ + MemoryEventStorageSpiMBeanImpl(IgniteSpiAdapter spiAdapter) { + super(spiAdapter); + } + + /** {@inheritDoc} */ + @Override public long getExpireAgeMs() { + return MemoryEventStorageSpi.this.getExpireAgeMs(); + } + + /** {@inheritDoc} */ + @Override public long getExpireCount() { + return MemoryEventStorageSpi.this.getExpireCount(); + } + + /** {@inheritDoc} */ + @Override public long getQueueSize() { + return MemoryEventStorageSpi.this.getQueueSize(); + } + + /** {@inheritDoc} */ + @Override public void clearAll() { + MemoryEventStorageSpi.this.clearAll(); + } + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/cfc88028/modules/core/src/main/java/org/apache/ignite/spi/failover/always/AlwaysFailoverSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/failover/always/AlwaysFailoverSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/failover/always/AlwaysFailoverSpi.java index 4b916e7..468a627 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/failover/always/AlwaysFailoverSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/failover/always/AlwaysFailoverSpi.java @@ -37,6 +37,7 @@ import org.apache.ignite.spi.IgniteSpiAdapter; import org.apache.ignite.spi.IgniteSpiConfiguration; import 
org.apache.ignite.spi.IgniteSpiConsistencyChecked; import org.apache.ignite.spi.IgniteSpiException; +import org.apache.ignite.spi.IgniteSpiMBeanAdapter; import org.apache.ignite.spi.IgniteSpiMultipleInstancesSupport; import org.apache.ignite.spi.failover.FailoverContext; import org.apache.ignite.spi.failover.FailoverSpi; @@ -95,7 +96,7 @@ import org.apache.ignite.spi.failover.FailoverSpi; */ @IgniteSpiMultipleInstancesSupport(true) @IgniteSpiConsistencyChecked(optional = true) -public class AlwaysFailoverSpi extends IgniteSpiAdapter implements FailoverSpi, AlwaysFailoverSpiMBean { +public class AlwaysFailoverSpi extends IgniteSpiAdapter implements FailoverSpi { /** Maximum number of attempts to execute a failed job on another node (default is {@code 5}). */ public static final int DFLT_MAX_FAILOVER_ATTEMPTS = 5; @@ -124,8 +125,12 @@ public class AlwaysFailoverSpi extends IgniteSpiAdapter implements FailoverSpi, /** Number of jobs that were failed over. */ private int totalFailoverJobs; - /** {@inheritDoc} */ - @Override public int getMaximumFailoverAttempts() { + /** + * See {@link #setMaximumFailoverAttempts(int)}. + * + * @return Maximum number of attempts to execute a failed job on another node. + */ + public int getMaximumFailoverAttempts() { return maxFailoverAttempts; } @@ -134,14 +139,21 @@ public class AlwaysFailoverSpi extends IgniteSpiAdapter implements FailoverSpi, * If not specified, {@link #DFLT_MAX_FAILOVER_ATTEMPTS} value will be used. * * @param maxFailoverAttempts Maximum number of attempts to execute a failed job on another node. + * @return {@code this} for chaining. 
*/ @IgniteSpiConfiguration(optional = true) - public void setMaximumFailoverAttempts(int maxFailoverAttempts) { + public AlwaysFailoverSpi setMaximumFailoverAttempts(int maxFailoverAttempts) { this.maxFailoverAttempts = maxFailoverAttempts; + + return this; } - /** {@inheritDoc} */ - @Override public int getTotalFailoverJobsCount() { + /** + * Get total number of jobs that were failed over. + * + * @return Total number of failed over jobs. + */ + public int getTotalFailoverJobsCount() { return totalFailoverJobs; } @@ -160,7 +172,7 @@ public class AlwaysFailoverSpi extends IgniteSpiAdapter implements FailoverSpi, if (log.isDebugEnabled()) log.debug(configInfo("maximumFailoverAttempts", maxFailoverAttempts)); - registerMBean(igniteInstanceName, this, AlwaysFailoverSpiMBean.class); + registerMBean(igniteInstanceName, new AlwaysFailoverSpiMBeanImpl(this), AlwaysFailoverSpiMBean.class); // Ack ok start. if (log.isDebugEnabled()) @@ -286,7 +298,34 @@ public class AlwaysFailoverSpi extends IgniteSpiAdapter implements FailoverSpi, } /** {@inheritDoc} */ + @Override public AlwaysFailoverSpi setName(String name) { + super.setName(name); + + return this; + } + + /** {@inheritDoc} */ @Override public String toString() { return S.toString(AlwaysFailoverSpi.class, this); } + + /** + * MBean implementation for AlwaysFailoverSpi. 
+ */ + private class AlwaysFailoverSpiMBeanImpl extends IgniteSpiMBeanAdapter implements AlwaysFailoverSpiMBean { + /** {@inheritDoc} */ + AlwaysFailoverSpiMBeanImpl(IgniteSpiAdapter spiAdapter) { + super(spiAdapter); + } + + /** {@inheritDoc} */ + @Override public int getMaximumFailoverAttempts() { + return AlwaysFailoverSpi.this.getMaximumFailoverAttempts(); + } + + /** {@inheritDoc} */ + @Override public int getTotalFailoverJobsCount() { + return AlwaysFailoverSpi.this.getTotalFailoverJobsCount(); + } + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/cfc88028/modules/core/src/main/java/org/apache/ignite/spi/failover/jobstealing/JobStealingFailoverSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/failover/jobstealing/JobStealingFailoverSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/failover/jobstealing/JobStealingFailoverSpi.java index 05c681d..3ef32ab 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/failover/jobstealing/JobStealingFailoverSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/failover/jobstealing/JobStealingFailoverSpi.java @@ -34,6 +34,7 @@ import org.apache.ignite.spi.IgniteSpiAdapter; import org.apache.ignite.spi.IgniteSpiConfiguration; import org.apache.ignite.spi.IgniteSpiConsistencyChecked; import org.apache.ignite.spi.IgniteSpiException; +import org.apache.ignite.spi.IgniteSpiMBeanAdapter; import org.apache.ignite.spi.IgniteSpiMultipleInstancesSupport; import org.apache.ignite.spi.failover.FailoverContext; import org.apache.ignite.spi.failover.FailoverSpi; @@ -98,8 +99,7 @@ import static org.apache.ignite.spi.collision.jobstealing.JobStealingCollisionSp */ @IgniteSpiMultipleInstancesSupport(true) @IgniteSpiConsistencyChecked(optional = true) -public class JobStealingFailoverSpi extends IgniteSpiAdapter implements FailoverSpi, - JobStealingFailoverSpiMBean { +public class 
JobStealingFailoverSpi extends IgniteSpiAdapter implements FailoverSpi { /** Maximum number of attempts to execute a failed job on another node (default is {@code 5}). */ public static final int DFLT_MAX_FAILOVER_ATTEMPTS = 5; @@ -136,8 +136,12 @@ public class JobStealingFailoverSpi extends IgniteSpiAdapter implements Failover /** Number of jobs that were stolen. */ private int totalStolenJobs; - /** {@inheritDoc} */ - @Override public int getMaximumFailoverAttempts() { + /** + * See {@link #setMaximumFailoverAttempts(int)}. + * + * @return Maximum number of attempts to execute a failed job on another node. + */ + public int getMaximumFailoverAttempts() { return maxFailoverAttempts; } @@ -151,19 +155,30 @@ public class JobStealingFailoverSpi extends IgniteSpiAdapter implements Failover * * @param maxFailoverAttempts Maximum number of attempts to execute a failed * job on another node. + * @return {@code this} for chaining. */ @IgniteSpiConfiguration(optional = true) - public void setMaximumFailoverAttempts(int maxFailoverAttempts) { + public JobStealingFailoverSpi setMaximumFailoverAttempts(int maxFailoverAttempts) { this.maxFailoverAttempts = maxFailoverAttempts; + + return this; } - /** {@inheritDoc} */ - @Override public int getTotalFailedOverJobsCount() { + /** + * Get total number of jobs that were failed over including stolen ones. + * + * @return Total number of failed over jobs. + */ + public int getTotalFailedOverJobsCount() { return totalFailedOverJobs; } - /** {@inheritDoc} */ - @Override public int getTotalStolenJobsCount() { + /** + * Get total number of jobs that were stolen. + * + * @return Total number of stolen jobs. 
+ */ + public int getTotalStolenJobsCount() { return totalStolenJobs; } @@ -182,7 +197,7 @@ public class JobStealingFailoverSpi extends IgniteSpiAdapter implements Failover if (log.isDebugEnabled()) log.debug(configInfo("maxFailoverAttempts", maxFailoverAttempts)); - registerMBean(igniteInstanceName, this, JobStealingFailoverSpiMBean.class); + registerMBean(igniteInstanceName, new JobStealingFailoverSpiMBeanImpl(this), JobStealingFailoverSpiMBean.class); // Ack ok start. if (log.isDebugEnabled()) @@ -355,7 +370,40 @@ public class JobStealingFailoverSpi extends IgniteSpiAdapter implements Failover } /** {@inheritDoc} */ + @Override public JobStealingFailoverSpi setName(String name) { + super.setName(name); + + return this; + } + + /** {@inheritDoc} */ @Override public String toString() { return S.toString(JobStealingFailoverSpi.class, this); } + + /** + * MBean implementation for JobStealingFailoverSpi. + */ + private class JobStealingFailoverSpiMBeanImpl extends IgniteSpiMBeanAdapter implements JobStealingFailoverSpiMBean { + /** {@inheritDoc} */ + public JobStealingFailoverSpiMBeanImpl(IgniteSpiAdapter spiAdapter) { + super(spiAdapter); + } + + /** {@inheritDoc} */ + @Override public int getMaximumFailoverAttempts() { + return JobStealingFailoverSpi.this.getMaximumFailoverAttempts(); + } + + /** {@inheritDoc} */ + @Override public int getTotalFailedOverJobsCount() { + return JobStealingFailoverSpi.this.getTotalFailedOverJobsCount(); + } + + /** {@inheritDoc} */ + @Override public int getTotalStolenJobsCount() { + return JobStealingFailoverSpi.this.getTotalStolenJobsCount(); + } + + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/cfc88028/modules/core/src/main/java/org/apache/ignite/spi/failover/never/NeverFailoverSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/failover/never/NeverFailoverSpi.java 
b/modules/core/src/main/java/org/apache/ignite/spi/failover/never/NeverFailoverSpi.java index 1056d2e..ffd695e 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/failover/never/NeverFailoverSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/failover/never/NeverFailoverSpi.java @@ -25,6 +25,7 @@ import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.resources.LoggerResource; import org.apache.ignite.spi.IgniteSpiAdapter; import org.apache.ignite.spi.IgniteSpiException; +import org.apache.ignite.spi.IgniteSpiMBeanAdapter; import org.apache.ignite.spi.IgniteSpiMultipleInstancesSupport; import org.apache.ignite.spi.failover.FailoverContext; import org.apache.ignite.spi.failover.FailoverSpi; @@ -32,7 +33,8 @@ import org.apache.ignite.spi.failover.FailoverSpi; /** * This class provides failover SPI implementation that never fails over. This implementation * never fails over a failed job by always returning {@code null} out of - * {@link org.apache.ignite.spi.failover.FailoverSpi#failover(org.apache.ignite.spi.failover.FailoverContext, List)} method. + * {@link org.apache.ignite.spi.failover.FailoverSpi#failover(org.apache.ignite.spi.failover.FailoverContext, List)} + * method. * <h1 class="header">Configuration</h1> * <h2 class="header">Mandatory</h2> * This SPI has no mandatory configuration parameters. 
@@ -54,17 +56,18 @@ import org.apache.ignite.spi.failover.FailoverSpi; * Here is an example on how to configure grid with {@link NeverFailoverSpi} from Spring XML configuration file: * <pre name="code" class="xml"> * <property name="failoverSpi"> - * <bean class="org.apache.ignite.spi.failover.never.NeverFailoverSpi"/> + * <bean class="org.apache.ignite.spi.failover.never.NeverFailoverSpi"/> * </property> * </pre> * <p> * <img src="http://ignite.apache.org/images/spring-small.png"> * <br> * For information about Spring framework visit <a href="http://www.springframework.org/">www.springframework.org</a> + * * @see org.apache.ignite.spi.failover.FailoverSpi */ @IgniteSpiMultipleInstancesSupport(true) -public class NeverFailoverSpi extends IgniteSpiAdapter implements FailoverSpi, NeverFailoverSpiMBean { +public class NeverFailoverSpi extends IgniteSpiAdapter implements FailoverSpi { /** Injected grid logger. */ @LoggerResource private IgniteLogger log; @@ -74,7 +77,7 @@ public class NeverFailoverSpi extends IgniteSpiAdapter implements FailoverSpi, N // Start SPI start stopwatch. startStopwatch(); - registerMBean(igniteInstanceName, this, NeverFailoverSpiMBean.class); + registerMBean(igniteInstanceName, new NeverFailoverSpiMBeanImpl(this), NeverFailoverSpiMBean.class); // Ack ok start. 
if (log.isDebugEnabled()) @@ -93,14 +96,31 @@ public class NeverFailoverSpi extends IgniteSpiAdapter implements FailoverSpi, N /** {@inheritDoc} */ @Override public ClusterNode failover(FailoverContext ctx, List<ClusterNode> top) { U.warn(log, "Returning 'null' node for failed job (failover will not happen) [job=" + - ctx.getJobResult().getJob() + ", task=" + ctx.getTaskSession().getTaskName() + + ctx.getJobResult().getJob() + ", task=" + ctx.getTaskSession().getTaskName() + ", sessionId=" + ctx.getTaskSession().getId() + ']'); return null; } /** {@inheritDoc} */ + @Override public NeverFailoverSpi setName(String name) { + super.setName(name); + + return this; + } + + /** {@inheritDoc} */ @Override public String toString() { return S.toString(NeverFailoverSpi.class, this); } + + /** + * MBean implementation for NeverFailoverSpi. + */ + private class NeverFailoverSpiMBeanImpl extends IgniteSpiMBeanAdapter implements NeverFailoverSpiMBean { + /** {@inheritDoc} */ + NeverFailoverSpiMBeanImpl(IgniteSpiAdapter spiAdapter) { + super(spiAdapter); + } + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/cfc88028/modules/core/src/main/java/org/apache/ignite/spi/indexing/noop/NoopIndexingSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/indexing/noop/NoopIndexingSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/indexing/noop/NoopIndexingSpi.java index 5c8bfd2..a8683a1 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/indexing/noop/NoopIndexingSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/indexing/noop/NoopIndexingSpi.java @@ -68,4 +68,11 @@ public class NoopIndexingSpi extends IgniteSpiAdapter implements IndexingSpi { @Override public void spiStop() throws IgniteSpiException { // No-op. 
} + + /** {@inheritDoc} */ + @Override public NoopIndexingSpi setName(String name) { + super.setName(name); + + return this; + } } \ No newline at end of file