http://git-wip-us.apache.org/repos/asf/ignite/blob/b1116069/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java index 47498dd..a8a5bd3 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java @@ -67,6 +67,7 @@ import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; import org.apache.ignite.internal.util.GridConcurrentFactory; import org.apache.ignite.internal.util.GridSpinReadWriteLock; +import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.ipc.IpcEndpoint; import org.apache.ignite.internal.util.ipc.IpcToNioAdapter; @@ -1650,17 +1651,69 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter rcvdBytesCnt.add(-rcvdBytesCnt.sum()); } + /** + * @param nodeId Target node ID. + * @return Future. 
+ */ + public IgniteInternalFuture<String> dumpNodeStatistics(final UUID nodeId) { + StringBuilder sb = new StringBuilder("Communication SPI statistics [rmtNode=").append(nodeId).append(']').append(U.nl()); + + dumpInfo(sb, nodeId); + + GridNioServer<Message> nioSrvr = this.nioSrvr; + + if (nioSrvr != null) { + sb.append("NIO sessions statistics:"); + + IgnitePredicate<GridNioSession> p = new IgnitePredicate<GridNioSession>() { + @Override public boolean apply(GridNioSession ses) { + ConnectionKey connId = ses.meta(CONN_IDX_META); + + return connId != null && nodeId.equals(connId.nodeId()); + } + }; + + return nioSrvr.dumpNodeStats(sb.toString(), p); + } + else { + sb.append(U.nl()).append("GridNioServer is null."); + + return new GridFinishedFuture<>(sb.toString()); + } + } + /** {@inheritDoc} */ @Override public void dumpStats() { IgniteLogger log = this.log; if (log != null) { - StringBuilder sb = new StringBuilder("Communication SPI recovery descriptors: ").append(U.nl()); + StringBuilder sb = new StringBuilder(); + + dumpInfo(sb, null); - for (Map.Entry<ConnectionKey, GridNioRecoveryDescriptor> entry : recoveryDescs.entrySet()) { - GridNioRecoveryDescriptor desc = entry.getValue(); + U.warn(log, sb.toString()); + } - sb.append(" [key=").append(entry.getKey()) + GridNioServer<Message> nioSrvr = this.nioSrvr; + + if (nioSrvr != null) + nioSrvr.dumpStats(); + } + + /** + * @param sb Message builder. + * @param dstNodeId Target node ID, or {@code null} to include entries for all nodes. 
+ */ + private void dumpInfo(StringBuilder sb, UUID dstNodeId) { + sb.append("Communication SPI recovery descriptors: ").append(U.nl()); + + for (Map.Entry<ConnectionKey, GridNioRecoveryDescriptor> entry : recoveryDescs.entrySet()) { + GridNioRecoveryDescriptor desc = entry.getValue(); + + if (dstNodeId != null && !dstNodeId.equals(entry.getKey().nodeId())) + continue; + + sb.append(" [key=").append(entry.getKey()) .append(", msgsSent=").append(desc.sent()) .append(", msgsAckedByRmt=").append(desc.acked()) .append(", msgsRcvd=").append(desc.received()) @@ -1668,12 +1721,15 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter .append(", reserveCnt=").append(desc.reserveCount()) .append(", descIdHash=").append(System.identityHashCode(desc)) .append(']').append(U.nl()); - } + } - for (Map.Entry<ConnectionKey, GridNioRecoveryDescriptor> entry : outRecDescs.entrySet()) { - GridNioRecoveryDescriptor desc = entry.getValue(); + for (Map.Entry<ConnectionKey, GridNioRecoveryDescriptor> entry : outRecDescs.entrySet()) { + GridNioRecoveryDescriptor desc = entry.getValue(); - sb.append(" [key=").append(entry.getKey()) + if (dstNodeId != null && !dstNodeId.equals(entry.getKey().nodeId())) + continue; + + sb.append(" [key=").append(entry.getKey()) .append(", msgsSent=").append(desc.sent()) .append(", msgsAckedByRmt=").append(desc.acked()) .append(", reserveCnt=").append(desc.reserveCount()) @@ -1681,12 +1737,15 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter .append(", reserved=").append(desc.reserved()) .append(", descIdHash=").append(System.identityHashCode(desc)) .append(']').append(U.nl()); - } + } + + for (Map.Entry<ConnectionKey, GridNioRecoveryDescriptor> entry : inRecDescs.entrySet()) { + GridNioRecoveryDescriptor desc = entry.getValue(); - for (Map.Entry<ConnectionKey, GridNioRecoveryDescriptor> entry : inRecDescs.entrySet()) { - GridNioRecoveryDescriptor desc = entry.getValue(); + if (dstNodeId != null && !dstNodeId.equals(entry.getKey().nodeId())) 
+ continue; - sb.append(" [key=").append(entry.getKey()) + sb.append(" [key=").append(entry.getKey()) .append(", msgsRcvd=").append(desc.received()) .append(", lastAcked=").append(desc.lastAcknowledged()) .append(", reserveCnt=").append(desc.reserveCount()) @@ -1695,38 +1754,28 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter .append(", handshakeIdx=").append(desc.handshakeIndex()) .append(", descIdHash=").append(System.identityHashCode(desc)) .append(']').append(U.nl()); - } + } - sb.append("Communication SPI clients: ").append(U.nl()); + sb.append("Communication SPI clients: ").append(U.nl()); - for (Map.Entry<UUID, GridCommunicationClient[]> entry : clients.entrySet()) { - UUID nodeId = entry.getKey(); - GridCommunicationClient[] clients0 = entry.getValue(); + for (Map.Entry<UUID, GridCommunicationClient[]> entry : clients.entrySet()) { + UUID clientNodeId = entry.getKey(); - for (GridCommunicationClient client : clients0) { - if (client != null) { - sb.append(" [node=").append(nodeId) + if (dstNodeId != null && !dstNodeId.equals(clientNodeId)) + continue; + + GridCommunicationClient[] clients0 = entry.getValue(); + + for (GridCommunicationClient client : clients0) { + if (client != null) { + sb.append(" [node=").append(clientNodeId) .append(", client=").append(client) .append(']').append(U.nl()); - } } } - - U.warn(log, sb.toString()); } - - GridNioServer<Message> nioSrvr = this.nioSrvr; - - if (nioSrvr != null) - nioSrvr.dumpStats(); } - /** */ - private final ThreadLocal<Integer> threadConnIdx = new ThreadLocal<>(); - - /** */ - private final AtomicInteger connIdx = new AtomicInteger(); - /** {@inheritDoc} */ @Override public Map<String, Object> getNodeAttributes() throws IgniteSpiException { initFailureDetectionTimeout(); @@ -1762,43 +1811,11 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter } if (connectionsPerNode > 1) { - int idxMode = IgniteSystemProperties.getInteger("CONN_IDX_MODE", 0); - - switch (idxMode) { - case 0: { - 
connPlc = new ConnectionPolicy() { - @Override public int connectionIndex() { - return (int)(Thread.currentThread().getId() % connectionsPerNode); - } - }; - - break; - } - - case 1: { - connPlc = new ConnectionPolicy() { - @Override public int connectionIndex() { - Integer threadIdx = threadConnIdx.get(); - - if (threadIdx != null) - return threadIdx; - - for (;;) { - int idx = connIdx.get(); - int nextIdx = idx == connectionsPerNode - 1 ? 0 : idx + 1; - - if (connIdx.compareAndSet(idx, nextIdx)) { - threadConnIdx.set(idx); - - return idx; - } - } - } - }; - - break; + connPlc = new ConnectionPolicy() { + @Override public int connectionIndex() { + return (int)(U.safeAbs(Thread.currentThread().getId()) % connectionsPerNode); } - } + }; } else { connPlc = new ConnectionPolicy() {
http://git-wip-us.apache.org/repos/asf/ignite/blob/b1116069/modules/core/src/test/java/org/apache/ignite/internal/managers/IgniteDiagnosticMessagesMultipleConnectionsTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/IgniteDiagnosticMessagesMultipleConnectionsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/IgniteDiagnosticMessagesMultipleConnectionsTest.java new file mode 100644 index 0000000..6b77704 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/IgniteDiagnosticMessagesMultipleConnectionsTest.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.managers; + +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; + +/** + * Runs {@link IgniteDiagnosticMessagesTest} with the communication SPI configured for 5 connections per node. + */ +public class IgniteDiagnosticMessagesMultipleConnectionsTest extends IgniteDiagnosticMessagesTest { + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setConnectionsPerNode(5); + + return cfg; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/b1116069/modules/core/src/test/java/org/apache/ignite/internal/managers/IgniteDiagnosticMessagesTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/IgniteDiagnosticMessagesTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/IgniteDiagnosticMessagesTest.java new file mode 100644 index 0000000..698cc1f --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/IgniteDiagnosticMessagesTest.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.managers; + +import java.util.concurrent.ThreadLocalRandom; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteSystemProperties; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.IgniteKernal; +import org.apache.ignite.internal.util.future.GridFutureAdapter; +import org.apache.ignite.lang.IgniteInClosure; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; + +/** + * Checks the diagnostic info returned by {@code dumpBasicInfo} between every pair of nodes in a 3-server, 2-client topology. + */ +public class IgniteDiagnosticMessagesTest extends GridCommonAbstractTest { + /** IP finder set on the discovery SPI of every node configured by this test. */ + private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** Client-mode flag applied to the configuration of each node started afterwards. */ + private boolean client; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder); + + cfg.setClientMode(client); + + CacheConfiguration ccfg = new CacheConfiguration(); + + ccfg.setWriteSynchronizationMode(FULL_SYNC); + ccfg.setCacheMode(CacheMode.REPLICATED); + ccfg.setAtomicityMode(TRANSACTIONAL); + ccfg.setName("c1"); + + cfg.setCacheConfiguration(ccfg); + + return cfg; 
+ } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + super.beforeTestsStarted(); + + System.setProperty(IgniteSystemProperties.IGNITE_DIAGNOSTIC_ENABLED, "true"); + + startGrids(3); + + client = true; + + startGrid(3); + + startGrid(4); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + System.setProperty(IgniteSystemProperties.IGNITE_DIAGNOSTIC_ENABLED, "false"); + + stopAllGrids(); + + super.afterTestsStopped(); + } + + /** + * @throws Exception If failed. + */ + public void testDiagnosticMessages() throws Exception { + awaitPartitionMapExchange(); + + sendDiagnostic(); + + for (int i = 0; i < 5; i++) { + final IgniteCache cache = ignite(i).cache("c1"); + + GridTestUtils.runMultiThreaded(new Runnable() { + @Override public void run() { + for (int j = 0; j < 10; j++) + cache.put(ThreadLocalRandom.current().nextInt(), j); + } + }, 10, "cache-thread"); + } + + sendDiagnostic(); + } + + /** + * @throws Exception If failed. 
+ */ + private void sendDiagnostic() throws Exception { + for (int i = 0; i < 5; i++) { + IgniteKernal node = (IgniteKernal)ignite(i); + + for (int j = 0; j < 5; j++) { + if (i != j) { + ClusterNode dstNode = ignite(j).cluster().localNode(); + + final GridFutureAdapter<String> fut = new GridFutureAdapter<>(); + + node.context().cluster().dumpBasicInfo(dstNode.id(), "Test diagnostic", + new IgniteInClosure<IgniteInternalFuture<String>>() { + @Override public void apply(IgniteInternalFuture<String> diagFut) { + try { + fut.onDone(diagFut.get()); + } + catch (Exception e) { + fut.onDone(e); + } + } + } + ); + + String msg = fut.get(); + + assertTrue("Unexpected message: " + msg, + msg.contains("Test diagnostic") && + msg.contains("General node info [id=" + dstNode.id() + ", client=" + dstNode.isClient() + ", discoTopVer=AffinityTopologyVersion [topVer=5, minorTopVer=0]") && + msg.contains("Partitions exchange info [readyVer=AffinityTopologyVersion [topVer=5, minorTopVer=")); + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/b1116069/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java index 451c437..244fbb7 100755 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java @@ -39,6 +39,8 @@ import org.apache.ignite.cache.store.jdbc.CacheJdbcPojoStoreOptimizedMarshallerW import org.apache.ignite.cache.store.jdbc.CacheJdbcPojoStoreTest; import org.apache.ignite.cache.store.jdbc.GridCacheJdbcBlobStoreMultithreadedSelfTest; import org.apache.ignite.cache.store.jdbc.GridCacheJdbcBlobStoreSelfTest; +import 
org.apache.ignite.internal.managers.IgniteDiagnosticMessagesMultipleConnectionsTest; +import org.apache.ignite.internal.managers.IgniteDiagnosticMessagesTest; import org.apache.ignite.internal.managers.communication.IgniteCommunicationBalanceMultipleConnectionsTest; import org.apache.ignite.internal.managers.communication.IgniteCommunicationBalancePairedConnectionsTest; import org.apache.ignite.internal.managers.communication.IgniteCommunicationBalanceTest; @@ -336,6 +338,8 @@ public class IgniteCacheTestSuite extends TestSuite { suite.addTestSuite(IgniteCommunicationBalancePairedConnectionsTest.class); suite.addTestSuite(IgniteCommunicationBalanceMultipleConnectionsTest.class); suite.addTestSuite(IgniteIoTestMessagesTest.class); + suite.addTestSuite(IgniteDiagnosticMessagesTest.class); + suite.addTestSuite(IgniteDiagnosticMessagesMultipleConnectionsTest.class); suite.addTestSuite(IgniteIncompleteCacheObjectSelfTest.class);
