http://git-wip-us.apache.org/repos/asf/ignite/blob/5d98ccec/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java index 66993ea..3591703 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java @@ -54,7 +54,9 @@ import org.apache.ignite.IgniteSystemProperties; import org.apache.ignite.configuration.ConnectorConfiguration; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteInterruptedCheckedException; +import org.apache.ignite.internal.managers.communication.GridIoMessage; import org.apache.ignite.internal.util.GridConcurrentHashSet; +import org.apache.ignite.internal.util.future.GridCompoundFuture; import org.apache.ignite.internal.util.nio.ssl.GridNioSslFilter; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.typedef.F; @@ -66,6 +68,7 @@ import org.apache.ignite.internal.util.worker.GridWorker; import org.apache.ignite.lang.IgniteBiInClosure; import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.lang.IgnitePredicate; +import org.apache.ignite.lang.IgniteReducer; import org.apache.ignite.lang.IgniteRunnable; import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.plugin.extensions.communication.MessageReader; @@ -704,14 +707,57 @@ public class GridNioServer<T> { } /** - * + * @return Future. */ - public void dumpStats() { - U.warn(log, "NIO server statistics [readerSesBalanceCnt=" + readerMoveCnt.get() + - ", writerSesBalanceCnt=" + writerMoveCnt.get() + ']'); + public IgniteInternalFuture<String> dumpStats() { + String msg = "NIO server statistics [readerSesBalanceCnt=" + readerMoveCnt.get() + + ", writerSesBalanceCnt=" + writerMoveCnt.get() + ']'; - for (int i = 0; i < clientWorkers.size(); i++) - clientWorkers.get(i).offer(new NioOperationFuture<Void>(null, NioOperation.DUMP_STATS)); + return dumpStats(msg, null); + } + + /** + * @param msg Message to add. + * @param p Session predicate. + * @return Future. + */ + public IgniteInternalFuture<String> dumpStats(final String msg, IgnitePredicate<GridNioSession> p) { + GridCompoundFuture<String, String> fut = new GridCompoundFuture<>(new IgniteReducer<String, String>() { + private final StringBuilder sb = new StringBuilder(msg); + + @Override public boolean collect(@Nullable String msg) { + if (!F.isEmpty(msg)) { + synchronized (sb) { + if (sb.length() > 0) + sb.append(U.nl()); + + sb.append(msg); + } + } + + return true; + } + + @Override public String reduce() { + synchronized (sb) { + return sb.toString(); + } + } + }); + + for (int i = 0; i < clientWorkers.size(); i++) { + NioOperationFuture<String> opFut = new NioOperationFuture<>(null, NioOperation.DUMP_STATS); + + opFut.msg = p; + + clientWorkers.get(i).offer(opFut); + + fut.add(opFut); + } + + fut.markInitialized(); + + return fut; } /** @@ -1812,12 +1858,16 @@ public class GridNioServer<T> { case DUMP_STATS: { NioOperationFuture req = (NioOperationFuture)req0; + IgnitePredicate<GridNioSession> p = + req.msg instanceof IgnitePredicate ? (IgnitePredicate<GridNioSession>)req.msg : null; + + StringBuilder sb = new StringBuilder(); + try { - dumpStats(); + dumpStats(sb, p, p!= null); } finally { - // Complete the request just in case (none should wait on this future). - req.onDone(true); + req.onDone(sb.toString()); } } } @@ -1925,80 +1975,126 @@ public class GridNioServer<T> { } /** - * + * @param sb Message builder. + * @param keys Keys. */ - private void dumpStats() { - StringBuilder sb = new StringBuilder(); - - Set<SelectionKey> keys = selector.keys(); - - sb.append(U.nl()) - .append(">> Selector info [idx=").append(idx) + private void dumpSelectorInfo(StringBuilder sb, Set<SelectionKey> keys) { + sb.append(">> Selector info [idx=").append(idx) .append(", keysCnt=").append(keys.size()) .append(", bytesRcvd=").append(bytesRcvd) .append(", bytesRcvd0=").append(bytesRcvd0) .append(", bytesSent=").append(bytesSent) .append(", bytesSent0=").append(bytesSent0) .append("]").append(U.nl()); + } + + /** + * @param sb Message builder. + * @param p Optional session predicate. + * @param shortInfo Short info flag. + */ + private void dumpStats(StringBuilder sb, + @Nullable IgnitePredicate<GridNioSession> p, + boolean shortInfo) { + Set<SelectionKey> keys = selector.keys(); + + boolean selInfo = p == null; + + if (selInfo) + dumpSelectorInfo(sb, keys); for (SelectionKey key : keys) { GridSelectorNioSessionImpl ses = (GridSelectorNioSessionImpl)key.attachment(); - MessageWriter writer = ses.meta(MSG_WRITER.ordinal()); - MessageReader reader = ses.meta(GridDirectParser.READER_META_KEY); + boolean sesInfo = p == null || p.apply(ses); - sb.append(" Connection info [") - .append("in=").append(ses.accepted()) - .append(", rmtAddr=").append(ses.remoteAddress()) - .append(", locAddr=").append(ses.localAddress()); + if (sesInfo) { + if (!selInfo) { + dumpSelectorInfo(sb, keys); - GridNioRecoveryDescriptor outDesc = ses.outRecoveryDescriptor(); + selInfo = true; + } - if (outDesc != null) { - sb.append(", msgsSent=").append(outDesc.sent()) - .append(", msgsAckedByRmt=").append(outDesc.acked()) - .append(", descIdHash=").append(System.identityHashCode(outDesc)); - } - else - sb.append(", outRecoveryDesc=null"); + sb.append(" Connection info [") + .append("in=").append(ses.accepted()) + .append(", rmtAddr=").append(ses.remoteAddress()) + .append(", locAddr=").append(ses.localAddress()); - GridNioRecoveryDescriptor inDesc = ses.inRecoveryDescriptor(); + GridNioRecoveryDescriptor outDesc = ses.outRecoveryDescriptor(); - if (inDesc != null) { - sb.append(", msgsRcvd=").append(inDesc.received()) - .append(", lastAcked=").append(inDesc.lastAcknowledged()) - .append(", descIdHash=").append(System.identityHashCode(inDesc)); - } - else - sb.append(", inRecoveryDesc=null"); + if (outDesc != null) { + sb.append(", msgsSent=").append(outDesc.sent()) + .append(", msgsAckedByRmt=").append(outDesc.acked()) + .append(", descIdHash=").append(System.identityHashCode(outDesc)); - sb.append(", bytesRcvd=").append(ses.bytesReceived()) - .append(", bytesRcvd0=").append(ses.bytesReceived0()) - .append(", bytesSent=").append(ses.bytesSent()) - .append(", bytesSent0=").append(ses.bytesSent0()) - .append(", opQueueSize=").append(ses.writeQueueSize()) - .append(", msgWriter=").append(writer != null ? writer.toString() : "null") - .append(", msgReader=").append(reader != null ? reader.toString() : "null"); + if (!outDesc.messagesRequests().isEmpty()) { + int cnt = 0; - int cnt = 0; + sb.append(", unackedMsgs=["); - for (SessionWriteRequest req : ses.writeQueue()) { - if (cnt == 0) - sb.append(",\n opQueue=[").append(req); + for (SessionWriteRequest req : outDesc.messagesRequests()) { + if (cnt != 0) + sb.append(", "); + + Object msg = req.message(); + + if (shortInfo && msg instanceof GridIoMessage) + msg = ((GridIoMessage)msg).message().getClass().getSimpleName(); + + sb.append(msg); + + if (++cnt == 5) + break; + } + + sb.append(']'); + } + } else - sb.append(',').append(req); + sb.append(", outRecoveryDesc=null"); - if (++cnt == 5) { - sb.append(']'); + GridNioRecoveryDescriptor inDesc = ses.inRecoveryDescriptor(); - break; + if (inDesc != null) { + sb.append(", msgsRcvd=").append(inDesc.received()) + .append(", lastAcked=").append(inDesc.lastAcknowledged()) + .append(", descIdHash=").append(System.identityHashCode(inDesc)); } - } + else + sb.append(", inRecoveryDesc=null"); - sb.append("]").append(U.nl()); - } + sb.append(", bytesRcvd=").append(ses.bytesReceived()) + .append(", bytesRcvd0=").append(ses.bytesReceived0()) + .append(", bytesSent=").append(ses.bytesSent()) + .append(", bytesSent0=").append(ses.bytesSent0()) + .append(", opQueueSize=").append(ses.writeQueueSize()); + + if (!shortInfo) { + MessageWriter writer = ses.meta(MSG_WRITER.ordinal()); + MessageReader reader = ses.meta(GridDirectParser.READER_META_KEY); + + sb.append(", msgWriter=").append(writer != null ? writer.toString() : "null") + .append(", msgReader=").append(reader != null ? reader.toString() : "null"); + } + + int cnt = 0; - U.warn(log, sb.toString()); + for (SessionWriteRequest req : ses.writeQueue()) { + if (cnt == 0) + sb.append(",\n opQueue=[").append(req); + else + sb.append(',').append(req); + + if (++cnt == 5) { + sb.append(']'); + + break; + } + } + + sb.append("]"); + } + } } /**
http://git-wip-us.apache.org/repos/asf/ignite/blob/5d98ccec/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java index 2e35c6e..5d74a80 100755 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java @@ -46,7 +46,6 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; import javax.net.ssl.SSLEngine; import javax.net.ssl.SSLException; import org.apache.ignite.Ignite; @@ -67,6 +66,7 @@ import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; import org.apache.ignite.internal.util.GridConcurrentFactory; import org.apache.ignite.internal.util.GridSpinReadWriteLock; +import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.ipc.IpcEndpoint; import org.apache.ignite.internal.util.ipc.IpcToNioAdapter; @@ -93,6 +93,7 @@ import org.apache.ignite.internal.util.nio.GridTcpNioCommunicationClient; import org.apache.ignite.internal.util.nio.ssl.BlockingSslHandler; import org.apache.ignite.internal.util.nio.ssl.GridNioSslFilter; import org.apache.ignite.internal.util.nio.ssl.GridSslMeta; +import org.apache.ignite.internal.util.typedef.CI1; import org.apache.ignite.internal.util.typedef.CI2; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.X; @@ -934,6 +935,10 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati @LoggerResource private IgniteLogger log; + /** Logger. */ + @LoggerResource(categoryName = "org.apache.ignite.internal.diagnostic") + private IgniteLogger diagnosticLog; + /** Local IP address. */ private String locAddr; @@ -1764,83 +1769,141 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati } /** + * @param nodeId Target node ID. + * @return Future. + */ + public IgniteInternalFuture<String> dumpNodeStatistics(final UUID nodeId) { + StringBuilder sb = new StringBuilder("Communication SPI statistics [rmtNode=").append(nodeId).append(']').append(U.nl()); + + dumpInfo(sb, nodeId); + + GridNioServer<Message> nioSrvr = this.nioSrvr; + + if (nioSrvr != null) { + sb.append("NIO sessions statistics:"); + + IgnitePredicate<GridNioSession> p = new IgnitePredicate<GridNioSession>() { + @Override public boolean apply(GridNioSession ses) { + ConnectionKey connId = ses.meta(CONN_IDX_META); + + return connId != null && nodeId.equals(connId.nodeId()); + } + }; + + return nioSrvr.dumpStats(sb.toString(), p); + } + else { + sb.append(U.nl()).append("GridNioServer is null."); + + return new GridFinishedFuture<>(sb.toString()); + } + } + + /** * Dumps SPI per-connection stats to logs. */ public void dumpStats() { - IgniteLogger log = this.log; + final IgniteLogger log = this.diagnosticLog; if (log != null) { - StringBuilder sb = new StringBuilder("Communication SPI recovery descriptors: ").append(U.nl()); - - for (Map.Entry<ConnectionKey, GridNioRecoveryDescriptor> entry : recoveryDescs.entrySet()) { - GridNioRecoveryDescriptor desc = entry.getValue(); - - sb.append(" [key=").append(entry.getKey()) - .append(", msgsSent=").append(desc.sent()) - .append(", msgsAckedByRmt=").append(desc.acked()) - .append(", msgsRcvd=").append(desc.received()) - .append(", lastAcked=").append(desc.lastAcknowledged()) - .append(", reserveCnt=").append(desc.reserveCount()) - .append(", descIdHash=").append(System.identityHashCode(desc)) - .append(']').append(U.nl()); - } - - for (Map.Entry<ConnectionKey, GridNioRecoveryDescriptor> entry : outRecDescs.entrySet()) { - GridNioRecoveryDescriptor desc = entry.getValue(); - - sb.append(" [key=").append(entry.getKey()) - .append(", msgsSent=").append(desc.sent()) - .append(", msgsAckedByRmt=").append(desc.acked()) - .append(", reserveCnt=").append(desc.reserveCount()) - .append(", connected=").append(desc.connected()) - .append(", reserved=").append(desc.reserved()) - .append(", descIdHash=").append(System.identityHashCode(desc)) - .append(']').append(U.nl()); - } + StringBuilder sb = new StringBuilder(); - for (Map.Entry<ConnectionKey, GridNioRecoveryDescriptor> entry : inRecDescs.entrySet()) { - GridNioRecoveryDescriptor desc = entry.getValue(); - - sb.append(" [key=").append(entry.getKey()) - .append(", msgsRcvd=").append(desc.received()) - .append(", lastAcked=").append(desc.lastAcknowledged()) - .append(", reserveCnt=").append(desc.reserveCount()) - .append(", connected=").append(desc.connected()) - .append(", reserved=").append(desc.reserved()) - .append(", handshakeIdx=").append(desc.handshakeIndex()) - .append(", descIdHash=").append(System.identityHashCode(desc)) - .append(']').append(U.nl()); - } + dumpInfo(sb, null); - sb.append("Communication SPI clients: ").append(U.nl()); + U.warn(log, sb.toString()); - for (Map.Entry<UUID, GridCommunicationClient[]> entry : clients.entrySet()) { - UUID nodeId = entry.getKey(); - GridCommunicationClient[] clients0 = entry.getValue(); + GridNioServer<Message> nioSrvr = this.nioSrvr; - for (GridCommunicationClient client : clients0) { - if (client != null) { - sb.append(" [node=").append(nodeId) - .append(", client=").append(client) - .append(']').append(U.nl()); + if (nioSrvr != null) { + nioSrvr.dumpStats().listen(new CI1<IgniteInternalFuture<String>>() { + @Override public void apply(IgniteInternalFuture<String> fut) { + try { + U.warn(log, fut.get()); + } + catch (Exception e) { + U.error(log, "Failed to dump NIO server statistics: " + e, e); + } } - } + }); } + } + } - U.warn(log, sb.toString()); + /** + * @param sb Message builder. + * @param dstNodeId Target node ID. + */ + private void dumpInfo(StringBuilder sb, UUID dstNodeId) { + sb.append("Communication SPI recovery descriptors: ").append(U.nl()); + + for (Map.Entry<ConnectionKey, GridNioRecoveryDescriptor> entry : recoveryDescs.entrySet()) { + GridNioRecoveryDescriptor desc = entry.getValue(); + + if (dstNodeId != null && !dstNodeId.equals(entry.getKey().nodeId())) + continue; + + sb.append(" [key=").append(entry.getKey()) + .append(", msgsSent=").append(desc.sent()) + .append(", msgsAckedByRmt=").append(desc.acked()) + .append(", msgsRcvd=").append(desc.received()) + .append(", lastAcked=").append(desc.lastAcknowledged()) + .append(", reserveCnt=").append(desc.reserveCount()) + .append(", descIdHash=").append(System.identityHashCode(desc)) + .append(']').append(U.nl()); } - GridNioServer<Message> nioSrvr = this.nioSrvr; + for (Map.Entry<ConnectionKey, GridNioRecoveryDescriptor> entry : outRecDescs.entrySet()) { + GridNioRecoveryDescriptor desc = entry.getValue(); - if (nioSrvr != null) - nioSrvr.dumpStats(); - } + if (dstNodeId != null && !dstNodeId.equals(entry.getKey().nodeId())) + continue; - /** */ - private final ThreadLocal<Integer> threadConnIdx = new ThreadLocal<>(); + sb.append(" [key=").append(entry.getKey()) + .append(", msgsSent=").append(desc.sent()) + .append(", msgsAckedByRmt=").append(desc.acked()) + .append(", reserveCnt=").append(desc.reserveCount()) + .append(", connected=").append(desc.connected()) + .append(", reserved=").append(desc.reserved()) + .append(", descIdHash=").append(System.identityHashCode(desc)) + .append(']').append(U.nl()); + } - /** */ - private final AtomicInteger connIdx = new AtomicInteger(); + for (Map.Entry<ConnectionKey, GridNioRecoveryDescriptor> entry : inRecDescs.entrySet()) { + GridNioRecoveryDescriptor desc = entry.getValue(); + + if (dstNodeId != null && !dstNodeId.equals(entry.getKey().nodeId())) + continue; + + sb.append(" [key=").append(entry.getKey()) + .append(", msgsRcvd=").append(desc.received()) + .append(", lastAcked=").append(desc.lastAcknowledged()) + .append(", reserveCnt=").append(desc.reserveCount()) + .append(", connected=").append(desc.connected()) + .append(", reserved=").append(desc.reserved()) + .append(", handshakeIdx=").append(desc.handshakeIndex()) + .append(", descIdHash=").append(System.identityHashCode(desc)) + .append(']').append(U.nl()); + } + + sb.append("Communication SPI clients: ").append(U.nl()); + + for (Map.Entry<UUID, GridCommunicationClient[]> entry : clients.entrySet()) { + UUID clientNodeId = entry.getKey(); + + if (dstNodeId != null && !dstNodeId.equals(clientNodeId)) + continue; + + GridCommunicationClient[] clients0 = entry.getValue(); + + for (GridCommunicationClient client : clients0) { + if (client != null) { + sb.append(" [node=").append(clientNodeId) + .append(", client=").append(client) + .append(']').append(U.nl()); + } + } + } + } /** {@inheritDoc} */ @Override public Map<String, Object> getNodeAttributes() throws IgniteSpiException { http://git-wip-us.apache.org/repos/asf/ignite/blob/5d98ccec/modules/core/src/test/java/org/apache/ignite/internal/managers/IgniteDiagnosticMessagesTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/IgniteDiagnosticMessagesTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/IgniteDiagnosticMessagesTest.java new file mode 100644 index 0000000..08dbc66 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/IgniteDiagnosticMessagesTest.java @@ -0,0 +1,255 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.managers; + +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.ThreadLocalRandom; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteDiagnosticPrepareContext; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.IgniteKernal; +import org.apache.ignite.internal.TestRecordingCommunicationSpi; +import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetResponse; +import org.apache.ignite.internal.util.future.GridFutureAdapter; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteInClosure; +import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.GridStringLogger; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +import static org.apache.ignite.IgniteSystemProperties.IGNITE_LONG_OPERATIONS_DUMP_TIMEOUT; +import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; +import static org.apache.ignite.cache.CacheMode.PARTITIONED; +import static org.apache.ignite.cache.CacheMode.REPLICATED; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; + +/** + * + */ +public class IgniteDiagnosticMessagesTest extends GridCommonAbstractTest { + /** */ + private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** */ + private boolean client; + + /** */ + private Integer connectionsPerNode; + + /** */ + private boolean testSpi; + + /** */ + private GridStringLogger strLog; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder); + + if (testSpi) + cfg.setCommunicationSpi(new TestRecordingCommunicationSpi()); + + if (connectionsPerNode != null) + ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setConnectionsPerNode(connectionsPerNode); + + cfg.setClientMode(client); + + if (strLog != null) { + cfg.setGridLogger(strLog); + + strLog = null; + } + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + + super.afterTestsStopped(); + } + + + /** + * @throws Exception If failed. + */ + public void testDiagnosticMessages1() throws Exception { + checkBasicDiagnosticInfo(); + } + + /** + * @throws Exception If failed. + */ + public void testDiagnosticMessages2() throws Exception { + connectionsPerNode = 5; + + checkBasicDiagnosticInfo(); + } + + /** + * @throws Exception If failed. + */ + public void testLongRunning() throws Exception { + System.setProperty(IGNITE_LONG_OPERATIONS_DUMP_TIMEOUT, "3500"); + + try { + testSpi = true; + + startGrid(0); + + GridStringLogger strLog = this.strLog = new GridStringLogger(); + + startGrid(1); + + awaitPartitionMapExchange(); + + CacheConfiguration ccfg = new CacheConfiguration(DEFAULT_CACHE_NAME); + + ccfg.setWriteSynchronizationMode(FULL_SYNC); + ccfg.setCacheMode(PARTITIONED); + ccfg.setAtomicityMode(TRANSACTIONAL); + + final Ignite node0 = ignite(0); + + node0.createCache(ccfg); + + final Ignite node1 = ignite(1); + + UUID id0 = node0.cluster().localNode().id(); + UUID id1 = node1.cluster().localNode().id(); + + TestRecordingCommunicationSpi.spi(node0).blockMessages(GridNearSingleGetResponse.class, node1.name()); + + IgniteInternalFuture fut = GridTestUtils.runAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + Integer key = primaryKey(node0.cache(DEFAULT_CACHE_NAME)); + + node1.cache(DEFAULT_CACHE_NAME).get(key); + + return null; + } + }, "get"); + + U.sleep(10_000); + + assertFalse(fut.isDone()); + + TestRecordingCommunicationSpi.spi(node0).stopBlock(); + + fut.get(); + + String log = strLog.toString(); + + assertTrue(log.contains("GridPartitionedSingleGetFuture waiting for response [node=" + id0)); + assertTrue(log.contains("General node info [id=" + id0)); + } + finally { + System.clearProperty(IGNITE_LONG_OPERATIONS_DUMP_TIMEOUT); + } + } + + /** + * @throws Exception If failed. + */ + private void checkBasicDiagnosticInfo() throws Exception { + startGrids(3); + + client = true; + + startGrid(3); + + startGrid(4); + + CacheConfiguration ccfg = new CacheConfiguration(DEFAULT_CACHE_NAME); + + ccfg.setWriteSynchronizationMode(FULL_SYNC); + ccfg.setCacheMode(REPLICATED); + ccfg.setAtomicityMode(TRANSACTIONAL); + + ignite(0).createCache(ccfg); + + awaitPartitionMapExchange(); + + sendDiagnostic(); + + for (int i = 0; i < 5; i++) { + final IgniteCache<Object, Object> cache = ignite(i).cache(DEFAULT_CACHE_NAME); + + // Put from multiple threads to create multiple connections. + GridTestUtils.runMultiThreaded(new Runnable() { + @Override public void run() { + for (int j = 0; j < 10; j++) + cache.put(ThreadLocalRandom.current().nextInt(), j); + } + }, 10, "cache-thread"); + } + + sendDiagnostic(); + } + + /** + * @throws Exception If failed. + */ + private void sendDiagnostic() throws Exception { + for (int i = 0; i < 5; i++) { + IgniteKernal node = (IgniteKernal)ignite(i); + + for (int j = 0; j < 5; j++) { + if (i != j) { + ClusterNode dstNode = ignite(j).cluster().localNode(); + + final GridFutureAdapter<String> fut = new GridFutureAdapter<>(); + + IgniteDiagnosticPrepareContext ctx = new IgniteDiagnosticPrepareContext(node.getLocalNodeId()); + + ctx.basicInfo(dstNode.id(), "Test diagnostic"); + + ctx.send(node.context(), new IgniteInClosure<IgniteInternalFuture<String>>() { + @Override public void apply(IgniteInternalFuture<String> diagFut) { + try { + fut.onDone(diagFut.get()); + } + catch (Exception e) { + fut.onDone(e); + } + } + }); + + String msg = fut.get(); + + assertTrue("Unexpected message: " + msg, + msg.contains("Test diagnostic") && + msg.contains("General node info [id=" + dstNode.id() + ", client=" + dstNode.isClient() + ", discoTopVer=AffinityTopologyVersion [topVer=5, minorTopVer=") && + msg.contains("Partitions exchange info [readyVer=AffinityTopologyVersion [topVer=5, minorTopVer=")); + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/5d98ccec/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java index 3305058..140b1a5 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java @@ -35,6 +35,7 @@ import org.apache.ignite.internal.GridStopWithCancelSelfTest; import org.apache.ignite.internal.IgniteLocalNodeMapBeforeStartTest; import org.apache.ignite.internal.IgniteSlowClientDetectionSelfTest; import org.apache.ignite.internal.MarshallerContextLockingSelfTest; +import org.apache.ignite.internal.managers.IgniteDiagnosticMessagesTest; import org.apache.ignite.internal.processors.affinity.GridAffinityProcessorRendezvousSelfTest; import org.apache.ignite.internal.processors.cache.GridLocalIgniteSerializationTest; import org.apache.ignite.internal.processors.cache.GridProjectionForCachesOnDaemonNodeSelfTest; @@ -52,8 +53,8 @@ import org.apache.ignite.internal.processors.database.BPlusTreeSelfTest; import org.apache.ignite.internal.processors.database.FreeListImplSelfTest; import org.apache.ignite.internal.processors.database.MemoryMetricsSelfTest; import org.apache.ignite.internal.processors.database.MetadataStorageSelfTest; -import org.apache.ignite.internal.processors.odbc.OdbcEscapeSequenceSelfTest; import org.apache.ignite.internal.processors.odbc.OdbcConfigurationValidationSelfTest; +import org.apache.ignite.internal.processors.odbc.OdbcEscapeSequenceSelfTest; import org.apache.ignite.internal.processors.service.ClosureServiceClientsNodesTest; import org.apache.ignite.internal.product.GridProductVersionSelfTest; import org.apache.ignite.internal.util.nio.IgniteExceptionInNioWorkerSelfTest; @@ -173,6 +174,9 @@ public class IgniteBasicTestSuite extends TestSuite { suite.addTestSuite(IgniteMarshallerCacheClassNameConflictTest.class); suite.addTestSuite(IgniteMarshallerCacheClientRequestsMappingOnMissTest.class); + + suite.addTestSuite(IgniteDiagnosticMessagesTest.class); + return suite; } }
