Improved exchange timeout debug logging.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b1116069 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b1116069 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b1116069 Branch: refs/heads/ignite-5398 Commit: b1116069549be224d59983b93d2ee22cab8402b8 Parents: c35dbf4 Author: sboikov <[email protected]> Authored: Tue May 16 11:24:11 2017 +0300 Committer: sboikov <[email protected]> Committed: Tue May 16 11:24:11 2017 +0300 ---------------------------------------------------------------------- .../apache/ignite/IgniteSystemProperties.java | 6 + .../org/apache/ignite/internal/GridTopic.java | 6 +- .../ignite/internal/IgniteDiagnosticAware.java | 25 + .../internal/IgniteDiagnosticMessage.java | 490 +++++++++++++++++++ .../apache/ignite/internal/IgniteKernal.java | 2 + .../communication/GridIoMessageFactory.java | 5 + .../cache/DynamicCacheChangeBatch.java | 14 +- .../cache/DynamicCacheChangeRequest.java | 11 +- .../processors/cache/GridCacheIoManager.java | 32 ++ .../GridCachePartitionExchangeManager.java | 36 +- .../processors/cache/GridCacheProcessor.java | 2 - .../distributed/dht/GridDhtCacheEntry.java | 6 +- .../distributed/dht/GridDhtTxPrepareFuture.java | 30 +- .../dht/GridPartitionedSingleGetFuture.java | 24 +- .../colocated/GridDhtColocatedLockFuture.java | 44 +- .../GridDhtPartitionsAbstractMessage.java | 5 + .../GridDhtPartitionsExchangeFuture.java | 49 +- .../processors/cluster/ClusterProcessor.java | 355 ++++++++++++++ .../ignite/internal/util/nio/GridNioServer.java | 228 +++++++-- .../communication/tcp/TcpCommunicationSpi.java | 157 +++--- ...agnosticMessagesMultipleConnectionsTest.java | 35 ++ .../managers/IgniteDiagnosticMessagesTest.java | 152 ++++++ .../ignite/testsuites/IgniteCacheTestSuite.java | 4 + 23 files changed, 1568 insertions(+), 150 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/b1116069/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java index ce2666b..1388f49 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java @@ -420,6 +420,12 @@ public final class IgniteSystemProperties { /** If this property is set to {@code true} then Ignite will log thread dump in case of partition exchange timeout. */ public static final String IGNITE_THREAD_DUMP_ON_EXCHANGE_TIMEOUT = "IGNITE_THREAD_DUMP_ON_EXCHANGE_TIMEOUT"; + /** */ + public static final String IGNITE_IO_DUMP_ON_TIMEOUT = "IGNITE_IO_DUMP_ON_TIMEOUT"; + + /** */ + public static final String IGNITE_DIAGNOSTIC_ENABLED = "IGNITE_DIAGNOSTIC_ENABLED"; + /** Cache operations that take more time than value of this property will be output to log. Set to {@code 0} to disable. */ public static final String IGNITE_LONG_OPERATIONS_DUMP_TIMEOUT = "IGNITE_LONG_OPERATIONS_DUMP_TIMEOUT"; http://git-wip-us.apache.org/repos/asf/ignite/blob/b1116069/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java b/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java index 86245a8..75759ed 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java @@ -96,13 +96,17 @@ public enum GridTopic { /** */ TOPIC_TX, + /** */ TOPIC_SNAPSHOT, /** */ TOPIC_IO_TEST, /** */ - TOPIC_HADOOP_MSG; + TOPIC_HADOOP_MSG, + + /** */ + TOPIC_INTERNAL_DIAGNOSTIC; /** Enum values. */ private static final GridTopic[] VALS = values(); http://git-wip-us.apache.org/repos/asf/ignite/blob/b1116069/modules/core/src/main/java/org/apache/ignite/internal/IgniteDiagnosticAware.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteDiagnosticAware.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteDiagnosticAware.java new file mode 100644 index 0000000..f33f678 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteDiagnosticAware.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal; + +/** + * + */ +public interface IgniteDiagnosticAware { + public void dumpDiagnosticInfo(); +} http://git-wip-us.apache.org/repos/asf/ignite/blob/b1116069/modules/core/src/main/java/org/apache/ignite/internal/IgniteDiagnosticMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteDiagnosticMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteDiagnosticMessage.java new file mode 100644 index 0000000..8be571e --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteDiagnosticMessage.java @@ -0,0 +1,490 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal; + +import java.nio.ByteBuffer; +import java.text.DateFormat; +import java.text.SimpleDateFormat; +import java.util.Collection; +import java.util.Date; +import java.util.List; +import java.util.UUID; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.managers.communication.GridIoMessageFactory; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.GridCacheMapEntry; +import org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager; +import org.apache.ignite.internal.processors.cache.KeyCacheObject; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture; +import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; +import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; +import org.apache.ignite.internal.util.future.GridFinishedFuture; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteClosure; +import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.plugin.extensions.communication.MessageReader; +import org.apache.ignite.plugin.extensions.communication.MessageWriter; +import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; + +/** + * + */ +public class IgniteDiagnosticMessage implements Message { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private static final ThreadLocal<DateFormat> dateFormat = new ThreadLocal<DateFormat>() { + @Override protected DateFormat initialValue() { + return new SimpleDateFormat("HH:mm:ss.SSS"); + } + }; + + /** */ + private long futId; + + /** */ + private String msg; + + /** */ + private byte[] cBytes; + + /** + * Required by {@link GridIoMessageFactory}. + */ + public IgniteDiagnosticMessage() { + // No-op. + } + + /** + * @param ctx Context. + * @param c Closure to run. + * @param futId Future ID. + * @return Request message. + * @throws IgniteCheckedException If failed. + */ + public static IgniteDiagnosticMessage createRequest(GridKernalContext ctx, + IgniteClosure<GridKernalContext, String> c, + long futId) + throws IgniteCheckedException + { + byte[] cBytes = U.marshal(ctx.config().getMarshaller(), c); + + IgniteDiagnosticMessage msg = new IgniteDiagnosticMessage(); + + msg.futId = futId; + msg.cBytes = cBytes; + + return msg; + } + + /** + * @param msg0 Message. + * @param futId Future ID. + * @return Response message. + */ + public static IgniteDiagnosticMessage createResponse(String msg0, long futId) { + IgniteDiagnosticMessage msg = new IgniteDiagnosticMessage(); + + msg.futId = futId; + msg.msg = msg0; + + return msg; + } + + /** + * @param ctx Context. + * @return Unmarshalled closure. + * @throws IgniteCheckedException If failed. + */ + public IgniteClosure<GridKernalContext, String> unmarshalClosure(GridKernalContext ctx) + throws IgniteCheckedException { + assert cBytes != null; + + return U.unmarshal(ctx, cBytes, null); + } + + /** + * @return Future ID. + */ + public long futureId() { + return futId; + } + + /** + * @return {@code True} if this is request message. + */ + public boolean request() { + return cBytes != null; + } + + /** + * @return Message string. + */ + public String message() { + return msg; + } + + /** {@inheritDoc} */ + @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { + writer.setBuffer(buf); + + if (!writer.isHeaderWritten()) { + if (!writer.writeHeader(directType(), fieldsCount())) + return false; + + writer.onHeaderWritten(); + } + + switch (writer.state()) { + case 0: + if (!writer.writeByteArray("cBytes", cBytes)) + return false; + + writer.incrementState(); + + case 1: + if (!writer.writeLong("futId", futId)) + return false; + + writer.incrementState(); + + case 2: + if (!writer.writeString("msg", msg)) + return false; + + writer.incrementState(); + + } + + return true; + } + + /** {@inheritDoc} */ + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { + reader.setBuffer(buf); + + if (!reader.beforeMessageRead()) + return false; + + switch (reader.state()) { + case 0: + cBytes = reader.readByteArray("cBytes"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 1: + futId = reader.readLong("futId"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 2: + msg = reader.readString("msg"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + } + + return reader.afterMessageRead(IgniteDiagnosticMessage.class); + } + + /** {@inheritDoc} */ + @Override public byte directType() { + return -46; + } + + /** {@inheritDoc} */ + @Override public byte fieldsCount() { + return 3; + } + + /** {@inheritDoc} */ + @Override public void onAckReceived() { + // No-op. + } + + /** + * + */ + public static class BaseClosure implements IgniteClosure<GridKernalContext, String> { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + protected final UUID nodeId; + + /** + * @param ctx Local node context. + */ + public BaseClosure(GridKernalContext ctx) { + this.nodeId = ctx.localNodeId(); + } + + /** {@inheritDoc} */ + @Override public final String apply(GridKernalContext ctx) { + try { + StringBuilder sb = new StringBuilder(); + + IgniteInternalFuture<String> commInfo = dumpCommunicationInfo(ctx, nodeId); + + sb.append(dumpNodeBasicInfo(ctx)); + + sb.append(U.nl()).append(dumpExchangeInfo(ctx)); + + String moreInfo = dumpInfo(ctx); + + sb.append(U.nl()).append(commInfo.get()); + + if (moreInfo != null) + sb.append(U.nl()).append(moreInfo); + + return sb.toString(); + } + catch (Exception e) { + ctx.cluster().diagnosticLog().error("Failed to execute diagnostic message closure: " + e, e); + + return "Failed to execute diagnostic message closure: " + e; + } + } + + /** + * @param ctx Context. + * @return Message. + */ + protected String dumpInfo(GridKernalContext ctx) { + return null; + } + } + + /** + * + */ + public static class TxEntriesInfoClosure extends BaseClosure { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private final int cacheId; + + /** */ + private final Collection<KeyCacheObject> keys; + + /** + * @param ctx Context. + * @param cacheId Cache ID. + * @param keys Keys. + */ + public TxEntriesInfoClosure(GridKernalContext ctx, int cacheId, Collection<KeyCacheObject> keys) { + super(ctx); + + this.cacheId = cacheId; + this.keys = keys; + } + + /** {@inheritDoc} */ + @Override protected String dumpInfo(GridKernalContext ctx) { + GridCacheContext cctx = ctx.cache().context().cacheContext(cacheId); + + if (cctx == null) + return "Failed to find cache with id: " + cacheId; + + try { + for (KeyCacheObject key : keys) + key.finishUnmarshal(cctx.cacheObjectContext(), null); + } + catch (IgniteCheckedException e) { + ctx.cluster().diagnosticLog().error("Failed to unmarshal key: " + e, e); + + return "Failed to unmarshal key: " + e; + } + + StringBuilder sb = new StringBuilder("Cache entries [cacheId=" + cacheId + ", cacheName=" + cctx.name() + "]: "); + + for (KeyCacheObject key : keys) { + sb.append(U.nl()); + + GridCacheMapEntry e = (GridCacheMapEntry)cctx.cache().peekEx(key); + + sb.append("Key [key=").append(key).append(", entry=").append(e).append("]"); + } + + return sb.toString(); + } + } + + /** + * + */ + public static class ExchangeInfoClosure extends BaseClosure { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private final AffinityTopologyVersion topVer; + + /** + * @param ctx Context. + * @param topVer Exchange version. + */ + public ExchangeInfoClosure(GridKernalContext ctx, AffinityTopologyVersion topVer) { + super(ctx); + + this.topVer = topVer; + } + + /** {@inheritDoc} */ + @Override protected String dumpInfo(GridKernalContext ctx) { + List<GridDhtPartitionsExchangeFuture> futs = ctx.cache().context().exchange().exchangeFutures(); + + for (GridDhtPartitionsExchangeFuture fut : futs) { + if (topVer.equals(fut.topologyVersion())) + return "Exchange future: " + fut; + } + + return "Failed to find exchange future: " + topVer; + } + } + + /** + * + */ + public static class TxInfoClosure extends BaseClosure { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private final GridCacheVersion dhtVer; + + /** */ + private final GridCacheVersion nearVer; + + /** + * @param ctx Context. + * @param dhtVer Tx dht version. + * @param nearVer Tx near version. + */ + public TxInfoClosure(GridKernalContext ctx, + GridCacheVersion dhtVer, + GridCacheVersion nearVer) { + super(ctx); + + this.dhtVer = dhtVer; + this.nearVer = nearVer; + } + + /** {@inheritDoc} */ + @Override protected String dumpInfo(GridKernalContext ctx) { + StringBuilder b = new StringBuilder(); + + b.append("Related transactions [dhtVer=").append(dhtVer). + append(", nearVer=").append(nearVer).append("]: "); + + boolean found = false; + + for (IgniteInternalTx tx : ctx.cache().context().tm().activeTransactions()) { + if (dhtVer.equals(tx.xidVersion()) || nearVer.equals(tx.nearXidVersion())) { + found = true; + + b.append(U.nl()); + b.append("Found related ttx [ver=").append(tx.xidVersion()). + append(", nearVer=").append(tx.nearXidVersion()). + append(", topVer=").append(tx.topologyVersion()). + append(", state=").append(tx.state()). + append(", fullTx=").append(tx). + append("]"); + } + } + + if (!found) { + b.append(U.nl()); + b.append("Failed to find related transactions."); + } + + return b.toString(); + } + } + + /** + * @param ctx Context. + * @return Node information string. + */ + static String dumpNodeBasicInfo(GridKernalContext ctx) { + StringBuilder sb = new StringBuilder("General node info [id=").append(ctx.localNodeId()); + + sb.append(", client=").append(ctx.clientNode()); + sb.append(", discoTopVer=").append(ctx.discovery().topologyVersionEx()); + sb.append(", time=").append(formatTime(U.currentTimeMillis())); + + sb.append(']'); + + return sb.toString(); + } + + /** + * @param ctx Context. + * @return Exchange information string. + */ + static String dumpExchangeInfo(GridKernalContext ctx) { + GridCachePartitionExchangeManager exchMgr = ctx.cache().context().exchange(); + + StringBuilder sb = new StringBuilder("Partitions exchange info [readyVer=").append(exchMgr.readyAffinityVersion()); + sb.append("]"); + + GridDhtTopologyFuture fut = exchMgr.lastTopologyFuture(); + + sb.append(U.nl()).append("Last initialized exchange future: ").append(fut); + + return sb.toString(); + } + + /** + * @param ctx Context. + * @param nodeId Target node ID. + * @return Communication information future. + */ + public static IgniteInternalFuture<String> dumpCommunicationInfo(GridKernalContext ctx, UUID nodeId) { + if (ctx.config().getCommunicationSpi() instanceof TcpCommunicationSpi) + return ((TcpCommunicationSpi) ctx.config().getCommunicationSpi()).dumpNodeStatistics(nodeId); + else + return new GridFinishedFuture<>("Unexpected communication SPI: " + ctx.config().getCommunicationSpi()); + } + /** + * @param time Time. + * @return Time string. + */ + private static String formatTime(long time) { + return dateFormat.get().format(new Date(time)); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(IgniteDiagnosticMessage.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/b1116069/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java index 1445443..a83d888 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java @@ -911,6 +911,8 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { provider.start(ctx.plugins().pluginContextForProvider(provider)); } + ctx.cluster().initListeners(); + fillNodeAttributes(clusterProc.updateNotifierEnabled()); } catch (Throwable e) { http://git-wip-us.apache.org/repos/asf/ignite/blob/b1116069/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java index 6f73682..f443f31 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java @@ -26,6 +26,7 @@ import org.apache.ignite.internal.GridJobSiblingsRequest; import org.apache.ignite.internal.GridJobSiblingsResponse; import org.apache.ignite.internal.GridTaskCancelRequest; import org.apache.ignite.internal.GridTaskSessionRequest; +import org.apache.ignite.internal.IgniteDiagnosticMessage; import org.apache.ignite.internal.binary.BinaryEnumObjectImpl; import org.apache.ignite.internal.binary.BinaryObjectImpl; import org.apache.ignite.internal.managers.checkpoint.GridCheckpointRequest; @@ -172,6 +173,10 @@ public class GridIoMessageFactory implements MessageFactory { Message msg = null; switch (type) { + case -46: + msg = new IgniteDiagnosticMessage(); + + break; case -45: msg = new GridChangeGlobalStateMessageResponse(); http://git-wip-us.apache.org/repos/asf/ignite/blob/b1116069/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java index 0e4373c..f423002 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java @@ -21,8 +21,11 @@ import java.util.Collection; import java.util.Map; import java.util.UUID; import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; +import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.tostring.GridToStringInclude; +import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.lang.IgniteBiClosure; import org.apache.ignite.lang.IgniteUuid; import org.jetbrains.annotations.Nullable; @@ -38,7 +41,7 @@ public class DynamicCacheChangeBatch implements DiscoveryCustomMessage { private Collection<DynamicCacheChangeRequest> reqs; /** Client nodes map. Used in discovery data exchange. */ - @GridToStringInclude + @GridToStringExclude private Map<String, Map<UUID, Boolean>> clientNodes; /** Custom message ID. */ @@ -132,6 +135,13 @@ public class DynamicCacheChangeBatch implements DiscoveryCustomMessage { /** {@inheritDoc} */ @Override public String toString() { - return S.toString(DynamicCacheChangeBatch.class, this); + Object clients = F.viewReadOnly(clientNodes, new IgniteBiClosure<String, Map<UUID,Boolean>, Object>() { + @Override public Object apply(String s, Map<UUID, Boolean> map) { + return map != null ? map.keySet() : null; + } + } + ); + + return S.toString(DynamicCacheChangeBatch.class, this, "clientNodes", clients); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/b1116069/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java index ad7c7a5..f0ac505 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java @@ -23,7 +23,6 @@ import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.NearCacheConfiguration; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.util.tostring.GridToStringExclude; -import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.lang.IgniteUuid; import org.jetbrains.annotations.Nullable; @@ -45,6 +44,7 @@ public class DynamicCacheChangeRequest implements Serializable { private String cacheName; /** Cache start configuration. */ + @GridToStringExclude private CacheConfiguration startCfg; /** Cache type. */ @@ -54,6 +54,7 @@ public class DynamicCacheChangeRequest implements Serializable { private UUID initiatingNodeId; /** Near cache configuration. */ + @GridToStringExclude private NearCacheConfiguration nearCacheCfg; /** Start only client cache, do not start data nodes. */ @@ -372,6 +373,12 @@ public class DynamicCacheChangeRequest implements Serializable { /** {@inheritDoc} */ @Override public String toString() { - return S.toString(DynamicCacheChangeRequest.class, this, "cacheName", cacheName()); + return "DynamicCacheChangeRequest [cacheName=" + cacheName() + + ", hasCfg=" + (startCfg != null) + + ", nodeId=" + initiatingNodeId + + ", clientStartOnly=" + clientStartOnly + + ", close=" + close + + ", stop=" + stop + + ']'; } } http://git-wip-us.apache.org/repos/asf/ignite/blob/b1116069/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java index 924ce79..277d176 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java @@ -17,12 +17,15 @@ package org.apache.ignite.internal.processors.cache; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.HashMap; import java.util.Iterator; +import java.util.List; import java.util.Map; import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; @@ -125,6 +128,26 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { /** Deployment enabled. */ private boolean depEnabled; + /** */ + private final List<GridCacheMessage> pendingMsgs = new ArrayList<>(); + + /** + * + */ + public void dumpPendingMessages() { + synchronized (pendingMsgs) { + if (pendingMsgs.isEmpty()) + return; + + log.info("Pending cache messages waiting for exchange [" + + "readyVer=" + cctx.exchange().readyAffinityVersion() + + ", discoVer=" + cctx.discovery().topologyVersion() + ']'); + + for (GridCacheMessage msg : pendingMsgs) + log.info("Message [waitVer=" + msg.topologyVersion() + ", msg=" + msg + ']'); + } + } + /** Message listener. */ private GridMessageListener lsnr = new GridMessageListener() { @Override public void onMessage(final UUID nodeId, final Object msg) { @@ -211,10 +234,19 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { } if (fut != null && !fut.isDone()) { + synchronized (pendingMsgs) { + if (pendingMsgs.size() < 100) + pendingMsgs.add(cacheMsg); + } + fut.listen(new CI1<IgniteInternalFuture<?>>() { @Override public void apply(IgniteInternalFuture<?> t) { cctx.kernalContext().closure().runLocalSafe(new Runnable() { @Override public void run() { + synchronized (pendingMsgs) { + pendingMsgs.remove(cacheMsg); + } + IgniteLogger log = cacheMsg.messageLogger(cctx); if (log.isDebugEnabled()) { http://git-wip-us.apache.org/repos/asf/ignite/blob/b1116069/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index 95cb452..02da4fd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -49,13 +49,13 @@ import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.events.DiscoveryEvent; import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException; +import org.apache.ignite.internal.IgniteDiagnosticAware; import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.events.DiscoveryCustomEvent; import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; -import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; import org.apache.ignite.internal.pagemem.snapshot.StartSnapshotOperationAckDiscoveryMessage; import org.apache.ignite.internal.managers.discovery.DiscoCache; import org.apache.ignite.internal.managers.eventstorage.DiscoveryEventListener; @@ -1383,7 +1383,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana U.warn(log, "Pending exchange futures:"); for (GridDhtPartitionsExchangeFuture fut : exchWorker.futQ) - U.warn(log, ">>> " + fut); + U.warn(log, ">>> " + fut.shortInfo()); if (!readyFuts.isEmpty()) { U.warn(log, "Pending affinity ready futures:"); @@ -1400,7 +1400,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana int cnt = 0; for (GridDhtPartitionsExchangeFuture fut : exchFuts.values()) { - U.warn(log, ">>> " + fut); + U.warn(log, ">>> " + fut.shortInfo()); if (++cnt == 10) break; @@ -1414,8 +1414,11 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana cctx.affinity().dumpDebugInfo(); + cctx.io().dumpPendingMessages(); + // Dump IO manager statistics. - cctx.gridIO().dumpStats(); + if (IgniteSystemProperties.getBoolean(IgniteSystemProperties.IGNITE_IO_DUMP_ON_TIMEOUT, false)) + cctx.gridIO().dumpStats(); } /** @@ -1460,6 +1463,9 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana if (longRunningOpsDumpCnt < GridDhtPartitionsExchangeFuture.DUMP_PENDING_OBJECTS_THRESHOLD) { U.warn(log, "Found long running cache future [startTime=" + formatTime(fut.startTime()) + ", curTime=" + formatTime(curTime) + ", fut=" + fut + ']'); + + if (fut instanceof IgniteDiagnosticAware) + ((IgniteDiagnosticAware)fut).dumpDiagnosticInfo(); } else break; @@ -1473,6 +1479,9 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana if (longRunningOpsDumpCnt < GridDhtPartitionsExchangeFuture.DUMP_PENDING_OBJECTS_THRESHOLD) { U.warn(log, "Found long running cache future [startTime=" + formatTime(fut.startTime()) + ", curTime=" + formatTime(curTime) + ", fut=" + fut + ']'); + + if (fut instanceof IgniteDiagnosticAware) + ((IgniteDiagnosticAware)fut).dumpDiagnosticInfo(); } else break; @@ -1480,6 +1489,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana } } + cctx.io().dumpPendingMessages(); + if (found) { if (longRunningOpsDumpCnt < GridDhtPartitionsExchangeFuture.DUMP_PENDING_OBJECTS_THRESHOLD) { longRunningOpsDumpCnt++; @@ -1493,7 +1504,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana U.warn(log, "Found long running cache operations, dump IO statistics."); // Dump IO manager statistics. - cctx.gridIO().dumpStats(); + if (IgniteSystemProperties.getBoolean(IgniteSystemProperties.IGNITE_IO_DUMP_ON_TIMEOUT, false)) + cctx.gridIO().dumpStats(); } } else @@ -1542,14 +1554,22 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana U.warn(log, "Pending cache futures:"); - for (GridCacheFuture<?> fut : mvcc.activeFutures()) + for (GridCacheFuture<?> fut : mvcc.activeFutures()) { U.warn(log, ">>> " + fut); + if (fut instanceof IgniteDiagnosticAware) + ((IgniteDiagnosticAware)fut).dumpDiagnosticInfo(); + } + U.warn(log, "Pending atomic cache futures:"); - for (GridCacheFuture<?> fut : mvcc.atomicFutures()) + for (GridCacheFuture<?> fut : mvcc.atomicFutures()) { U.warn(log, ">>> " + fut); + if (fut instanceof IgniteDiagnosticAware) + ((IgniteDiagnosticAware)fut).dumpDiagnosticInfo(); + } + U.warn(log, "Pending data streamer futures:"); for (IgniteInternalFuture<?> fut : mvcc.dataStreamerFutures()) @@ -1727,6 +1747,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana U.dumpThreads(log); dumpedObjects++; + + exchFut.dumpDiagnosticInfo(); } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/b1116069/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index 8ad3c8d..bf7a4fd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -148,8 +148,6 @@ import static org.apache.ignite.events.EventType.EVT_NODE_JOINED; import static org.apache.ignite.internal.IgniteComponentType.JTA; import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_CONSISTENCY_CHECK_SKIPPED; import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_TX_CONFIG; -import static org.apache.ignite.internal.processors.cache.GridCacheUtils.affinityNode; -import static org.apache.ignite.internal.processors.cache.GridCacheUtils.clientNode; import static org.apache.ignite.internal.processors.cache.GridCacheUtils.isNearEnabled; /** http://git-wip-us.apache.org/repos/asf/ignite/blob/b1116069/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java index f1f4376..56ad1e0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java @@ -41,6 +41,7 @@ import org.apache.ignite.internal.processors.cache.extras.GridCacheObsoleteEntry import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.lang.GridPlainRunnable; +import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.C1; import org.apache.ignite.internal.util.typedef.CI1; @@ -70,6 +71,7 @@ public class GridDhtCacheEntry extends GridDistributedCacheEntry { private volatile ReaderId[] rdrs = ReaderId.EMPTY_ARRAY; /** Local partition. */ + @GridToStringExclude private final GridDhtLocalPartition locPart; /** @@ -720,7 +722,9 @@ public class GridDhtCacheEntry extends GridDistributedCacheEntry { /** {@inheritDoc} */ @Override public synchronized String toString() { - return S.toString(GridDhtCacheEntry.class, this, "super", super.toString()); + return S.toString(GridDhtCacheEntry.class, this, + "part", locPart.id(), + "super", super.toString()); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/b1116069/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java index a17b782..f889dc8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java @@ -36,6 +36,7 @@ import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteInterruptedException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.internal.IgniteDiagnosticAware; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; @@ -100,7 +101,7 @@ import static org.apache.ignite.transactions.TransactionState.PREPARED; */ @SuppressWarnings("unchecked") public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInternalTx, GridNearTxPrepareResponse> - implements GridCacheMvccFuture<GridNearTxPrepareResponse> { + implements GridCacheMvccFuture<GridNearTxPrepareResponse>, IgniteDiagnosticAware { /** */ private static final long serialVersionUID = 0L; @@ -1579,6 +1580,33 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter } /** {@inheritDoc} */ + @Override public void dumpDiagnosticInfo() { + if (!isDone()) { + for (IgniteInternalFuture fut : futures()) { + if (!fut.isDone() && fut instanceof MiniFuture) { + MiniFuture f = (MiniFuture)fut; + + if (!f.node().isLocal()) { + GridCacheVersion dhtVer = tx.xidVersion(); + GridCacheVersion nearVer = tx.nearXidVersion(); + + cctx.kernalContext().cluster().dumpRemoteTxInfo(f.nodeId, dhtVer, nearVer, "GridDhtTxPrepareFuture " + + "waiting for response [node=" + f.nodeId + + ", topVer=" + tx.topologyVersion() + + ", dhtVer=" + dhtVer + + ", nearVer=" + nearVer + + ", futId=" + futId + + ", miniId=" + f.futId + + ", tx=" + tx + ']'); + + return; + } + } + } + } + } + + /** {@inheritDoc} */ @Override public String toString() { Collection<String> futs = F.viewReadOnly(futures(), new C1<IgniteInternalFuture<?>, String>() { @Override public String apply(IgniteInternalFuture<?> f) { http://git-wip-us.apache.org/repos/asf/ignite/blob/b1116069/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java index 6e438ed..08d2084 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java @@ -26,6 +26,7 @@ import java.util.concurrent.atomic.AtomicReference; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.internal.IgniteDiagnosticAware; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException; @@ -64,7 +65,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh * */ public class GridPartitionedSingleGetFuture extends GridFutureAdapter<Object> implements GridCacheFuture<Object>, - CacheGetFuture { + CacheGetFuture, IgniteDiagnosticAware { /** */ private static final long serialVersionUID = 0L; @@ -760,6 +761,27 @@ public class GridPartitionedSingleGetFuture extends GridFutureAdapter<Object> im } /** {@inheritDoc} */ + @Override public void dumpDiagnosticInfo() { + if (!isDone()) { + UUID nodeId; + AffinityTopologyVersion topVer; + + synchronized (this) { + nodeId = node != null ? node.id() : null; + topVer = this.topVer; + } + + if (nodeId != null) + cctx.kernalContext().cluster().dumpBasicInfo(nodeId, "GridPartitionedSingleGetFuture waiting for " + + "response [node=" + nodeId + + ", key=" + key + + ", futId=" + futId + + ", topVer=" + topVer + ']', + null); + } + } + + /** {@inheritDoc} */ @Override public String toString() { return S.toString(GridPartitionedSingleGetFuture.class, this); } http://git-wip-us.apache.org/repos/asf/ignite/blob/b1116069/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java index a1847d2..00bcd10 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java @@ -30,6 +30,7 @@ import java.util.concurrent.atomic.AtomicReference; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.internal.IgniteDiagnosticAware; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException; @@ -84,7 +85,7 @@ import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_READ; * Colocated cache lock future. */ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture<Boolean> - implements GridCacheMvccFuture<Boolean> { + implements GridCacheMvccFuture<Boolean>, IgniteDiagnosticAware { /** */ private static final long serialVersionUID = 0L; @@ -589,13 +590,51 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture } /** {@inheritDoc} */ + @Override public void dumpDiagnosticInfo() { + if (!isDone()) { + for (IgniteInternalFuture fut : futures()) { + if (!fut.isDone() && isMini(fut)) { + MiniFuture m = (MiniFuture)fut; + + AffinityTopologyVersion topVer = null; + UUID rmtNodeId = null; + + synchronized (m) { + if (!m.rcvRes && !m.node.isLocal()) { + rmtNodeId = m.node.id(); + + topVer = this.topVer; + } + } + + if (rmtNodeId != null) { + cctx.kernalContext().cluster().dumpTxKeyInfo(rmtNodeId, cctx.cacheId(), m.keys, + "GridDhtColocatedLockFuture waiting for response [node=" + rmtNodeId + + ", cache=" + cctx.name() + + ", miniId=" + m.futId + + ", topVer=" + topVer + + ", keys=" + m.keys + ']'); + + return; + } + } + } + } + } + + /** {@inheritDoc} */ @Override public String toString() { Collection<String> futs = F.viewReadOnly(futures(), new C1<IgniteInternalFuture<?>, String>() { @Override public String apply(IgniteInternalFuture<?> f) { if (isMini(f)) { MiniFuture m = (MiniFuture)f; - return "[node=" + m.node().id() + ", loc=" + m.node().isLocal() + ", done=" + f.isDone() + "]"; + synchronized (m) { + return "[node=" + m.node().id() + + ", rcvRes=" + m.rcvRes + + ", loc=" + m.node().isLocal() + + ", done=" + f.isDone() + "]"; + } } else return "[loc=true, done=" + f.isDone() + "]"; @@ -603,6 +642,7 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture }); return S.toString(GridDhtColocatedLockFuture.class, this, + "topVer", topVer, "innerFuts", futs, "inTx", inTx(), "super", super.toString()); http://git-wip-us.apache.org/repos/asf/ignite/blob/b1116069/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java index f1e2c01..70784a7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java @@ -68,6 +68,11 @@ public abstract class GridDhtPartitionsAbstractMessage extends GridCacheMessage } /** {@inheritDoc} */ + @Override public int partition() { + return Integer.MIN_VALUE; + } + + /** {@inheritDoc} */ @Override public boolean addDeploymentInfo() { return false; } http://git-wip-us.apache.org/repos/asf/ignite/blob/b1116069/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index 28c3956..4e04156 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -42,6 +42,7 @@ import org.apache.ignite.events.CacheEvent; import org.apache.ignite.events.DiscoveryEvent; import org.apache.ignite.events.Event; import org.apache.ignite.events.EventType; +import org.apache.ignite.internal.IgniteDiagnosticAware; import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteInterruptedCheckedException; @@ -107,7 +108,7 @@ import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYS */ @SuppressWarnings({"TypeMayBeWeakened", "unchecked"}) public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityTopologyVersion> - implements Comparable<GridDhtPartitionsExchangeFuture>, GridDhtTopologyFuture { + implements Comparable<GridDhtPartitionsExchangeFuture>, GridDhtTopologyFuture, IgniteDiagnosticAware { /** */ public static final int DUMP_PENDING_OBJECTS_THRESHOLD = IgniteSystemProperties.getInteger(IgniteSystemProperties.IGNITE_DUMP_PENDING_OBJECTS_THRESHOLD, 10); @@ -230,6 +231,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT private ConcurrentMap<UUID, GridDhtPartitionsSingleMessage> msgs = new ConcurrentHashMap8<>(); /** */ + @GridToStringExclude private volatile IgniteDhtPartitionHistorySuppliersMap partHistSuppliers = new IgniteDhtPartitionHistorySuppliersMap(); /** Forced Rebalance future. */ @@ -239,6 +241,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT private volatile Map<Integer, Map<Integer, Long>> partHistReserved; /** */ + @GridToStringExclude private volatile IgniteDhtPartitionsToReloadMap partsToReload = new IgniteDhtPartitionsToReloadMap(); /** @@ -1352,10 +1355,6 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT } if (super.onDone(res, err) && realExchange) { - exchLog.info("exchange finished [topVer=" + topologyVersion() + - ", time1=" + duration() + - ", time2=" + (U.currentTimeMillis() - initTs) + ']'); - if (log.isDebugEnabled()) log.debug("Completed partition exchange [localNode=" + cctx.localNodeId() + ", exchange= " + this + "duration=" + duration() + ", durationFromInit=" + (U.currentTimeMillis() - initTs) + ']'); @@ -2324,13 +2323,49 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT } /** {@inheritDoc} */ + @Override public void dumpDiagnosticInfo() { + if (!isDone()) { + ClusterNode crd; + Set<UUID> remaining; + + synchronized (mux) { + crd = this.crd; + remaining = new HashSet<>(this.remaining); + } + + if (crd != null) { + if (!crd.isLocal()) { + cctx.kernalContext().cluster().dumpExchangeInfo(crd.id(), topologyVersion(), "Exchange future waiting for coordinator " + + "response [crd=" + crd.id() + ", topVer=" + topologyVersion() + ']'); + } + else if (!remaining.isEmpty()){ + UUID nodeId = remaining.iterator().next(); + + cctx.kernalContext().cluster().dumpExchangeInfo(crd.id(), topologyVersion(), "Exchange future waiting for server " + + "response [node=" + nodeId + ", topVer=" + topologyVersion() + ']'); + } + } + } + } + + /** + * @return Short information string. + */ + public String shortInfo() { + return "GridDhtPartitionsExchangeFuture [topVer=" + topologyVersion() + + ", evt=" + (discoEvt != null ? discoEvt.type() : -1) + + ", evtNode=" + (discoEvt != null ? discoEvt.eventNode() : null) + + ", done=" + isDone() + ']'; + } + + /** {@inheritDoc} */ @Override public String toString() { Set<UUID> remaining; - List<ClusterNode> srvNodes; + int srvNodes; synchronized (mux) { remaining = new HashSet<>(this.remaining); - srvNodes = this.srvNodes != null ? new ArrayList<>(this.srvNodes) : null; + srvNodes = this.srvNodes != null ? this.srvNodes.size() : 0; } return S.toString(GridDhtPartitionsExchangeFuture.class, this, http://git-wip-us.apache.org/repos/asf/ignite/blob/b1116069/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java index 124cb4b..317b274 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java @@ -19,26 +19,52 @@ package org.apache.ignite.internal.processors.cluster; import java.io.Serializable; import java.lang.ref.WeakReference; +import java.util.Collection; import java.util.HashMap; import java.util.Map; import java.util.Timer; import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.IgniteSystemProperties; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.events.DiscoveryEvent; +import org.apache.ignite.events.Event; import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.GridTopic; +import org.apache.ignite.internal.IgniteDiagnosticMessage; +import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteKernal; import org.apache.ignite.internal.IgniteProperties; +import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.cluster.IgniteClusterImpl; +import org.apache.ignite.internal.managers.communication.GridIoPolicy; +import org.apache.ignite.internal.managers.communication.GridMessageListener; +import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; import org.apache.ignite.internal.processors.GridProcessorAdapter; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.KeyCacheObject; +import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.GridTimerTask; +import org.apache.ignite.internal.util.future.GridFinishedFuture; +import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.future.IgniteFinishedFutureImpl; import org.apache.ignite.internal.util.tostring.GridToStringExclude; +import org.apache.ignite.internal.util.typedef.CI1; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteClosure; import org.apache.ignite.lang.IgniteFuture; +import org.apache.ignite.lang.IgniteInClosure; import org.jetbrains.annotations.Nullable; import static org.apache.ignite.IgniteSystemProperties.IGNITE_UPDATE_NOTIFIER; +import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; +import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; import static org.apache.ignite.internal.IgniteVersionUtils.VER_STR; /** @@ -46,6 +72,13 @@ import static org.apache.ignite.internal.IgniteVersionUtils.VER_STR; */ public class ClusterProcessor extends GridProcessorAdapter { /** */ + private final boolean DIAGNOSTIC_ENABLED = + IgniteSystemProperties.getBoolean(IgniteSystemProperties.IGNITE_DIAGNOSTIC_ENABLED, false); + + /** */ + private static final String DIAGNOSTIC_LOG_CATEGORY = "org.apache.ignite.internal.diagnostic"; + + /** */ private static final String ATTR_UPDATE_NOTIFIER_STATUS = "UPDATE_NOTIFIER_STATUS"; /** Periodic version check delay. */ @@ -68,6 +101,16 @@ public class ClusterProcessor extends GridProcessorAdapter { @GridToStringExclude private GridUpdateNotifier verChecker; + /** */ + private final IgniteLogger diagnosticLog; + + /** */ + private final AtomicReference<ConcurrentHashMap<Long, InternalDiagnosticFuture>> diagnosticFutMap = + new AtomicReference<>(); + + /** */ + private final AtomicLong diagFutId = new AtomicLong(); + /** * @param ctx Kernal context. */ @@ -78,6 +121,102 @@ public class ClusterProcessor extends GridProcessorAdapter { Boolean.parseBoolean(IgniteProperties.get("ignite.update.notifier.enabled.by.default")))); cluster = new IgniteClusterImpl(ctx); + + diagnosticLog = ctx.log(DIAGNOSTIC_LOG_CATEGORY); + } + + /** + * @throws IgniteCheckedException If failed. + */ + public void initListeners() throws IgniteCheckedException { + ctx.event().addLocalEventListener(new GridLocalEventListener() { + @Override public void onEvent(Event evt) { + assert evt instanceof DiscoveryEvent; + assert evt.type() == EVT_NODE_FAILED || evt.type() == EVT_NODE_LEFT; + + DiscoveryEvent discoEvt = (DiscoveryEvent)evt; + + UUID nodeId = discoEvt.eventNode().id(); + + ConcurrentHashMap<Long, InternalDiagnosticFuture> futs = diagnosticFutMap.get(); + + if (futs != null) { + for (InternalDiagnosticFuture fut : futs.values()) { + if (fut.nodeId.equals(nodeId)) + fut.onDone("Target node failed: " + nodeId); + } + } + } + }, + EVT_NODE_FAILED, EVT_NODE_LEFT); + + ctx.io().addMessageListener(GridTopic.TOPIC_INTERNAL_DIAGNOSTIC, new GridMessageListener() { + @Override public void onMessage(UUID nodeId, Object msg) { + if (msg instanceof IgniteDiagnosticMessage) { + IgniteDiagnosticMessage msg0 = (IgniteDiagnosticMessage)msg; + + if (msg0.request()) { + ClusterNode node = ctx.discovery().node(nodeId); + + if (node == null) { + if (diagnosticLog.isDebugEnabled()) { + diagnosticLog.debug("Skip diagnostic request, sender node left " + + "[node=" + nodeId + ", msg=" + msg + ']'); + } + + return; + } + + String resMsg; + + IgniteClosure<GridKernalContext, String> c; + + try { + c = msg0.unmarshalClosure(ctx); + + resMsg = c.apply(ctx); + } + catch (Exception e) { + U.error(diagnosticLog, "Failed to run diagnostic closure: " + e, e); + + resMsg = "Failed to run diagnostic closure: " + e; + } + + IgniteDiagnosticMessage res = IgniteDiagnosticMessage.createResponse(resMsg, msg0.futureId()); + + try { + ctx.io().send(node, GridTopic.TOPIC_INTERNAL_DIAGNOSTIC, res, GridIoPolicy.SYSTEM_POOL); + } + catch (ClusterTopologyCheckedException e) { + if (diagnosticLog.isDebugEnabled()) { + diagnosticLog.debug("Failed to send diagnostic response, node left " + + "[node=" + nodeId + ", msg=" + msg + ']'); + } + } + catch (IgniteCheckedException e) { + U.error(diagnosticLog, "Failed to send diagnostic response [msg=" + msg0 + "]", e); + } + } + else { + InternalDiagnosticFuture fut = diagnosticFuturesMap().get(msg0.futureId()); + + if (fut != null) + fut.onResponse(msg0); + else + U.warn(diagnosticLog, "Failed to find diagnostic message future [msg=" + msg0 + ']'); + } + } + else + U.warn(diagnosticLog, "Received unexpected message: " + msg); + } + }); + } + + /** + * @return Logger for diagnostic category. + */ + public IgniteLogger diagnosticLog() { + return diagnosticLog; } /** @@ -178,6 +317,180 @@ public class ClusterProcessor extends GridProcessorAdapter { } /** + * @param nodeId Target node ID. + * @param dhtVer Tx dht version. + * @param nearVer Tx near version. + * @param msg Local message to log. + */ + public void dumpRemoteTxInfo(UUID nodeId, GridCacheVersion dhtVer, GridCacheVersion nearVer, final String msg) { + if (!DIAGNOSTIC_ENABLED) + return; + + IgniteInternalFuture<String> fut = diagnosticInfo(nodeId, + new IgniteDiagnosticMessage.TxInfoClosure(ctx, dhtVer, nearVer), + msg); + + listenAndLog(fut); + } + + /** + * @param nodeId Target node ID. + * @param cacheId Cache ID. + * @param keys Keys. + * @param msg Local message to log. + */ + public void dumpTxKeyInfo(UUID nodeId, int cacheId, Collection<KeyCacheObject> keys, final String msg) { + if (!DIAGNOSTIC_ENABLED) + return; + + IgniteInternalFuture<String> fut = diagnosticInfo(nodeId, new IgniteDiagnosticMessage.TxEntriesInfoClosure(ctx, cacheId, keys), msg); + + listenAndLog(fut); + } + + /** + * @param nodeId Target node ID. + * @param msg Local message to log. + */ + public void dumpBasicInfo(final UUID nodeId, final String msg, + @Nullable IgniteInClosure<IgniteInternalFuture<String>> lsnr) { + if (!DIAGNOSTIC_ENABLED) + return; + + IgniteInternalFuture<String> fut = diagnosticInfo(nodeId, new IgniteDiagnosticMessage.BaseClosure(ctx), msg); + + if (lsnr != null) + fut.listen(lsnr); + + listenAndLog(fut); + } + + /** + * @param nodeId Target node ID. + * @param topVer Exchange topology version. + * @param msg Local message to log. + */ + public void dumpExchangeInfo(final UUID nodeId, AffinityTopologyVersion topVer, final String msg) { + if (!DIAGNOSTIC_ENABLED) + return; + + IgniteInternalFuture<String> fut = diagnosticInfo(nodeId, new IgniteDiagnosticMessage.ExchangeInfoClosure(ctx, topVer), msg); + + listenAndLog(fut); + } + + /** + * @param fut Future. + */ + private void listenAndLog(IgniteInternalFuture<String> fut) { + fut.listen(new CI1<IgniteInternalFuture<String>>() { + @Override public void apply(IgniteInternalFuture<String> msgFut) { + try { + String msg = msgFut.get(); + + diagnosticLog.info(msg); + } + catch (Exception e) { + U.error(diagnosticLog, "Failed to dump diagnostic info: " + e, e); + } + } + }); + } + + /** + * @param nodeId Target node ID. + * @param c Closure. + * @param baseMsg Local message to log. + * @return Message future. + */ + private IgniteInternalFuture<String> diagnosticInfo(final UUID nodeId, + IgniteClosure<GridKernalContext, String> c, + final String baseMsg) { + final GridFutureAdapter<String> infoFut = new GridFutureAdapter<>(); + + final IgniteInternalFuture<String> rmtFut = sendDiagnosticMessage(nodeId, c); + + rmtFut.listen(new CI1<IgniteInternalFuture<String>>() { + @Override public void apply(IgniteInternalFuture<String> fut) { + String rmtMsg; + + try { + rmtMsg = fut.get(); + } + catch (Exception e) { + rmtMsg = "Diagnostic processing error: " + e; + } + + final String rmtMsg0 = rmtMsg; + + IgniteInternalFuture<String> locFut = IgniteDiagnosticMessage.dumpCommunicationInfo(ctx, nodeId); + + locFut.listen(new CI1<IgniteInternalFuture<String>>() { + @Override public void apply(IgniteInternalFuture<String> locFut) { + String locMsg; + + try { + locMsg = locFut.get(); + } + catch (Exception e) { + locMsg = "Failed to get info for local node: " + e; + } + + String sb = baseMsg + U.nl() + + "Remote node information:" + U.nl() + rmtMsg0 + + U.nl() + "Local communication statistics:" + U.nl() + + locMsg; + + infoFut.onDone(sb); + } + }); + } + }); + + return infoFut; + } + + /** + * @param nodeId Target node ID. + * @param c Message closure. + * @return Message future. + */ + private IgniteInternalFuture<String> sendDiagnosticMessage(UUID nodeId, IgniteClosure<GridKernalContext, String> c) { + try { + IgniteDiagnosticMessage msg = IgniteDiagnosticMessage.createRequest(ctx, + c, + diagFutId.getAndIncrement()); + + InternalDiagnosticFuture fut = new InternalDiagnosticFuture(nodeId, msg.futureId()); + + diagnosticFuturesMap().put(msg.futureId(), fut); + + ctx.io().send(nodeId, GridTopic.TOPIC_INTERNAL_DIAGNOSTIC, msg, GridIoPolicy.SYSTEM_POOL); + + return fut; + } + catch (Exception e) { + U.error(log, "Failed to send diagnostic message: " + e); + + return new GridFinishedFuture<>("Failed to send diagnostic message: " + e); + } + } + + /** + * @return Diagnostic messages futures map. + */ + private ConcurrentHashMap<Long, InternalDiagnosticFuture> diagnosticFuturesMap() { + ConcurrentHashMap<Long, InternalDiagnosticFuture> map = diagnosticFutMap.get(); + + if (map == null) { + if (!diagnosticFutMap.compareAndSet(null, map = new ConcurrentHashMap<>())) + map = diagnosticFutMap.get(); + } + + return map; + } + + /** * Update notifier timer task. */ private static class UpdateNotifierTimerTask extends GridTimerTask { @@ -245,4 +558,46 @@ public class ClusterProcessor extends GridProcessorAdapter { } } } + + /** + * + */ + class InternalDiagnosticFuture extends GridFutureAdapter<String> { + /** */ + private final long id; + + /** */ + private final UUID nodeId; + + /** + * @param id Future ID. + */ + InternalDiagnosticFuture(UUID nodeId, long id) { + this.nodeId = nodeId; + this.id = id; + } + + /** + * @param msg Response message. + */ + public void onResponse(IgniteDiagnosticMessage msg) { + onDone(msg.message()); + } + + /** {@inheritDoc} */ + @Override public boolean onDone(@Nullable String res, @Nullable Throwable err) { + if (super.onDone(res, err)) { + diagnosticFuturesMap().remove(id); + + return true; + } + + return false; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(InternalDiagnosticFuture.class, this); + } + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/b1116069/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java index a59adba..cbba5da 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java @@ -54,7 +54,9 @@ import org.apache.ignite.IgniteSystemProperties; import org.apache.ignite.configuration.ConnectorConfiguration; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteInterruptedCheckedException; +import org.apache.ignite.internal.managers.communication.GridIoMessage; import org.apache.ignite.internal.util.GridConcurrentHashSet; +import org.apache.ignite.internal.util.future.GridCompoundFuture; import org.apache.ignite.internal.util.nio.ssl.GridNioSslFilter; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.typedef.F; @@ -66,6 +68,7 @@ import org.apache.ignite.internal.util.worker.GridWorker; import org.apache.ignite.lang.IgniteBiInClosure; import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.lang.IgnitePredicate; +import org.apache.ignite.lang.IgniteReducer; import org.apache.ignite.lang.IgniteRunnable; import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.plugin.extensions.communication.MessageReader; @@ -701,6 +704,7 @@ public class GridNioServer<T> { /** * */ + @SuppressWarnings("ForLoopReplaceableByForEach") public void dumpStats() { U.warn(log, "NIO server statistics [readerSesBalanceCnt=" + readerMoveCnt.get() + ", writerSesBalanceCnt=" + writerMoveCnt.get() + ']'); @@ -710,6 +714,51 @@ public class GridNioServer<T> { } /** + * @param msg Message to add. + * @param p Session predicate. + * @return Future. + */ + @SuppressWarnings("ForLoopReplaceableByForEach") + public IgniteInternalFuture<String> dumpNodeStats(final String msg, IgnitePredicate<GridNioSession> p) { + GridCompoundFuture<String, String> fut = new GridCompoundFuture<>(new IgniteReducer<String, String>() { + private final StringBuilder sb = new StringBuilder(msg); + + @Override public boolean collect(@Nullable String msg) { + if (!F.isEmpty(msg)) { + synchronized (sb) { + if (sb.length() > 0) + sb.append(U.nl()); + + sb.append(msg); + } + } + + return true; + } + + @Override public String reduce() { + synchronized (sb) { + return sb.toString(); + } + } + }); + + for (int i = 0; i < clientWorkers.size(); i++) { + NioOperationFuture<String> opFut = new NioOperationFuture<>(null, NioOperation.DUMP_STATS); + + opFut.msg = p; + + clientWorkers.get(i).offer(opFut); + + fut.add(opFut); + } + + fut.markInitialized(); + + return fut; + } + + /** * Establishes a session. * * @param ch Channel to register within the server and create session for. @@ -1509,12 +1558,15 @@ public class GridNioServer<T> { */ private abstract class AbstractNioClientWorker extends GridWorker implements GridNioWorker { /** Queue of change requests on this selector. */ + @GridToStringExclude private final ConcurrentLinkedQueue<SessionChangeRequest> changeReqs = new ConcurrentLinkedQueue<>(); /** Selector to select read events. */ + @GridToStringExclude private Selector selector; /** Selected keys. */ + @GridToStringExclude private SelectedSelectionKeySet selectedKeys; /** Worker index. */ @@ -1533,6 +1585,7 @@ public class GridNioServer<T> { private volatile long bytesSent0; /** Sessions assigned to this worker. */ + @GridToStringExclude private final GridConcurrentHashSet<GridSelectorNioSessionImpl> workerSessions = new GridConcurrentHashSet<>(); @@ -1807,12 +1860,28 @@ public class GridNioServer<T> { case DUMP_STATS: { NioOperationFuture req = (NioOperationFuture)req0; - try { - dumpStats(); + if (req.msg instanceof IgnitePredicate) { + StringBuilder sb = new StringBuilder(); + + try { + dumpStats(sb, (IgnitePredicate<GridNioSession>)req.msg, true); + } + finally { + req.onDone(sb.toString()); + } } - finally { - // Complete the request just in case (none should wait on this future). - req.onDone(true); + else { + try { + StringBuilder sb = new StringBuilder(); + + dumpStats(sb, null, false); + + U.warn(log, sb.toString()); + } + finally { + // Complete the request just in case (none should wait on this future). + req.onDone(true); + } } } } @@ -1920,80 +1989,131 @@ public class GridNioServer<T> { } /** - * + * @param sb Message builder. + * @param keys Keys. */ - private void dumpStats() { - StringBuilder sb = new StringBuilder(); - - Set<SelectionKey> keys = selector.keys(); - - sb.append(U.nl()) - .append(">> Selector info [idx=").append(idx) + private void dumpSelectorInfo(StringBuilder sb, Set<SelectionKey> keys) { + sb.append(">> Selector info [idx=").append(idx) .append(", keysCnt=").append(keys.size()) .append(", bytesRcvd=").append(bytesRcvd) .append(", bytesRcvd0=").append(bytesRcvd0) .append(", bytesSent=").append(bytesSent) .append(", bytesSent0=").append(bytesSent0) .append("]").append(U.nl()); + } + + /** + * @param sb Message builder. + * @param p Optional session predicate. + * @param shortInfo Short info flag. + */ + private void dumpStats(StringBuilder sb, + @Nullable IgnitePredicate<GridNioSession> p, + boolean shortInfo) { + Set<SelectionKey> keys = selector.keys(); + + boolean selInfo = p == null; + + if (selInfo) + dumpSelectorInfo(sb, keys); for (SelectionKey key : keys) { GridSelectorNioSessionImpl ses = (GridSelectorNioSessionImpl)key.attachment(); - MessageWriter writer = ses.meta(MSG_WRITER.ordinal()); - MessageReader reader = ses.meta(GridDirectParser.READER_META_KEY); + boolean sesInfo = p == null || p.apply(ses); - sb.append(" Connection info [") - .append("in=").append(ses.accepted()) - .append(", rmtAddr=").append(ses.remoteAddress()) - .append(", locAddr=").append(ses.localAddress()); + if (sesInfo) { + if (!selInfo) { + dumpSelectorInfo(sb, keys); - GridNioRecoveryDescriptor outDesc = ses.outRecoveryDescriptor(); + selInfo = true; + } - if (outDesc != null) { - sb.append(", msgsSent=").append(outDesc.sent()) - .append(", msgsAckedByRmt=").append(outDesc.acked()) - .append(", descIdHash=").append(System.identityHashCode(outDesc)); - } - else - sb.append(", outRecoveryDesc=null"); + sb.append(" Connection info [") + .append("in=").append(ses.accepted()) + .append(", rmtAddr=").append(ses.remoteAddress()) + .append(", locAddr=").append(ses.localAddress()); - GridNioRecoveryDescriptor inDesc = ses.inRecoveryDescriptor(); + GridNioRecoveryDescriptor outDesc = ses.outRecoveryDescriptor(); - if (inDesc != null) { - sb.append(", msgsRcvd=").append(inDesc.received()) - .append(", lastAcked=").append(inDesc.lastAcknowledged()) - .append(", descIdHash=").append(System.identityHashCode(inDesc)); - } - else - sb.append(", inRecoveryDesc=null"); + if (outDesc != null) { + sb.append(", msgsSent=").append(outDesc.sent()) + .append(", msgsAckedByRmt=").append(outDesc.acked()) + .append(", descIdHash=").append(System.identityHashCode(outDesc)); + + if (!outDesc.messagesRequests().isEmpty()) { + int cnt = 0; + + sb.append(", unackedMsgs=["); + + for (SessionWriteRequest req : outDesc.messagesRequests()) { + if (cnt != 0) + sb.append(", "); + + Object msg = req.message(); - sb.append(", bytesRcvd=").append(ses.bytesReceived()) - .append(", bytesRcvd0=").append(ses.bytesReceived0()) - .append(", bytesSent=").append(ses.bytesSent()) - .append(", bytesSent0=").append(ses.bytesSent0()) - .append(", opQueueSize=").append(ses.writeQueueSize()) - .append(", msgWriter=").append(writer != null ? writer.toString() : "null") - .append(", msgReader=").append(reader != null ? reader.toString() : "null"); + if (shortInfo && msg instanceof GridIoMessage) + msg = ((GridIoMessage)msg).message().getClass().getSimpleName(); - int cnt = 0; + sb.append(msg); - for (SessionWriteRequest req : ses.writeQueue()) { - if (cnt == 0) - sb.append(",\n opQueue=[").append(req); + if (++cnt == 5) + break; + } + + sb.append(']'); + } + } else - sb.append(',').append(req); + sb.append(", outRecoveryDesc=null"); - if (++cnt == 5) { - sb.append(']'); + GridNioRecoveryDescriptor inDesc = ses.inRecoveryDescriptor(); - break; + if (inDesc != null) { + sb.append(", msgsRcvd=").append(inDesc.received()) + .append(", lastAcked=").append(inDesc.lastAcknowledged()) + .append(", descIdHash=").append(System.identityHashCode(inDesc)); } - } + else + sb.append(", inRecoveryDesc=null"); - sb.append("]").append(U.nl()); - } + sb.append(", bytesRcvd=").append(ses.bytesReceived()) + .append(", bytesRcvd0=").append(ses.bytesReceived0()) + .append(", bytesSent=").append(ses.bytesSent()) + .append(", bytesSent0=").append(ses.bytesSent0()) + .append(", opQueueSize=").append(ses.writeQueueSize()); + + if (!shortInfo) { + MessageWriter writer = ses.meta(MSG_WRITER.ordinal()); + MessageReader reader = ses.meta(GridDirectParser.READER_META_KEY); + + sb.append(", msgWriter=").append(writer != null ? writer.toString() : "null") + .append(", msgReader=").append(reader != null ? reader.toString() : "null"); + } + + int cnt = 0; + + for (SessionWriteRequest req : ses.writeQueue()) { + Object msg = req.message(); - U.warn(log, sb.toString()); + if (shortInfo && msg instanceof GridIoMessage) + msg = ((GridIoMessage)msg).message().getClass().getSimpleName(); + + if (cnt == 0) + sb.append(",\n opQueue=[").append(msg); + else + sb.append(',').append(msg); + + if (++cnt == 5) { + sb.append(']'); + + break; + } + } + + sb.append("]"); + } + } } /**
