TcoDiscovery: reduced amount of debug logging (heartbeat/connection check messages are logged trace level).
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/6b78ad0c Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/6b78ad0c Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/6b78ad0c Branch: refs/heads/ignite-4242 Commit: 6b78ad0cbbcf286cb083136c49cebd5dd85de58c Parents: 6f16072 Author: sboikov <[email protected]> Authored: Mon Oct 31 10:35:44 2016 +0300 Committer: sboikov <[email protected]> Committed: Mon Oct 31 10:35:44 2016 +0300 ---------------------------------------------------------------------- .../GridCachePartitionExchangeManager.java | 10 +- .../processors/cache/GridCacheUtils.java | 67 -------- .../ignite/spi/discovery/tcp/ClientImpl.java | 12 +- .../ignite/spi/discovery/tcp/ServerImpl.java | 164 +++++++++++++------ .../spi/discovery/tcp/TcpDiscoveryImpl.java | 59 ++++++- .../messages/TcpDiscoveryAbstractMessage.java | 7 + .../messages/TcpDiscoveryClientAckResponse.java | 5 + .../TcpDiscoveryClientHeartbeatMessage.java | 7 +- .../TcpDiscoveryConnectionCheckMessage.java | 5 + .../messages/TcpDiscoveryHeartbeatMessage.java | 5 + 10 files changed, 211 insertions(+), 130 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/6b78ad0c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index 4eb61e3..a901e2a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -718,7 +718,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana /** * Partition refresh callback. */ - void refreshPartitions() { + private void refreshPartitions() { ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx, AffinityTopologyVersion.NONE); if (oldest == null) { @@ -735,7 +735,13 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana // If this is the oldest node. if (oldest.id().equals(cctx.localNodeId())) { - rmts = CU.remoteNodes(cctx, AffinityTopologyVersion.NONE); + GridDhtPartitionsExchangeFuture lastFut = lastInitializedFut; + + // No need to send to nodes which did not finish their first exchange. + AffinityTopologyVersion rmtTopVer = + lastFut != null ? lastFut.topologyVersion() : AffinityTopologyVersion.NONE; + + rmts = CU.remoteNodes(cctx, rmtTopVer); if (log.isDebugEnabled()) log.debug("Refreshing partitions from oldest node: " + cctx.localNodeId()); http://git-wip-us.apache.org/repos/asf/ignite/blob/6b78ad0c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java index 4c18f21..90e428c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java @@ -410,23 +410,6 @@ public class GridCacheUtils { } /** - * Gets public cache name substituting null name by {@code 'default'}. - * - * @return Public cache name substituting null name by {@code 'default'}. - */ - public static String namexx(@Nullable String name) { - return name == null ? "default" : name; - } - - /** - * @return Partition to state transformer. - */ - @SuppressWarnings({"unchecked"}) - public static IgniteClosure<GridDhtLocalPartition, GridDhtPartitionState> part2state() { - return PART2STATE; - } - - /** * Gets all nodes on which cache with the same name is started. * * @param ctx Cache context. @@ -462,18 +445,6 @@ public class GridCacheUtils { } /** - * Gets alive remote nodes with at least one cache configured. - * - * @param ctx Cache context. - * @param topOrder Maximum allowed node order. - * @return Affinity nodes. - */ - public static Collection<ClusterNode> aliveRemoteServerNodesWithCaches(final GridCacheSharedContext ctx, - AffinityTopologyVersion topOrder) { - return ctx.discovery().aliveRemoteServerNodesWithCaches(topOrder); - } - - /** * Gets all nodes on which cache with the same name is started and the local DHT storage is enabled. * * @param ctx Cache context. @@ -644,44 +615,6 @@ public class GridCacheUtils { } /** - * Gets type filter for projections. - * - * @param keyType Key type. - * @param valType Value type. - * @param <K> Key type. - * @param <V> Value type. - * @return Type filter. - */ - public static <K, V> IgniteBiPredicate<K, V> typeFilter(final Class<?> keyType, final Class<?> valType) { - return new P2<K, V>() { - @Override public boolean apply(K k, V v) { - return keyType.isAssignableFrom(k.getClass()) && valType.isAssignableFrom(v.getClass()); - } - - @Override public String toString() { - return "Type filter [keyType=" + keyType + ", valType=" + valType + ']'; - } - }; - } - - /** - * @param keyType Key type. - * @param valType Value type. - * @return Type filter. - */ - public static CacheEntryPredicate typeFilter0(final Class<?> keyType, final Class<?> valType) { - return new CacheEntrySerializablePredicate(new CacheEntryPredicateAdapter() { - @Override public boolean apply(GridCacheEntryEx e) { - Object val = CU.value(peekVisibleValue(e), e.context(), false); - - return val == null || - valType.isAssignableFrom(val.getClass()) && - keyType.isAssignableFrom(e.key().value(e.context().cacheObjectContext(), false).getClass()); - } - }); - } - - /** * @return Boolean reducer. */ public static IgniteReducer<Boolean, Boolean> boolReducer() { http://git-wip-us.apache.org/repos/asf/ignite/blob/6b78ad0c/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java index 2d948da..f929121 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java @@ -929,8 +929,10 @@ class ClientImpl extends TcpDiscoveryImpl { msg.senderNodeId(rmtNodeId); - if (log.isDebugEnabled()) - log.debug("Message has been received: " + msg); + DebugLogger debugLog = messageLogger(msg); + + if (debugLog.isDebugEnabled()) + debugLog.debug("Message has been received: " + msg); spi.stats.onMessageReceived(msg); @@ -2079,6 +2081,8 @@ class ClientImpl extends TcpDiscoveryImpl { @Nullable DiscoverySpiCustomMessage data) { DiscoverySpiListener lsnr = spi.lsnr; + DebugLogger log = type == EVT_NODE_METRICS_UPDATED ? traceLog : debugLog; + if (lsnr != null) { if (log.isDebugEnabled()) log.debug("Discovery notification [node=" + node + ", type=" + U.gridEventName(type) + @@ -2094,14 +2098,14 @@ class ClientImpl extends TcpDiscoveryImpl { /** * @param msg Message. */ - public void addMessage(Object msg) { + void addMessage(Object msg) { queue.add(msg); } /** * @return Queue size. */ - public int queueSize() { + int queueSize() { return queue.size(); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/6b78ad0c/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index 78a5f39..55e5c89 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -214,6 +214,12 @@ class ServerImpl extends TcpDiscoveryImpl { /** Leaving nodes (but still in topology). */ private final Collection<TcpDiscoveryNode> leavingNodes = new HashSet<>(); + /** Collection to track joining nodes. */ + private Set<UUID> joiningNodes = new HashSet<>(); + + /** Pending custom messages that should not be sent between NodeAdded and NodeAddFinished messages. */ + private Queue<TcpDiscoveryCustomEventMessage> pendingCustomMsgs = new ArrayDeque<>(); + /** If non-shared IP finder is used this flag shows whether IP finder contains local address. */ private boolean ipFinderHasLocAddr; @@ -296,7 +302,7 @@ class ServerImpl extends TcpDiscoveryImpl { throw new IgniteSpiException("Info log level should be enabled for TCP discovery to work " + "in debug mode."); - debugLog = new ConcurrentLinkedDeque<>(); + debugLogQ = new ConcurrentLinkedDeque<>(); U.quietAndWarn(log, "TCP discovery SPI is configured in debug mode."); } @@ -1288,6 +1294,8 @@ class ServerImpl extends TcpDiscoveryImpl { TcpDiscoverySpiState spiState = spiStateCopy(); + DebugLogger log = type == EVT_NODE_METRICS_UPDATED ? traceLog : debugLog; + if (lsnr != null && node.visible() && (spiState == CONNECTED || spiState == DISCONNECTING)) { if (log.isDebugEnabled()) log.debug("Discovery notification [node=" + node + ", spiState=" + spiState + @@ -1410,10 +1418,14 @@ class ServerImpl extends TcpDiscoveryImpl { if (log.isInfoEnabled() && spi.statsPrintFreq > 0) { int failedNodesSize; int leavingNodesSize; + int joiningNodesSize; + int pendingCustomMsgsSize; synchronized (mux) { failedNodesSize = failedNodes.size(); leavingNodesSize = leavingNodes.size(); + joiningNodesSize = joiningNodes.size(); + pendingCustomMsgsSize = pendingCustomMsgs.size(); } Runtime runtime = Runtime.getRuntime(); @@ -1422,8 +1434,13 @@ class ServerImpl extends TcpDiscoveryImpl { log.info("Discovery SPI statistics [statistics=" + spi.stats + ", spiState=" + spiStateCopy() + ", coord=" + coord + + ", next=" + (msgWorker != null ? msgWorker.next : "N/A") + + ", intOrder=" + (locNode != null ? locNode.internalOrder() : "N/A") + ", topSize=" + ring.allNodes().size() + - ", leavingNodesSize=" + leavingNodesSize + ", failedNodesSize=" + failedNodesSize + + ", leavingNodesSize=" + leavingNodesSize + + ", failedNodesSize=" + failedNodesSize + + ", joiningNodesSize=" + joiningNodesSize + + ", pendingCustomMsgs=" + pendingCustomMsgsSize + ", msgWorker.queue.size=" + (msgWorker != null ? msgWorker.queueSize() : "N/A") + ", clients=" + ring.clientNodes().size() + ", clientWorkers=" + clientMsgWorkers.size() + @@ -1612,7 +1629,7 @@ class ServerImpl extends TcpDiscoveryImpl { b.append("In-memory log messages: ").append(U.nl()); - for (String msg : debugLog) + for (String msg : debugLogQ) b.append(" ").append(msg).append(U.nl()); b.append(U.nl()); @@ -2177,12 +2194,6 @@ class ServerImpl extends TcpDiscoveryImpl { /** Connection check threshold. */ private long connCheckThreshold; - /** Pending custom messages that should not be sent between NodeAdded and NodeAddFinished messages. */ - private Queue<TcpDiscoveryCustomEventMessage> pendingCustomMsgs = new ArrayDeque<>(); - - /** Collection to track joining nodes. */ - private Set<UUID> joiningNodes = new HashSet<>(); - /** */ protected RingMessageWorker() { @@ -2197,6 +2208,8 @@ class ServerImpl extends TcpDiscoveryImpl { * @param msg Message to add. */ void addMessage(TcpDiscoveryAbstractMessage msg) { + DebugLogger log = messageLogger(msg); + if ((msg instanceof TcpDiscoveryStatusCheckMessage || msg instanceof TcpDiscoveryJoinRequestMessage || msg instanceof TcpDiscoveryCustomEventMessage || @@ -2278,6 +2291,8 @@ class ServerImpl extends TcpDiscoveryImpl { * @param msg Message to process. */ @Override protected void processMessage(TcpDiscoveryAbstractMessage msg) { + DebugLogger log = messageLogger(msg); + if (log.isDebugEnabled()) log.debug("Processing message [cls=" + msg.getClass().getSimpleName() + ", id=" + msg.id() + ']'); @@ -2285,12 +2300,12 @@ class ServerImpl extends TcpDiscoveryImpl { debugLog(msg, "Processing message [cls=" + msg.getClass().getSimpleName() + ", id=" + msg.id() + ']'); if (locNode.internalOrder() == 0) { - boolean process = false; + boolean proc = false; if (msg instanceof TcpDiscoveryNodeAddedMessage) - process = ((TcpDiscoveryNodeAddedMessage)msg).node().equals(locNode); + proc = ((TcpDiscoveryNodeAddedMessage)msg).node().equals(locNode); - if (!process) { + if (!proc) { if (log.isDebugEnabled()) { log.debug("Ignore message, local node order is not initialized [msg=" + msg + ", locNode=" + locNode + ']'); @@ -2488,8 +2503,8 @@ class ServerImpl extends TcpDiscoveryImpl { newNextNode = true; } - else if (log.isDebugEnabled()) - log.debug("Next node remains the same [nextId=" + next.id() + + else if (log.isTraceEnabled()) + log.trace("Next node remains the same [nextId=" + next.id() + ", nextOrder=" + next.internalOrder() + ']'); // Flag that shows whether next node exists and accepts incoming connections. @@ -2752,8 +2767,10 @@ class ServerImpl extends TcpDiscoveryImpl { onMessageExchanged(); - if (log.isDebugEnabled()) { - log.debug("Message has been sent to next node [msg=" + msg + + DebugLogger debugLog = messageLogger(msg); + + if (debugLog.isDebugEnabled()) { + debugLog.debug("Message has been sent to next node [msg=" + msg + ", next=" + next.id() + ", res=" + res + ']'); } @@ -3783,7 +3800,9 @@ class ServerImpl extends TcpDiscoveryImpl { return; } - joiningNodes.add(node.id()); + synchronized (mux) { + joiningNodes.add(node.id()); + } if (!isLocalNodeCoordinator() && spi.nodeAuth != null && spi.nodeAuth.isGlobalNodeAuthentication()) { boolean authFailed = true; @@ -3895,7 +3914,9 @@ class ServerImpl extends TcpDiscoveryImpl { n.visible(true); } - joiningNodes.clear(); + synchronized (mux) { + joiningNodes.clear(); + } locNode.setAttributes(node.attributes()); @@ -4021,7 +4042,9 @@ class ServerImpl extends TcpDiscoveryImpl { } } - joiningNodes.remove(nodeId); + synchronized (mux) { + joiningNodes.remove(nodeId); + } TcpDiscoverySpiState state = spiStateCopy(); @@ -4240,7 +4263,9 @@ class ServerImpl extends TcpDiscoveryImpl { } } - joiningNodes.remove(leftNode.id()); + synchronized (mux) { + joiningNodes.remove(leftNode.id()); + } spi.stats.onNodeLeft(); @@ -4418,7 +4443,9 @@ class ServerImpl extends TcpDiscoveryImpl { ", msg=" + msg.warning() + ']'); } - joiningNodes.remove(node.id()); + synchronized (mux) { + joiningNodes.remove(node.id()); + } notifyDiscovery(EVT_NODE_FAILED, topVer, node); @@ -4619,8 +4646,8 @@ class ServerImpl extends TcpDiscoveryImpl { } if (locNodeId.equals(msg.creatorNodeId()) && !hasMetrics(msg, locNodeId) && msg.senderNodeId() != null) { - if (log.isDebugEnabled()) - log.debug("Discarding heartbeat message that has made two passes: " + msg); + if (log.isTraceEnabled()) + log.trace("Discarding heartbeat message that has made two passes: " + msg); return; } @@ -4821,18 +4848,28 @@ class ServerImpl extends TcpDiscoveryImpl { assert ring.minimumNodeVersion() != null : ring; + boolean joiningEmpty; + + synchronized (mux) { + joiningEmpty = joiningNodes.isEmpty(); + } + if (ring.minimumNodeVersion().compareTo(CUSTOM_MSG_ALLOW_JOINING_FOR_VERIFIED_SINCE) >= 0) - delayMsg = msg.topologyVersion() == 0L && !joiningNodes.isEmpty(); + delayMsg = msg.topologyVersion() == 0L && !joiningEmpty; else - delayMsg = !joiningNodes.isEmpty(); + delayMsg = !joiningEmpty; if (delayMsg) { if (log.isDebugEnabled()) { - log.debug("Delay custom message processing, there are joining nodes [msg=" + msg + - ", joiningNodes=" + joiningNodes + ']'); + synchronized (mux) { + log.debug("Delay custom message processing, there are joining nodes [msg=" + msg + + ", joiningNodes=" + joiningNodes + ']'); + } } - pendingCustomMsgs.add(msg); + synchronized (mux) { + pendingCustomMsgs.add(msg); + } return; } @@ -4973,10 +5010,16 @@ class ServerImpl extends TcpDiscoveryImpl { * Checks and flushes custom event messages if no nodes are attempting to join the grid. */ private void checkPendingCustomMessages() { - if (joiningNodes.isEmpty() && isLocalNodeCoordinator()) { + boolean joiningEmpty; + + synchronized (mux) { + joiningEmpty = joiningNodes.isEmpty(); + } + + if (joiningEmpty && isLocalNodeCoordinator()) { TcpDiscoveryCustomEventMessage msg; - while ((msg = pendingCustomMsgs.poll()) != null) { + while ((msg = pollPendingCustomeMessage()) != null) { processCustomMessage(msg); if (msg.verified()) @@ -4986,6 +5029,15 @@ class ServerImpl extends TcpDiscoveryImpl { } /** + * @return Pending custom message. + */ + @Nullable private TcpDiscoveryCustomEventMessage pollPendingCustomeMessage() { + synchronized (mux) { + return pendingCustomMsgs.poll(); + } + } + + /** * @param msg Custom message. */ private void notifyDiscoveryListener(TcpDiscoveryCustomEventMessage msg) { @@ -5134,8 +5186,12 @@ class ServerImpl extends TcpDiscoveryImpl { else srvrSock = new ServerSocket(port, 0, spi.locHost); - if (log.isInfoEnabled()) - log.info("Successfully bound to TCP port [port=" + port + ", localHost=" + spi.locHost + ']'); + if (log.isInfoEnabled()) { + log.info("Successfully bound to TCP port [port=" + port + + ", localHost=" + spi.locHost + + ", locNodeId=" + spi.ignite().configuration().getNodeId() + + ']'); + } return; } @@ -5450,7 +5506,7 @@ class ServerImpl extends TcpDiscoveryImpl { return; } - long socketTimeout = spi.failureDetectionTimeoutEnabled() ? spi.failureDetectionTimeout() : + long sockTimeout = spi.failureDetectionTimeoutEnabled() ? spi.failureDetectionTimeout() : spi.getSocketTimeout(); while (!isInterrupted()) { @@ -5460,8 +5516,10 @@ class ServerImpl extends TcpDiscoveryImpl { msg.senderNodeId(nodeId); - if (log.isDebugEnabled()) - log.debug("Message has been received: " + msg); + DebugLogger debugLog = messageLogger(msg); + + if (debugLog.isDebugEnabled()) + debugLog.debug("Message has been received: " + msg); spi.stats.onMessageReceived(msg); @@ -5469,7 +5527,7 @@ class ServerImpl extends TcpDiscoveryImpl { debugLog(msg, "Message has been received: " + msg); if (msg instanceof TcpDiscoveryConnectionCheckMessage) { - spi.writeToSocket(msg, sock, RES_OK, socketTimeout); + spi.writeToSocket(msg, sock, RES_OK, sockTimeout); continue; } @@ -5491,7 +5549,7 @@ class ServerImpl extends TcpDiscoveryImpl { TcpDiscoverySpiState state = spiStateCopy(); if (state == CONNECTED) { - spi.writeToSocket(msg, sock, RES_OK, socketTimeout); + spi.writeToSocket(msg, sock, RES_OK, sockTimeout); if (clientMsgWrk.getState() == State.NEW) clientMsgWrk.start(); @@ -5501,7 +5559,7 @@ class ServerImpl extends TcpDiscoveryImpl { continue; } else { - spi.writeToSocket(msg, sock, RES_CONTINUE_JOIN, socketTimeout); + spi.writeToSocket(msg, sock, RES_CONTINUE_JOIN, sockTimeout); break; } @@ -5509,7 +5567,7 @@ class ServerImpl extends TcpDiscoveryImpl { } else if (msg instanceof TcpDiscoveryDuplicateIdMessage) { // Send receipt back. - spi.writeToSocket(msg, sock, RES_OK, socketTimeout); + spi.writeToSocket(msg, sock, RES_OK, sockTimeout); boolean ignored = false; @@ -5538,7 +5596,7 @@ class ServerImpl extends TcpDiscoveryImpl { } else if (msg instanceof TcpDiscoveryAuthFailedMessage) { // Send receipt back. - spi.writeToSocket(msg, sock, RES_OK, socketTimeout); + spi.writeToSocket(msg, sock, RES_OK, sockTimeout); boolean ignored = false; @@ -5567,7 +5625,7 @@ class ServerImpl extends TcpDiscoveryImpl { } else if (msg instanceof TcpDiscoveryCheckFailedMessage) { // Send receipt back. - spi.writeToSocket(msg, sock, RES_OK, socketTimeout); + spi.writeToSocket(msg, sock, RES_OK, sockTimeout); boolean ignored = false; @@ -5596,7 +5654,7 @@ class ServerImpl extends TcpDiscoveryImpl { } else if (msg instanceof TcpDiscoveryLoopbackProblemMessage) { // Send receipt back. - spi.writeToSocket(msg, sock, RES_OK, socketTimeout); + spi.writeToSocket(msg, sock, RES_OK, sockTimeout); boolean ignored = false; @@ -5650,7 +5708,7 @@ class ServerImpl extends TcpDiscoveryImpl { clientMsgWrk.addMessage(ack); } else - spi.writeToSocket(msg, sock, RES_OK, socketTimeout); + spi.writeToSocket(msg, sock, RES_OK, sockTimeout); if (heartbeatMsg != null) processClientHeartbeatMessage(heartbeatMsg); @@ -5914,7 +5972,7 @@ class ServerImpl extends TcpDiscoveryImpl { /** * @param msg Message. */ - public void addMessage(TcpDiscoveryAbstractMessage msg) { + void addMessage(TcpDiscoveryAbstractMessage msg) { addMessage(msg, null); } @@ -5922,7 +5980,7 @@ class ServerImpl extends TcpDiscoveryImpl { * @param msg Message. * @param msgBytes Optional message bytes. */ - public void addMessage(TcpDiscoveryAbstractMessage msg, @Nullable byte[] msgBytes) { + void addMessage(TcpDiscoveryAbstractMessage msg, @Nullable byte[] msgBytes) { T2 t = new T2<>(msg, msgBytes); if (msg.highPriority()) @@ -5930,6 +5988,8 @@ class ServerImpl extends TcpDiscoveryImpl { else queue.add(t); + DebugLogger log = messageLogger(msg); + if (log.isDebugEnabled()) log.debug("Message has been added to client queue: " + msg); } @@ -5948,22 +6008,24 @@ class ServerImpl extends TcpDiscoveryImpl { if (msgBytes == null) msgBytes = U.marshal(spi.marshaller(), msg); + DebugLogger msgLog = messageLogger(msg); + if (msg instanceof TcpDiscoveryClientAckResponse) { if (clientVer == null) { ClusterNode node = spi.getNode(clientNodeId); if (node != null) clientVer = IgniteUtils.productVersion(node); - else if (log.isDebugEnabled()) - log.debug("Skip sending message ack to client, fail to get client node " + + else if (msgLog.isDebugEnabled()) + msgLog.debug("Skip sending message ack to client, fail to get client node " + "[sock=" + sock + ", locNodeId=" + getLocalNodeId() + ", rmtNodeId=" + clientNodeId + ", msg=" + msg + ']'); } if (clientVer != null && clientVer.compareTo(TcpDiscoveryClientAckResponse.CLIENT_ACK_SINCE_VERSION) >= 0) { - if (log.isDebugEnabled()) - log.debug("Sending message ack to client [sock=" + sock + ", locNodeId=" + if (msgLog.isDebugEnabled()) + msgLog.debug("Sending message ack to client [sock=" + sock + ", locNodeId=" + getLocalNodeId() + ", rmtNodeId=" + clientNodeId + ", msg=" + msg + ']'); spi.writeToSocket(sock, msg, msgBytes, spi.failureDetectionTimeoutEnabled() ? @@ -5971,8 +6033,8 @@ class ServerImpl extends TcpDiscoveryImpl { } } else { - if (log.isDebugEnabled()) - log.debug("Redirecting message to client [sock=" + sock + ", locNodeId=" + if (msgLog.isDebugEnabled()) + msgLog.debug("Redirecting message to client [sock=" + sock + ", locNodeId=" + getLocalNodeId() + ", rmtNodeId=" + clientNodeId + ", msg=" + msg + ']'); assert topologyInitialized(msg) : msg; http://git-wip-us.apache.org/repos/asf/ignite/blob/6b78ad0c/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java index 30b83e5..0816cbc 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java @@ -70,7 +70,33 @@ abstract class TcpDiscoveryImpl { /** Received messages. */ @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized") - protected ConcurrentLinkedDeque<String> debugLog; + protected ConcurrentLinkedDeque<String> debugLogQ; + + /** */ + protected final ServerImpl.DebugLogger debugLog = new DebugLogger() { + /** {@inheritDoc} */ + @Override public boolean isDebugEnabled() { + return log.isDebugEnabled(); + } + + /** {@inheritDoc} */ + @Override public void debug(String msg) { + log.debug(msg); + } + }; + + /** */ + protected final ServerImpl.DebugLogger traceLog = new DebugLogger() { + /** {@inheritDoc} */ + @Override public boolean isDebugEnabled() { + return log.isTraceEnabled(); + } + + /** {@inheritDoc} */ + @Override public void debug(String msg) { + log.trace(msg); + } + }; /** * @param spi Adapter. @@ -111,12 +137,12 @@ abstract class TcpDiscoveryImpl { "-" + locNode.internalOrder() + "] " + msg; - debugLog.add(msg0); + debugLogQ.add(msg0); - int delta = debugLog.size() - debugMsgHist; + int delta = debugLogQ.size() - debugMsgHist; - for (int i = 0; i < delta && debugLog.size() > debugMsgHist; i++) - debugLog.poll(); + for (int i = 0; i < delta && debugLogQ.size() > debugMsgHist; i++) + debugLogQ.poll(); } /** @@ -326,4 +352,27 @@ abstract class TcpDiscoveryImpl { return res; } + + /** + * @param msg Message. + * @return Message logger. + */ + protected final DebugLogger messageLogger(TcpDiscoveryAbstractMessage msg) { + return msg.traceLogLevel() ? traceLog : debugLog; + } + + /** + * + */ + interface DebugLogger { + /** + * @return {@code True} if debug logging is enabled. + */ + boolean isDebugEnabled(); + + /** + * @param msg Message to log. + */ + void debug(String msg); + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/6b78ad0c/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java index 24f2a5a..39170ea 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java @@ -99,6 +99,13 @@ public abstract class TcpDiscoveryAbstractMessage implements Serializable { } /** + * @return + */ + public boolean traceLogLevel() { + return false; + } + + /** * Gets creator node. * * @return Creator node ID. http://git-wip-us.apache.org/repos/asf/ignite/blob/6b78ad0c/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientAckResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientAckResponse.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientAckResponse.java index 6505765..0a656d5 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientAckResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientAckResponse.java @@ -53,6 +53,11 @@ public class TcpDiscoveryClientAckResponse extends TcpDiscoveryAbstractMessage { } /** {@inheritDoc} */ + @Override public boolean traceLogLevel() { + return true; + } + + /** {@inheritDoc} */ @Override public boolean highPriority() { return true; } http://git-wip-us.apache.org/repos/asf/ignite/blob/6b78ad0c/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientHeartbeatMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientHeartbeatMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientHeartbeatMessage.java index 3993de0..ade5468 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientHeartbeatMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientHeartbeatMessage.java @@ -25,7 +25,7 @@ import org.apache.ignite.internal.util.typedef.internal.S; /** * Heartbeat message. * <p> - * Client sends his hearbeats in this message. + * Client sends his heartbeats in this message. */ public class TcpDiscoveryClientHeartbeatMessage extends TcpDiscoveryAbstractMessage { /** */ @@ -61,6 +61,11 @@ public class TcpDiscoveryClientHeartbeatMessage extends TcpDiscoveryAbstractMess } /** {@inheritDoc} */ + @Override public boolean traceLogLevel() { + return true; + } + + /** {@inheritDoc} */ @Override public String toString() { return S.toString(TcpDiscoveryClientHeartbeatMessage.class, this, "super", super.toString()); } http://git-wip-us.apache.org/repos/asf/ignite/blob/6b78ad0c/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryConnectionCheckMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryConnectionCheckMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryConnectionCheckMessage.java index a152936..7793a3a 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryConnectionCheckMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryConnectionCheckMessage.java @@ -50,6 +50,11 @@ public class TcpDiscoveryConnectionCheckMessage extends TcpDiscoveryAbstractMess } /** {@inheritDoc} */ + @Override public boolean traceLogLevel() { + return true; + } + + /** {@inheritDoc} */ @Override public void writeExternal(ObjectOutput out) throws IOException { // This method has been left empty intentionally to keep message size at min. } http://git-wip-us.apache.org/repos/asf/ignite/blob/6b78ad0c/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryHeartbeatMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryHeartbeatMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryHeartbeatMessage.java index 338e3f5..0ae253a 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryHeartbeatMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryHeartbeatMessage.java @@ -214,6 +214,11 @@ public class TcpDiscoveryHeartbeatMessage extends TcpDiscoveryAbstractMessage { } /** {@inheritDoc} */ + @Override public boolean traceLogLevel() { + return true; + } + + /** {@inheritDoc} */ @Override public boolean highPriority() { return true; }
