Improved exchange timeout debug logging.

Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b1116069
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b1116069
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b1116069

Branch: refs/heads/ignite-5398
Commit: b1116069549be224d59983b93d2ee22cab8402b8
Parents: c35dbf4
Author: sboikov <[email protected]>
Authored: Tue May 16 11:24:11 2017 +0300
Committer: sboikov <[email protected]>
Committed: Tue May 16 11:24:11 2017 +0300

----------------------------------------------------------------------
 .../apache/ignite/IgniteSystemProperties.java   |   6 +
 .../org/apache/ignite/internal/GridTopic.java   |   6 +-
 .../ignite/internal/IgniteDiagnosticAware.java  |  25 +
 .../internal/IgniteDiagnosticMessage.java       | 490 +++++++++++++++++++
 .../apache/ignite/internal/IgniteKernal.java    |   2 +
 .../communication/GridIoMessageFactory.java     |   5 +
 .../cache/DynamicCacheChangeBatch.java          |  14 +-
 .../cache/DynamicCacheChangeRequest.java        |  11 +-
 .../processors/cache/GridCacheIoManager.java    |  32 ++
 .../GridCachePartitionExchangeManager.java      |  36 +-
 .../processors/cache/GridCacheProcessor.java    |   2 -
 .../distributed/dht/GridDhtCacheEntry.java      |   6 +-
 .../distributed/dht/GridDhtTxPrepareFuture.java |  30 +-
 .../dht/GridPartitionedSingleGetFuture.java     |  24 +-
 .../colocated/GridDhtColocatedLockFuture.java   |  44 +-
 .../GridDhtPartitionsAbstractMessage.java       |   5 +
 .../GridDhtPartitionsExchangeFuture.java        |  49 +-
 .../processors/cluster/ClusterProcessor.java    | 355 ++++++++++++++
 .../ignite/internal/util/nio/GridNioServer.java | 228 +++++++--
 .../communication/tcp/TcpCommunicationSpi.java  | 157 +++---
 ...agnosticMessagesMultipleConnectionsTest.java |  35 ++
 .../managers/IgniteDiagnosticMessagesTest.java  | 152 ++++++
 .../ignite/testsuites/IgniteCacheTestSuite.java |   4 +
 23 files changed, 1568 insertions(+), 150 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/b1116069/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java 
b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
index ce2666b..1388f49 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
@@ -420,6 +420,12 @@ public final class IgniteSystemProperties {
     /** If this property is set to {@code true} then Ignite will log thread 
dump in case of partition exchange timeout. */
     public static final String IGNITE_THREAD_DUMP_ON_EXCHANGE_TIMEOUT = 
"IGNITE_THREAD_DUMP_ON_EXCHANGE_TIMEOUT";
 
+    /** */
+    public static final String IGNITE_IO_DUMP_ON_TIMEOUT = 
"IGNITE_IO_DUMP_ON_TIMEOUT";
+
+    /** */
+    public static final String IGNITE_DIAGNOSTIC_ENABLED = 
"IGNITE_DIAGNOSTIC_ENABLED";
+
     /** Cache operations that take more time than value of this property will 
be output to log. Set to {@code 0} to disable. */
     public static final String IGNITE_LONG_OPERATIONS_DUMP_TIMEOUT = 
"IGNITE_LONG_OPERATIONS_DUMP_TIMEOUT";
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/b1116069/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java 
b/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java
index 86245a8..75759ed 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java
@@ -96,13 +96,17 @@ public enum GridTopic {
     /** */
     TOPIC_TX,
 
+    /** */
     TOPIC_SNAPSHOT,
 
     /** */
     TOPIC_IO_TEST,
 
     /** */
-    TOPIC_HADOOP_MSG;
+    TOPIC_HADOOP_MSG,
+
+    /** */
+    TOPIC_INTERNAL_DIAGNOSTIC;
 
     /** Enum values. */
     private static final GridTopic[] VALS = values();

http://git-wip-us.apache.org/repos/asf/ignite/blob/b1116069/modules/core/src/main/java/org/apache/ignite/internal/IgniteDiagnosticAware.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/IgniteDiagnosticAware.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/IgniteDiagnosticAware.java
new file mode 100644
index 0000000..f33f678
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/IgniteDiagnosticAware.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+/**
+ *
+ */
+public interface IgniteDiagnosticAware {
+    public void dumpDiagnosticInfo();
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/b1116069/modules/core/src/main/java/org/apache/ignite/internal/IgniteDiagnosticMessage.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/IgniteDiagnosticMessage.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/IgniteDiagnosticMessage.java
new file mode 100644
index 0000000..8be571e
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/IgniteDiagnosticMessage.java
@@ -0,0 +1,490 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import java.nio.ByteBuffer;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.Collection;
+import java.util.Date;
+import java.util.List;
+import java.util.UUID;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheMapEntry;
+import 
org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
+import 
org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.util.future.GridFinishedFuture;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteClosure;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+
+/**
+ *
+ */
+public class IgniteDiagnosticMessage implements Message {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** */
+    private static final ThreadLocal<DateFormat> dateFormat = new 
ThreadLocal<DateFormat>() {
+        @Override protected DateFormat initialValue() {
+            return new SimpleDateFormat("HH:mm:ss.SSS");
+        }
+    };
+
+    /** */
+    private long futId;
+
+    /** */
+    private String msg;
+
+    /** */
+    private byte[] cBytes;
+
+    /**
+     * Required by {@link GridIoMessageFactory}.
+     */
+    public IgniteDiagnosticMessage() {
+        // No-op.
+    }
+
+    /**
+     * @param ctx Context.
+     * @param c Closure to run.
+     * @param futId Future ID.
+     * @return Request message.
+     * @throws IgniteCheckedException If failed.
+     */
+    public static IgniteDiagnosticMessage createRequest(GridKernalContext ctx,
+        IgniteClosure<GridKernalContext, String> c,
+        long futId)
+        throws IgniteCheckedException
+    {
+        byte[] cBytes = U.marshal(ctx.config().getMarshaller(), c);
+
+        IgniteDiagnosticMessage msg = new IgniteDiagnosticMessage();
+
+        msg.futId = futId;
+        msg.cBytes = cBytes;
+
+        return msg;
+    }
+
+    /**
+     * @param msg0 Message.
+     * @param futId Future ID.
+     * @return Response message.
+     */
+    public static IgniteDiagnosticMessage createResponse(String msg0, long 
futId) {
+        IgniteDiagnosticMessage msg = new IgniteDiagnosticMessage();
+
+        msg.futId = futId;
+        msg.msg = msg0;
+
+        return msg;
+    }
+
+    /**
+     * @param ctx Context.
+     * @return Unmarshalled closure.
+     * @throws IgniteCheckedException If failed.
+     */
+    public IgniteClosure<GridKernalContext, String> 
unmarshalClosure(GridKernalContext ctx)
+        throws IgniteCheckedException {
+        assert cBytes != null;
+
+        return U.unmarshal(ctx, cBytes, null);
+    }
+
+    /**
+     * @return Future ID.
+     */
+    public long futureId() {
+        return futId;
+    }
+
+    /**
+     * @return {@code True} if this is request message.
+     */
+    public boolean request() {
+        return cBytes != null;
+    }
+
+    /**
+     * @return Message string.
+     */
+    public String message() {
+        return msg;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        writer.setBuffer(buf);
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+
+        switch (writer.state()) {
+            case 0:
+                if (!writer.writeByteArray("cBytes", cBytes))
+                    return false;
+
+                writer.incrementState();
+
+            case 1:
+                if (!writer.writeLong("futId", futId))
+                    return false;
+
+                writer.incrementState();
+
+            case 2:
+                if (!writer.writeString("msg", msg))
+                    return false;
+
+                writer.incrementState();
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+
+        switch (reader.state()) {
+            case 0:
+                cBytes = reader.readByteArray("cBytes");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 1:
+                futId = reader.readLong("futId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 2:
+                msg = reader.readString("msg");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+        }
+
+        return reader.afterMessageRead(IgniteDiagnosticMessage.class);
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte directType() {
+        return -46;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte fieldsCount() {
+        return 3;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onAckReceived() {
+        // No-op.
+    }
+
+    /**
+     *
+     */
+    public static class BaseClosure implements 
IgniteClosure<GridKernalContext, String> {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** */
+        protected final UUID nodeId;
+
+        /**
+         * @param ctx Local node context.
+         */
+        public BaseClosure(GridKernalContext ctx) {
+            this.nodeId = ctx.localNodeId();
+        }
+
+        /** {@inheritDoc} */
+        @Override public final String apply(GridKernalContext ctx) {
+            try {
+                StringBuilder sb = new StringBuilder();
+
+                IgniteInternalFuture<String> commInfo = 
dumpCommunicationInfo(ctx, nodeId);
+
+                sb.append(dumpNodeBasicInfo(ctx));
+
+                sb.append(U.nl()).append(dumpExchangeInfo(ctx));
+
+                String moreInfo = dumpInfo(ctx);
+
+                sb.append(U.nl()).append(commInfo.get());
+
+                if (moreInfo != null)
+                    sb.append(U.nl()).append(moreInfo);
+
+                return sb.toString();
+            }
+            catch (Exception e) {
+                ctx.cluster().diagnosticLog().error("Failed to execute 
diagnostic message closure: " + e, e);
+
+                return "Failed to execute diagnostic message closure: " + e;
+            }
+        }
+
+        /**
+         * @param ctx Context.
+         * @return Message.
+         */
+        protected String dumpInfo(GridKernalContext ctx) {
+            return null;
+        }
+    }
+
+    /**
+     *
+     */
+    public static class TxEntriesInfoClosure extends BaseClosure {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** */
+        private final int cacheId;
+
+        /** */
+        private final Collection<KeyCacheObject> keys;
+
+        /**
+         * @param ctx Context.
+         * @param cacheId Cache ID.
+         * @param keys Keys.
+         */
+        public TxEntriesInfoClosure(GridKernalContext ctx, int cacheId, 
Collection<KeyCacheObject> keys) {
+            super(ctx);
+
+            this.cacheId = cacheId;
+            this.keys = keys;
+        }
+
+        /** {@inheritDoc} */
+        @Override protected String dumpInfo(GridKernalContext ctx) {
+            GridCacheContext cctx = 
ctx.cache().context().cacheContext(cacheId);
+
+            if (cctx == null)
+                return "Failed to find cache with id: " + cacheId;
+
+            try {
+                for (KeyCacheObject key : keys)
+                    key.finishUnmarshal(cctx.cacheObjectContext(), null);
+            }
+            catch (IgniteCheckedException e) {
+                ctx.cluster().diagnosticLog().error("Failed to unmarshal key: 
" + e, e);
+
+                return "Failed to unmarshal key: " + e;
+            }
+
+            StringBuilder sb = new StringBuilder("Cache entries [cacheId=" + 
cacheId + ", cacheName=" + cctx.name() + "]: ");
+
+            for (KeyCacheObject key : keys) {
+                sb.append(U.nl());
+
+                GridCacheMapEntry e = 
(GridCacheMapEntry)cctx.cache().peekEx(key);
+
+                sb.append("Key [key=").append(key).append(", 
entry=").append(e).append("]");
+            }
+
+            return sb.toString();
+        }
+    }
+
+    /**
+     *
+     */
+    public static class ExchangeInfoClosure extends BaseClosure {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** */
+        private final AffinityTopologyVersion topVer;
+
+        /**
+         * @param ctx Context.
+         * @param topVer Exchange version.
+         */
+        public ExchangeInfoClosure(GridKernalContext ctx, 
AffinityTopologyVersion topVer) {
+            super(ctx);
+
+            this.topVer = topVer;
+        }
+
+        /** {@inheritDoc} */
+        @Override protected String dumpInfo(GridKernalContext ctx) {
+            List<GridDhtPartitionsExchangeFuture> futs = 
ctx.cache().context().exchange().exchangeFutures();
+
+            for (GridDhtPartitionsExchangeFuture fut : futs) {
+                if (topVer.equals(fut.topologyVersion()))
+                    return "Exchange future: " + fut;
+            }
+
+            return "Failed to find exchange future: " + topVer;
+        }
+    }
+
+    /**
+     *
+     */
+    public static class TxInfoClosure extends BaseClosure {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** */
+        private final GridCacheVersion dhtVer;
+
+        /** */
+        private final GridCacheVersion nearVer;
+
+        /**
+         * @param ctx Context.
+         * @param dhtVer Tx dht version.
+         * @param nearVer Tx near version.
+         */
+        public TxInfoClosure(GridKernalContext ctx,
+            GridCacheVersion dhtVer,
+            GridCacheVersion nearVer) {
+            super(ctx);
+
+            this.dhtVer = dhtVer;
+            this.nearVer = nearVer;
+        }
+
+        /** {@inheritDoc} */
+        @Override protected String dumpInfo(GridKernalContext ctx) {
+            StringBuilder b = new StringBuilder();
+
+            b.append("Related transactions [dhtVer=").append(dhtVer).
+                append(", nearVer=").append(nearVer).append("]: ");
+
+            boolean found = false;
+
+            for (IgniteInternalTx tx : 
ctx.cache().context().tm().activeTransactions()) {
+                if (dhtVer.equals(tx.xidVersion()) || 
nearVer.equals(tx.nearXidVersion())) {
+                    found = true;
+
+                    b.append(U.nl());
+                    b.append("Found related ttx 
[ver=").append(tx.xidVersion()).
+                        append(", nearVer=").append(tx.nearXidVersion()).
+                        append(", topVer=").append(tx.topologyVersion()).
+                        append(", state=").append(tx.state()).
+                        append(", fullTx=").append(tx).
+                        append("]");
+                }
+            }
+
+            if (!found) {
+                b.append(U.nl());
+                b.append("Failed to find related transactions.");
+            }
+
+            return b.toString();
+        }
+    }
+
+    /**
+     * @param ctx Context.
+     * @return Node information string.
+     */
+    static String dumpNodeBasicInfo(GridKernalContext ctx) {
+        StringBuilder sb = new StringBuilder("General node info 
[id=").append(ctx.localNodeId());
+
+        sb.append(", client=").append(ctx.clientNode());
+        sb.append(", 
discoTopVer=").append(ctx.discovery().topologyVersionEx());
+        sb.append(", time=").append(formatTime(U.currentTimeMillis()));
+
+        sb.append(']');
+
+        return sb.toString();
+    }
+
+    /**
+     * @param ctx Context.
+     * @return Exchange information string.
+     */
+    static String dumpExchangeInfo(GridKernalContext ctx) {
+        GridCachePartitionExchangeManager exchMgr = 
ctx.cache().context().exchange();
+
+        StringBuilder sb = new StringBuilder("Partitions exchange info 
[readyVer=").append(exchMgr.readyAffinityVersion());
+        sb.append("]");
+
+        GridDhtTopologyFuture fut = exchMgr.lastTopologyFuture();
+
+        sb.append(U.nl()).append("Last initialized exchange future: 
").append(fut);
+        
+        return sb.toString();
+    }
+
+    /**
+     * @param ctx Context.
+     * @param nodeId Target node ID.
+     * @return Communication information future.
+     */
+    public static IgniteInternalFuture<String> 
dumpCommunicationInfo(GridKernalContext ctx, UUID nodeId) {
+        if (ctx.config().getCommunicationSpi() instanceof TcpCommunicationSpi)
+            return ((TcpCommunicationSpi) 
ctx.config().getCommunicationSpi()).dumpNodeStatistics(nodeId);
+        else
+            return new GridFinishedFuture<>("Unexpected communication SPI: " + 
ctx.config().getCommunicationSpi());
+    }
+    /**
+     * @param time Time.
+     * @return Time string.
+     */
+    private static String formatTime(long time) {
+        return dateFormat.get().format(new Date(time));
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(IgniteDiagnosticMessage.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/b1116069/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java 
b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index 1445443..a83d888 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -911,6 +911,8 @@ public class IgniteKernal implements IgniteEx, 
IgniteMXBean, Externalizable {
                     
provider.start(ctx.plugins().pluginContextForProvider(provider));
                 }
 
+                ctx.cluster().initListeners();
+
                 fillNodeAttributes(clusterProc.updateNotifierEnabled());
             }
             catch (Throwable e) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/b1116069/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index 6f73682..f443f31 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -26,6 +26,7 @@ import org.apache.ignite.internal.GridJobSiblingsRequest;
 import org.apache.ignite.internal.GridJobSiblingsResponse;
 import org.apache.ignite.internal.GridTaskCancelRequest;
 import org.apache.ignite.internal.GridTaskSessionRequest;
+import org.apache.ignite.internal.IgniteDiagnosticMessage;
 import org.apache.ignite.internal.binary.BinaryEnumObjectImpl;
 import org.apache.ignite.internal.binary.BinaryObjectImpl;
 import org.apache.ignite.internal.managers.checkpoint.GridCheckpointRequest;
@@ -172,6 +173,10 @@ public class GridIoMessageFactory implements 
MessageFactory {
         Message msg = null;
 
         switch (type) {
+            case -46:
+                msg = new IgniteDiagnosticMessage();
+
+                break;
 
             case -45:
                 msg = new GridChangeGlobalStateMessageResponse();

http://git-wip-us.apache.org/repos/asf/ignite/blob/b1116069/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java
index 0e4373c..f423002 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java
@@ -21,8 +21,11 @@ import java.util.Collection;
 import java.util.Map;
 import java.util.UUID;
 import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteBiClosure;
 import org.apache.ignite.lang.IgniteUuid;
 import org.jetbrains.annotations.Nullable;
 
@@ -38,7 +41,7 @@ public class DynamicCacheChangeBatch implements 
DiscoveryCustomMessage {
     private Collection<DynamicCacheChangeRequest> reqs;
 
     /** Client nodes map. Used in discovery data exchange. */
-    @GridToStringInclude
+    @GridToStringExclude
     private Map<String, Map<UUID, Boolean>> clientNodes;
 
     /** Custom message ID. */
@@ -132,6 +135,13 @@ public class DynamicCacheChangeBatch implements 
DiscoveryCustomMessage {
 
     /** {@inheritDoc} */
     @Override public String toString() {
-        return S.toString(DynamicCacheChangeBatch.class, this);
+        Object clients = F.viewReadOnly(clientNodes, new 
IgniteBiClosure<String, Map<UUID,Boolean>, Object>() {
+                @Override public Object apply(String s, Map<UUID, Boolean> 
map) {
+                    return map != null ? map.keySet() : null;
+                }
+            }
+        );
+
+        return S.toString(DynamicCacheChangeBatch.class, this, "clientNodes", 
clients);
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/b1116069/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java
index ad7c7a5..f0ac505 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java
@@ -23,7 +23,6 @@ import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.NearCacheConfiguration;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
-import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.lang.IgniteUuid;
 import org.jetbrains.annotations.Nullable;
 
@@ -45,6 +44,7 @@ public class DynamicCacheChangeRequest implements 
Serializable {
     private String cacheName;
 
     /** Cache start configuration. */
+    @GridToStringExclude
     private CacheConfiguration startCfg;
 
     /** Cache type. */
@@ -54,6 +54,7 @@ public class DynamicCacheChangeRequest implements 
Serializable {
     private UUID initiatingNodeId;
 
     /** Near cache configuration. */
+    @GridToStringExclude
     private NearCacheConfiguration nearCacheCfg;
 
     /** Start only client cache, do not start data nodes. */
@@ -372,6 +373,12 @@ public class DynamicCacheChangeRequest implements 
Serializable {
 
     /** {@inheritDoc} */
     @Override public String toString() {
-        return S.toString(DynamicCacheChangeRequest.class, this, "cacheName", 
cacheName());
+        return "DynamicCacheChangeRequest [cacheName=" + cacheName() +
+            ", hasCfg=" + (startCfg != null) +
+            ", nodeId=" + initiatingNodeId +
+            ", clientStartOnly=" + clientStartOnly +
+            ", close=" + close +
+            ", stop=" + stop +
+            ']';
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/b1116069/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index 924ce79..277d176 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -17,12 +17,15 @@
 
 package org.apache.ignite.internal.processors.cache;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
@@ -125,6 +128,26 @@ public class GridCacheIoManager extends 
GridCacheSharedManagerAdapter {
     /** Deployment enabled. */
     private boolean depEnabled;
 
+    /** */
+    private final List<GridCacheMessage> pendingMsgs = new ArrayList<>();
+
+    /**
+     *
+     */
+    public void dumpPendingMessages() {
+        synchronized (pendingMsgs) {
+            if (pendingMsgs.isEmpty())
+                return;
+
+            log.info("Pending cache messages waiting for exchange [" +
+                "readyVer=" + cctx.exchange().readyAffinityVersion() +
+                ", discoVer=" + cctx.discovery().topologyVersion() + ']');
+
+            for (GridCacheMessage msg : pendingMsgs)
+                log.info("Message [waitVer=" + msg.topologyVersion() + ", 
msg=" + msg + ']');
+        }
+    }
+
     /** Message listener. */
     private GridMessageListener lsnr = new GridMessageListener() {
         @Override public void onMessage(final UUID nodeId, final Object msg) {
@@ -211,10 +234,19 @@ public class GridCacheIoManager extends 
GridCacheSharedManagerAdapter {
             }
 
             if (fut != null && !fut.isDone()) {
+                synchronized (pendingMsgs) {
+                    if (pendingMsgs.size() < 100)
+                        pendingMsgs.add(cacheMsg);
+                }
+
                 fut.listen(new CI1<IgniteInternalFuture<?>>() {
                     @Override public void apply(IgniteInternalFuture<?> t) {
                         cctx.kernalContext().closure().runLocalSafe(new 
Runnable() {
                             @Override public void run() {
+                                synchronized (pendingMsgs) {
+                                    pendingMsgs.remove(cacheMsg);
+                                }
+
                                 IgniteLogger log = 
cacheMsg.messageLogger(cctx);
 
                                 if (log.isDebugEnabled()) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/b1116069/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 95cb452..02da4fd 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -49,13 +49,13 @@ import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.events.DiscoveryEvent;
 import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
+import org.apache.ignite.internal.IgniteDiagnosticAware;
 import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.events.DiscoveryCustomEvent;
 import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
-import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
 import 
org.apache.ignite.internal.pagemem.snapshot.StartSnapshotOperationAckDiscoveryMessage;
 import org.apache.ignite.internal.managers.discovery.DiscoCache;
 import org.apache.ignite.internal.managers.eventstorage.DiscoveryEventListener;
@@ -1383,7 +1383,7 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
         U.warn(log, "Pending exchange futures:");
 
         for (GridDhtPartitionsExchangeFuture fut : exchWorker.futQ)
-            U.warn(log, ">>> " + fut);
+            U.warn(log, ">>> " + fut.shortInfo());
 
         if (!readyFuts.isEmpty()) {
             U.warn(log, "Pending affinity ready futures:");
@@ -1400,7 +1400,7 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
             int cnt = 0;
 
             for (GridDhtPartitionsExchangeFuture fut : exchFuts.values()) {
-                U.warn(log, ">>> " + fut);
+                U.warn(log, ">>> " + fut.shortInfo());
 
                 if (++cnt == 10)
                     break;
@@ -1414,8 +1414,11 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
 
         cctx.affinity().dumpDebugInfo();
 
+        cctx.io().dumpPendingMessages();
+
         // Dump IO manager statistics.
-        cctx.gridIO().dumpStats();
+        if 
(IgniteSystemProperties.getBoolean(IgniteSystemProperties.IGNITE_IO_DUMP_ON_TIMEOUT,
 false))
+            cctx.gridIO().dumpStats();
     }
 
     /**
@@ -1460,6 +1463,9 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
                         if (longRunningOpsDumpCnt < 
GridDhtPartitionsExchangeFuture.DUMP_PENDING_OBJECTS_THRESHOLD) {
                             U.warn(log, "Found long running cache future 
[startTime=" + formatTime(fut.startTime()) +
                                 ", curTime=" + formatTime(curTime) + ", fut=" 
+ fut + ']');
+
+                            if (fut instanceof IgniteDiagnosticAware)
+                                
((IgniteDiagnosticAware)fut).dumpDiagnosticInfo();
                         }
                         else
                             break;
@@ -1473,6 +1479,9 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
                         if (longRunningOpsDumpCnt < 
GridDhtPartitionsExchangeFuture.DUMP_PENDING_OBJECTS_THRESHOLD) {
                             U.warn(log, "Found long running cache future 
[startTime=" + formatTime(fut.startTime()) +
                                 ", curTime=" + formatTime(curTime) + ", fut=" 
+ fut + ']');
+
+                            if (fut instanceof IgniteDiagnosticAware)
+                                
((IgniteDiagnosticAware)fut).dumpDiagnosticInfo();
                         }
                         else
                             break;
@@ -1480,6 +1489,8 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
                 }
             }
 
+            cctx.io().dumpPendingMessages();
+
             if (found) {
                 if (longRunningOpsDumpCnt < 
GridDhtPartitionsExchangeFuture.DUMP_PENDING_OBJECTS_THRESHOLD) {
                     longRunningOpsDumpCnt++;
@@ -1493,7 +1504,8 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
                     U.warn(log, "Found long running cache operations, dump IO 
statistics.");
 
                     // Dump IO manager statistics.
-                    cctx.gridIO().dumpStats();
+                    if 
(IgniteSystemProperties.getBoolean(IgniteSystemProperties.IGNITE_IO_DUMP_ON_TIMEOUT,
 false))
+                        cctx.gridIO().dumpStats();
                 }
             }
             else
@@ -1542,14 +1554,22 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
 
             U.warn(log, "Pending cache futures:");
 
-            for (GridCacheFuture<?> fut : mvcc.activeFutures())
+            for (GridCacheFuture<?> fut : mvcc.activeFutures()) {
                 U.warn(log, ">>> " + fut);
 
+                if (fut instanceof IgniteDiagnosticAware)
+                    ((IgniteDiagnosticAware)fut).dumpDiagnosticInfo();
+            }
+
             U.warn(log, "Pending atomic cache futures:");
 
-            for (GridCacheFuture<?> fut : mvcc.atomicFutures())
+            for (GridCacheFuture<?> fut : mvcc.atomicFutures()) {
                 U.warn(log, ">>> " + fut);
 
+                if (fut instanceof IgniteDiagnosticAware)
+                    ((IgniteDiagnosticAware)fut).dumpDiagnosticInfo();
+            }
+
             U.warn(log, "Pending data streamer futures:");
 
             for (IgniteInternalFuture<?> fut : mvcc.dataStreamerFutures())
@@ -1727,6 +1747,8 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
                                             U.dumpThreads(log);
 
                                         dumpedObjects++;
+
+                                        exchFut.dumpDiagnosticInfo();
                                     }
                                 }
                             }

http://git-wip-us.apache.org/repos/asf/ignite/blob/b1116069/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 8ad3c8d..bf7a4fd 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -148,8 +148,6 @@ import static 
org.apache.ignite.events.EventType.EVT_NODE_JOINED;
 import static org.apache.ignite.internal.IgniteComponentType.JTA;
 import static 
org.apache.ignite.internal.IgniteNodeAttributes.ATTR_CONSISTENCY_CHECK_SKIPPED;
 import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_TX_CONFIG;
-import static 
org.apache.ignite.internal.processors.cache.GridCacheUtils.affinityNode;
-import static 
org.apache.ignite.internal.processors.cache.GridCacheUtils.clientNode;
 import static 
org.apache.ignite.internal.processors.cache.GridCacheUtils.isNearEnabled;
 
 /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/b1116069/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
index f1f4376..56ad1e0 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
@@ -41,6 +41,7 @@ import 
org.apache.ignite.internal.processors.cache.extras.GridCacheObsoleteEntry
 import 
org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.util.lang.GridPlainRunnable;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.C1;
 import org.apache.ignite.internal.util.typedef.CI1;
@@ -70,6 +71,7 @@ public class GridDhtCacheEntry extends 
GridDistributedCacheEntry {
     private volatile ReaderId[] rdrs = ReaderId.EMPTY_ARRAY;
 
     /** Local partition. */
+    @GridToStringExclude
     private final GridDhtLocalPartition locPart;
 
     /**
@@ -720,7 +722,9 @@ public class GridDhtCacheEntry extends 
GridDistributedCacheEntry {
 
     /** {@inheritDoc} */
     @Override public synchronized String toString() {
-        return S.toString(GridDhtCacheEntry.class, this, "super", 
super.toString());
+        return S.toString(GridDhtCacheEntry.class, this,
+            "part", locPart.id(),
+            "super", super.toString());
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/b1116069/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index a17b782..f889dc8 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -36,6 +36,7 @@ import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteInterruptedException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.IgniteDiagnosticAware;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
@@ -100,7 +101,7 @@ import static 
org.apache.ignite.transactions.TransactionState.PREPARED;
  */
 @SuppressWarnings("unchecked")
 public final class GridDhtTxPrepareFuture extends 
GridCompoundFuture<IgniteInternalTx, GridNearTxPrepareResponse>
-    implements GridCacheMvccFuture<GridNearTxPrepareResponse> {
+    implements GridCacheMvccFuture<GridNearTxPrepareResponse>, 
IgniteDiagnosticAware {
     /** */
     private static final long serialVersionUID = 0L;
 
@@ -1579,6 +1580,33 @@ public final class GridDhtTxPrepareFuture extends 
GridCompoundFuture<IgniteInter
     }
 
     /** {@inheritDoc} */
+    @Override public void dumpDiagnosticInfo() {
+        if (!isDone()) {
+            for (IgniteInternalFuture fut : futures()) {
+                if (!fut.isDone() && fut instanceof MiniFuture) {
+                    MiniFuture f = (MiniFuture)fut;
+
+                    if (!f.node().isLocal()) {
+                        GridCacheVersion dhtVer = tx.xidVersion();
+                        GridCacheVersion nearVer = tx.nearXidVersion();
+
+                        
cctx.kernalContext().cluster().dumpRemoteTxInfo(f.nodeId, dhtVer, nearVer, 
"GridDhtTxPrepareFuture " +
+                            "waiting for response [node=" + f.nodeId +
+                            ", topVer=" + tx.topologyVersion() +
+                            ", dhtVer=" + dhtVer +
+                            ", nearVer=" + nearVer +
+                            ", futId=" + futId +
+                            ", miniId=" + f.futId +
+                            ", tx=" + tx + ']');
+
+                        return;
+                    }
+                }
+            }
+        }
+    }
+
+    /** {@inheritDoc} */
     @Override public String toString() {
         Collection<String> futs = F.viewReadOnly(futures(), new 
C1<IgniteInternalFuture<?>, String>() {
             @Override public String apply(IgniteInternalFuture<?> f) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/b1116069/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
index 6e438ed..08d2084 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
@@ -26,6 +26,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.IgniteDiagnosticAware;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import 
org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException;
@@ -64,7 +65,7 @@ import static 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
  *
  */
 public class GridPartitionedSingleGetFuture extends GridFutureAdapter<Object> 
implements GridCacheFuture<Object>,
-    CacheGetFuture {
+    CacheGetFuture, IgniteDiagnosticAware {
     /** */
     private static final long serialVersionUID = 0L;
 
@@ -760,6 +761,27 @@ public class GridPartitionedSingleGetFuture extends 
GridFutureAdapter<Object> im
     }
 
     /** {@inheritDoc} */
+    @Override public void dumpDiagnosticInfo() {
+        if (!isDone()) {
+            UUID nodeId;
+            AffinityTopologyVersion topVer;
+
+            synchronized (this) {
+                nodeId = node != null ? node.id() : null;
+                topVer = this.topVer;
+            }
+
+            if (nodeId != null)
+                cctx.kernalContext().cluster().dumpBasicInfo(nodeId, 
"GridPartitionedSingleGetFuture waiting for " +
+                    "response [node=" + nodeId +
+                    ", key=" + key +
+                    ", futId=" + futId +
+                    ", topVer=" + topVer + ']',
+                    null);
+        }
+    }
+
+    /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(GridPartitionedSingleGetFuture.class, this);
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/b1116069/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
index a1847d2..00bcd10 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
@@ -30,6 +30,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.IgniteDiagnosticAware;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import 
org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException;
@@ -84,7 +85,7 @@ import static 
org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_READ;
  * Colocated cache lock future.
  */
 public final class GridDhtColocatedLockFuture extends 
GridCompoundIdentityFuture<Boolean>
-    implements GridCacheMvccFuture<Boolean> {
+    implements GridCacheMvccFuture<Boolean>, IgniteDiagnosticAware {
     /** */
     private static final long serialVersionUID = 0L;
 
@@ -589,13 +590,51 @@ public final class GridDhtColocatedLockFuture extends 
GridCompoundIdentityFuture
     }
 
     /** {@inheritDoc} */
+    @Override public void dumpDiagnosticInfo() {
+        if (!isDone()) {
+            for (IgniteInternalFuture fut : futures()) {
+                if (!fut.isDone() && isMini(fut)) {
+                    MiniFuture m = (MiniFuture)fut;
+
+                    AffinityTopologyVersion topVer = null;
+                    UUID rmtNodeId = null;
+
+                    synchronized (m) {
+                        if (!m.rcvRes && !m.node.isLocal()) {
+                            rmtNodeId = m.node.id();
+
+                            topVer = this.topVer;
+                        }
+                    }
+
+                    if (rmtNodeId != null) {
+                        
cctx.kernalContext().cluster().dumpTxKeyInfo(rmtNodeId, cctx.cacheId(), m.keys,
+                            "GridDhtColocatedLockFuture waiting for response 
[node=" + rmtNodeId +
+                            ", cache=" + cctx.name() +
+                            ", miniId=" + m.futId +
+                            ", topVer=" + topVer +
+                            ", keys=" + m.keys + ']');
+
+                        return;
+                    }
+                }
+            }
+        }
+    }
+
+    /** {@inheritDoc} */
     @Override public String toString() {
         Collection<String> futs = F.viewReadOnly(futures(), new 
C1<IgniteInternalFuture<?>, String>() {
             @Override public String apply(IgniteInternalFuture<?> f) {
                 if (isMini(f)) {
                     MiniFuture m = (MiniFuture)f;
 
-                    return "[node=" + m.node().id() + ", loc=" + 
m.node().isLocal() + ", done=" + f.isDone() + "]";
+                    synchronized (m) {
+                        return "[node=" + m.node().id() +
+                            ", rcvRes=" + m.rcvRes +
+                            ", loc=" + m.node().isLocal() +
+                            ", done=" + f.isDone() + "]";
+                    }
                 }
                 else
                     return "[loc=true, done=" + f.isDone() + "]";
@@ -603,6 +642,7 @@ public final class GridDhtColocatedLockFuture extends 
GridCompoundIdentityFuture
         });
 
         return S.toString(GridDhtColocatedLockFuture.class, this,
+            "topVer", topVer,
             "innerFuts", futs,
             "inTx", inTx(),
             "super", super.toString());

http://git-wip-us.apache.org/repos/asf/ignite/blob/b1116069/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java
index f1e2c01..70784a7 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java
@@ -68,6 +68,11 @@ public abstract class GridDhtPartitionsAbstractMessage 
extends GridCacheMessage
     }
 
     /** {@inheritDoc} */
+    @Override public int partition() {
+        return Integer.MIN_VALUE;
+    }
+
+    /** {@inheritDoc} */
     @Override public boolean addDeploymentInfo() {
         return false;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/b1116069/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 28c3956..4e04156 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -42,6 +42,7 @@ import org.apache.ignite.events.CacheEvent;
 import org.apache.ignite.events.DiscoveryEvent;
 import org.apache.ignite.events.Event;
 import org.apache.ignite.events.EventType;
+import org.apache.ignite.internal.IgniteDiagnosticAware;
 import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
@@ -107,7 +108,7 @@ import static 
org.apache.ignite.internal.managers.communication.GridIoPolicy.SYS
  */
 @SuppressWarnings({"TypeMayBeWeakened", "unchecked"})
 public class GridDhtPartitionsExchangeFuture extends 
GridFutureAdapter<AffinityTopologyVersion>
-    implements Comparable<GridDhtPartitionsExchangeFuture>, 
GridDhtTopologyFuture {
+    implements Comparable<GridDhtPartitionsExchangeFuture>, 
GridDhtTopologyFuture, IgniteDiagnosticAware {
     /** */
     public static final int DUMP_PENDING_OBJECTS_THRESHOLD =
         
IgniteSystemProperties.getInteger(IgniteSystemProperties.IGNITE_DUMP_PENDING_OBJECTS_THRESHOLD,
 10);
@@ -230,6 +231,7 @@ public class GridDhtPartitionsExchangeFuture extends 
GridFutureAdapter<AffinityT
     private ConcurrentMap<UUID, GridDhtPartitionsSingleMessage> msgs = new 
ConcurrentHashMap8<>();
 
     /** */
+    @GridToStringExclude
     private volatile IgniteDhtPartitionHistorySuppliersMap partHistSuppliers = 
new IgniteDhtPartitionHistorySuppliersMap();
 
     /** Forced Rebalance future. */
@@ -239,6 +241,7 @@ public class GridDhtPartitionsExchangeFuture extends 
GridFutureAdapter<AffinityT
     private volatile Map<Integer, Map<Integer, Long>> partHistReserved;
 
     /** */
+    @GridToStringExclude
     private volatile IgniteDhtPartitionsToReloadMap partsToReload = new 
IgniteDhtPartitionsToReloadMap();
 
     /**
@@ -1352,10 +1355,6 @@ public class GridDhtPartitionsExchangeFuture extends 
GridFutureAdapter<AffinityT
         }
 
         if (super.onDone(res, err) && realExchange) {
-            exchLog.info("exchange finished [topVer=" + topologyVersion() +
-                ", time1=" + duration() +
-                ", time2=" + (U.currentTimeMillis() - initTs) + ']');
-
             if (log.isDebugEnabled())
                 log.debug("Completed partition exchange [localNode=" + 
cctx.localNodeId() + ", exchange= " + this +
                     "duration=" + duration() + ", durationFromInit=" + 
(U.currentTimeMillis() - initTs) + ']');
@@ -2324,13 +2323,49 @@ public class GridDhtPartitionsExchangeFuture extends 
GridFutureAdapter<AffinityT
     }
 
     /** {@inheritDoc} */
+    @Override public void dumpDiagnosticInfo() {
+        if (!isDone()) {
+            ClusterNode crd;
+            Set<UUID> remaining;
+
+            synchronized (mux) {
+                crd = this.crd;
+                remaining = new HashSet<>(this.remaining);
+            }
+
+            if (crd != null) {
+                if (!crd.isLocal()) {
+                    cctx.kernalContext().cluster().dumpExchangeInfo(crd.id(), 
topologyVersion(), "Exchange future waiting for coordinator " +
+                        "response [crd=" + crd.id() + ", topVer=" + 
topologyVersion() + ']');
+                }
+                else if (!remaining.isEmpty()){
+                    UUID nodeId = remaining.iterator().next();
+
+                    cctx.kernalContext().cluster().dumpExchangeInfo(crd.id(), 
topologyVersion(), "Exchange future waiting for server " +
+                        "response [node=" + nodeId + ", topVer=" + 
topologyVersion() + ']');
+                }
+            }
+        }
+    }
+
+    /**
+     * @return Short information string.
+     */
+    public String shortInfo() {
+        return "GridDhtPartitionsExchangeFuture [topVer=" + topologyVersion() +
+            ", evt=" + (discoEvt != null ? discoEvt.type() : -1) +
+            ", evtNode=" + (discoEvt != null ? discoEvt.eventNode() : null) +
+            ", done=" + isDone() + ']';
+    }
+
+    /** {@inheritDoc} */
     @Override public String toString() {
         Set<UUID> remaining;
-        List<ClusterNode> srvNodes;
+        int srvNodes;
 
         synchronized (mux) {
             remaining = new HashSet<>(this.remaining);
-            srvNodes = this.srvNodes != null ? new ArrayList<>(this.srvNodes) 
: null;
+            srvNodes = this.srvNodes != null ? this.srvNodes.size() : 0;
         }
 
         return S.toString(GridDhtPartitionsExchangeFuture.class, this,

http://git-wip-us.apache.org/repos/asf/ignite/blob/b1116069/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java
index 124cb4b..317b274 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java
@@ -19,26 +19,52 @@ package org.apache.ignite.internal.processors.cluster;
 
 import java.io.Serializable;
 import java.lang.ref.WeakReference;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Timer;
 import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.events.DiscoveryEvent;
+import org.apache.ignite.events.Event;
 import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.GridTopic;
+import org.apache.ignite.internal.IgniteDiagnosticMessage;
+import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteKernal;
 import org.apache.ignite.internal.IgniteProperties;
+import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.cluster.IgniteClusterImpl;
+import org.apache.ignite.internal.managers.communication.GridIoPolicy;
+import org.apache.ignite.internal.managers.communication.GridMessageListener;
+import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
 import org.apache.ignite.internal.processors.GridProcessorAdapter;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.util.GridTimerTask;
+import org.apache.ignite.internal.util.future.GridFinishedFuture;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.future.IgniteFinishedFutureImpl;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
+import org.apache.ignite.internal.util.typedef.CI1;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteClosure;
 import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.lang.IgniteInClosure;
 import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_UPDATE_NOTIFIER;
+import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
+import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
 import static org.apache.ignite.internal.IgniteVersionUtils.VER_STR;
 
 /**
@@ -46,6 +72,13 @@ import static 
org.apache.ignite.internal.IgniteVersionUtils.VER_STR;
  */
 public class ClusterProcessor extends GridProcessorAdapter {
     /** */
+    private final boolean DIAGNOSTIC_ENABLED =
+        
IgniteSystemProperties.getBoolean(IgniteSystemProperties.IGNITE_DIAGNOSTIC_ENABLED,
 false);
+
+    /** */
+    private static final String DIAGNOSTIC_LOG_CATEGORY = 
"org.apache.ignite.internal.diagnostic";
+
+    /** */
     private static final String ATTR_UPDATE_NOTIFIER_STATUS = 
"UPDATE_NOTIFIER_STATUS";
 
     /** Periodic version check delay. */
@@ -68,6 +101,16 @@ public class ClusterProcessor extends GridProcessorAdapter {
     @GridToStringExclude
     private GridUpdateNotifier verChecker;
 
+    /** */
+    private final IgniteLogger diagnosticLog;
+
+    /** */
+    private final AtomicReference<ConcurrentHashMap<Long, 
InternalDiagnosticFuture>> diagnosticFutMap =
+        new AtomicReference<>();
+
+    /** */
+    private final AtomicLong diagFutId = new AtomicLong();
+
     /**
      * @param ctx Kernal context.
      */
@@ -78,6 +121,102 @@ public class ClusterProcessor extends GridProcessorAdapter 
{
             
Boolean.parseBoolean(IgniteProperties.get("ignite.update.notifier.enabled.by.default"))));
 
         cluster = new IgniteClusterImpl(ctx);
+
+        diagnosticLog = ctx.log(DIAGNOSTIC_LOG_CATEGORY);
+    }
+
+    /**
+     * @throws IgniteCheckedException If failed.
+     */
+    public void initListeners() throws IgniteCheckedException {
+        ctx.event().addLocalEventListener(new GridLocalEventListener() {
+                @Override public void onEvent(Event evt) {
+                    assert evt instanceof DiscoveryEvent;
+                    assert evt.type() == EVT_NODE_FAILED || evt.type() == 
EVT_NODE_LEFT;
+
+                    DiscoveryEvent discoEvt = (DiscoveryEvent)evt;
+
+                    UUID nodeId = discoEvt.eventNode().id();
+
+                    ConcurrentHashMap<Long, InternalDiagnosticFuture> futs = 
diagnosticFutMap.get();
+
+                    if (futs != null) {
+                        for (InternalDiagnosticFuture fut : futs.values()) {
+                            if (fut.nodeId.equals(nodeId))
+                                fut.onDone("Target node failed: " + nodeId);
+                        }
+                    }
+                }
+            },
+            EVT_NODE_FAILED, EVT_NODE_LEFT);
+
+        ctx.io().addMessageListener(GridTopic.TOPIC_INTERNAL_DIAGNOSTIC, new 
GridMessageListener() {
+            @Override public void onMessage(UUID nodeId, Object msg) {
+                if (msg instanceof IgniteDiagnosticMessage) {
+                    IgniteDiagnosticMessage msg0 = 
(IgniteDiagnosticMessage)msg;
+
+                    if (msg0.request()) {
+                        ClusterNode node = ctx.discovery().node(nodeId);
+
+                        if (node == null) {
+                            if (diagnosticLog.isDebugEnabled()) {
+                                diagnosticLog.debug("Skip diagnostic request, 
sender node left " +
+                                    "[node=" + nodeId + ", msg=" + msg + ']');
+                            }
+
+                            return;
+                        }
+
+                        String resMsg;
+
+                        IgniteClosure<GridKernalContext, String> c;
+
+                        try {
+                            c = msg0.unmarshalClosure(ctx);
+
+                            resMsg = c.apply(ctx);
+                        }
+                        catch (Exception e) {
+                            U.error(diagnosticLog, "Failed to run diagnostic 
closure: " + e, e);
+
+                            resMsg = "Failed to run diagnostic closure: " + e;
+                        }
+
+                        IgniteDiagnosticMessage res = 
IgniteDiagnosticMessage.createResponse(resMsg, msg0.futureId());
+
+                        try {
+                            ctx.io().send(node, 
GridTopic.TOPIC_INTERNAL_DIAGNOSTIC, res, GridIoPolicy.SYSTEM_POOL);
+                        }
+                        catch (ClusterTopologyCheckedException e) {
+                            if (diagnosticLog.isDebugEnabled()) {
+                                diagnosticLog.debug("Failed to send diagnostic 
response, node left " +
+                                    "[node=" + nodeId + ", msg=" + msg + ']');
+                            }
+                        }
+                        catch (IgniteCheckedException e) {
+                            U.error(diagnosticLog, "Failed to send diagnostic 
response [msg=" + msg0 + "]", e);
+                        }
+                    }
+                    else {
+                        InternalDiagnosticFuture fut = 
diagnosticFuturesMap().get(msg0.futureId());
+
+                        if (fut != null)
+                            fut.onResponse(msg0);
+                        else
+                            U.warn(diagnosticLog, "Failed to find diagnostic 
message future [msg=" + msg0 + ']');
+                    }
+                }
+                else
+                    U.warn(diagnosticLog, "Received unexpected message: " + 
msg);
+            }
+        });
+    }
+
+    /**
+     * @return Logger for diagnostic category.
+     */
+    public IgniteLogger diagnosticLog() {
+        return diagnosticLog;
     }
 
     /**
@@ -178,6 +317,180 @@ public class ClusterProcessor extends 
GridProcessorAdapter {
     }
 
     /**
+     * @param nodeId Target node ID.
+     * @param dhtVer Tx dht version.
+     * @param nearVer Tx near version.
+     * @param msg Local message to log.
+     */
+    public void dumpRemoteTxInfo(UUID nodeId, GridCacheVersion dhtVer, 
GridCacheVersion nearVer, final String msg) {
+        if (!DIAGNOSTIC_ENABLED)
+            return;
+
+        IgniteInternalFuture<String> fut = diagnosticInfo(nodeId,
+            new IgniteDiagnosticMessage.TxInfoClosure(ctx, dhtVer, nearVer),
+            msg);
+
+        listenAndLog(fut);
+    }
+
+    /**
+     * @param nodeId Target node ID.
+     * @param cacheId Cache ID.
+     * @param keys Keys.
+     * @param msg Local message to log.
+     */
+    public void dumpTxKeyInfo(UUID nodeId, int cacheId, 
Collection<KeyCacheObject> keys, final String msg) {
+        if (!DIAGNOSTIC_ENABLED)
+            return;
+
+        IgniteInternalFuture<String> fut = diagnosticInfo(nodeId, new 
IgniteDiagnosticMessage.TxEntriesInfoClosure(ctx, cacheId, keys), msg);
+
+        listenAndLog(fut);
+    }
+
+    /**
+     * @param nodeId Target node ID.
+     * @param msg Local message to log.
+     */
+    public void dumpBasicInfo(final UUID nodeId, final String msg,
+        @Nullable IgniteInClosure<IgniteInternalFuture<String>> lsnr) {
+        if (!DIAGNOSTIC_ENABLED)
+            return;
+
+        IgniteInternalFuture<String> fut = diagnosticInfo(nodeId, new 
IgniteDiagnosticMessage.BaseClosure(ctx), msg);
+
+        if (lsnr != null)
+            fut.listen(lsnr);
+
+        listenAndLog(fut);
+    }
+
+    /**
+     * @param nodeId Target node ID.
+     * @param topVer Exchange topology version.
+     * @param msg Local message to log.
+     */
+    public void dumpExchangeInfo(final UUID nodeId, AffinityTopologyVersion 
topVer, final String msg) {
+        if (!DIAGNOSTIC_ENABLED)
+            return;
+
+        IgniteInternalFuture<String> fut = diagnosticInfo(nodeId, new 
IgniteDiagnosticMessage.ExchangeInfoClosure(ctx, topVer), msg);
+
+        listenAndLog(fut);
+    }
+
+    /**
+     * @param fut Future.
+     */
+    private void listenAndLog(IgniteInternalFuture<String> fut) {
+        fut.listen(new CI1<IgniteInternalFuture<String>>() {
+            @Override public void apply(IgniteInternalFuture<String> msgFut) {
+                try {
+                    String msg = msgFut.get();
+
+                    diagnosticLog.info(msg);
+                }
+                catch (Exception e) {
+                    U.error(diagnosticLog, "Failed to dump diagnostic info: " 
+ e, e);
+                }
+            }
+        });
+    }
+
+    /**
+     * @param nodeId Target node ID.
+     * @param c Closure.
+     * @param baseMsg Local message to log.
+     * @return Message future.
+     */
+    private IgniteInternalFuture<String> diagnosticInfo(final UUID nodeId,
+        IgniteClosure<GridKernalContext, String> c,
+        final String baseMsg) {
+        final GridFutureAdapter<String> infoFut = new GridFutureAdapter<>();
+
+        final IgniteInternalFuture<String> rmtFut = 
sendDiagnosticMessage(nodeId, c);
+
+        rmtFut.listen(new CI1<IgniteInternalFuture<String>>() {
+            @Override public void apply(IgniteInternalFuture<String> fut) {
+                String rmtMsg;
+
+                try {
+                    rmtMsg = fut.get();
+                }
+                catch (Exception e) {
+                    rmtMsg = "Diagnostic processing error: " + e;
+                }
+
+                final String rmtMsg0 = rmtMsg;
+
+                IgniteInternalFuture<String> locFut = 
IgniteDiagnosticMessage.dumpCommunicationInfo(ctx, nodeId);
+
+                locFut.listen(new CI1<IgniteInternalFuture<String>>() {
+                    @Override public void apply(IgniteInternalFuture<String> 
locFut) {
+                        String locMsg;
+
+                        try {
+                            locMsg = locFut.get();
+                        }
+                        catch (Exception e) {
+                            locMsg = "Failed to get info for local node: " + e;
+                        }
+
+                        String sb = baseMsg + U.nl() +
+                            "Remote node information:" + U.nl() + rmtMsg0 +
+                            U.nl() + "Local communication statistics:" + 
U.nl() +
+                            locMsg;
+
+                        infoFut.onDone(sb);
+                    }
+                });
+            }
+        });
+
+        return infoFut;
+    }
+
+    /**
+     * @param nodeId Target node ID.
+     * @param c Message closure.
+     * @return Message future.
+     */
+    private IgniteInternalFuture<String> sendDiagnosticMessage(UUID nodeId, 
IgniteClosure<GridKernalContext, String> c) {
+        try {
+            IgniteDiagnosticMessage msg = 
IgniteDiagnosticMessage.createRequest(ctx,
+                c,
+                diagFutId.getAndIncrement());
+
+            InternalDiagnosticFuture fut = new 
InternalDiagnosticFuture(nodeId, msg.futureId());
+
+            diagnosticFuturesMap().put(msg.futureId(), fut);
+
+            ctx.io().send(nodeId, GridTopic.TOPIC_INTERNAL_DIAGNOSTIC, msg, 
GridIoPolicy.SYSTEM_POOL);
+
+            return fut;
+        }
+        catch (Exception e) {
+            U.error(log, "Failed to send diagnostic message: " + e);
+
+            return new GridFinishedFuture<>("Failed to send diagnostic 
message: " + e);
+        }
+    }
+
+    /**
+     * @return Diagnostic messages futures map.
+     */
+    private ConcurrentHashMap<Long, InternalDiagnosticFuture> 
diagnosticFuturesMap() {
+        ConcurrentHashMap<Long, InternalDiagnosticFuture> map = 
diagnosticFutMap.get();
+
+        if (map == null) {
+            if (!diagnosticFutMap.compareAndSet(null, map = new 
ConcurrentHashMap<>()))
+                map = diagnosticFutMap.get();
+        }
+
+        return map;
+    }
+
+    /**
      * Update notifier timer task.
      */
     private static class UpdateNotifierTimerTask extends GridTimerTask {
@@ -245,4 +558,46 @@ public class ClusterProcessor extends GridProcessorAdapter 
{
             }
         }
     }
+
+    /**
+     *
+     */
+    class InternalDiagnosticFuture extends GridFutureAdapter<String> {
+        /** */
+        private final long id;
+
+        /** */
+        private final UUID nodeId;
+
+        /**
+         * @param id Future ID.
+         */
+        InternalDiagnosticFuture(UUID nodeId, long id) {
+            this.nodeId = nodeId;
+            this.id = id;
+        }
+
+        /**
+         * @param msg Response message.
+         */
+        public void onResponse(IgniteDiagnosticMessage msg) {
+            onDone(msg.message());
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean onDone(@Nullable String res, @Nullable 
Throwable err) {
+            if (super.onDone(res, err)) {
+                diagnosticFuturesMap().remove(id);
+
+                return true;
+            }
+
+            return false;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(InternalDiagnosticFuture.class, this);
+        }
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/b1116069/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
index a59adba..cbba5da 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
@@ -54,7 +54,9 @@ import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.configuration.ConnectorConfiguration;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.managers.communication.GridIoMessage;
 import org.apache.ignite.internal.util.GridConcurrentHashSet;
+import org.apache.ignite.internal.util.future.GridCompoundFuture;
 import org.apache.ignite.internal.util.nio.ssl.GridNioSslFilter;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.typedef.F;
@@ -66,6 +68,7 @@ import org.apache.ignite.internal.util.worker.GridWorker;
 import org.apache.ignite.lang.IgniteBiInClosure;
 import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.lang.IgniteReducer;
 import org.apache.ignite.lang.IgniteRunnable;
 import org.apache.ignite.plugin.extensions.communication.Message;
 import org.apache.ignite.plugin.extensions.communication.MessageReader;
@@ -701,6 +704,7 @@ public class GridNioServer<T> {
     /**
      *
      */
+    @SuppressWarnings("ForLoopReplaceableByForEach")
     public void dumpStats() {
         U.warn(log, "NIO server statistics [readerSesBalanceCnt=" + 
readerMoveCnt.get() +
             ", writerSesBalanceCnt=" + writerMoveCnt.get() + ']');
@@ -710,6 +714,51 @@ public class GridNioServer<T> {
     }
 
     /**
+     * @param msg Message to add.
+     * @param p Session predicate.
+     * @return Future.
+     */
+    @SuppressWarnings("ForLoopReplaceableByForEach")
+    public IgniteInternalFuture<String> dumpNodeStats(final String msg, 
IgnitePredicate<GridNioSession> p) {
+        GridCompoundFuture<String, String> fut = new GridCompoundFuture<>(new 
IgniteReducer<String, String>() {
+            private final StringBuilder sb = new StringBuilder(msg);
+
+            @Override public boolean collect(@Nullable String msg) {
+                if (!F.isEmpty(msg)) {
+                    synchronized (sb) {
+                        if (sb.length() > 0)
+                            sb.append(U.nl());
+
+                        sb.append(msg);
+                    }
+                }
+
+                return true;
+            }
+
+            @Override public String reduce() {
+                synchronized (sb) {
+                    return sb.toString();
+                }
+            }
+        });
+
+        for (int i = 0; i < clientWorkers.size(); i++) {
+            NioOperationFuture<String> opFut = new NioOperationFuture<>(null, 
NioOperation.DUMP_STATS);
+
+            opFut.msg = p;
+
+            clientWorkers.get(i).offer(opFut);
+
+            fut.add(opFut);
+        }
+
+        fut.markInitialized();
+
+        return fut;
+    }
+
+    /**
      * Establishes a session.
      *
      * @param ch Channel to register within the server and create session for.
@@ -1509,12 +1558,15 @@ public class GridNioServer<T> {
      */
     private abstract class AbstractNioClientWorker extends GridWorker 
implements GridNioWorker {
         /** Queue of change requests on this selector. */
+        @GridToStringExclude
         private final ConcurrentLinkedQueue<SessionChangeRequest> changeReqs = 
new ConcurrentLinkedQueue<>();
 
         /** Selector to select read events. */
+        @GridToStringExclude
         private Selector selector;
 
         /** Selected keys. */
+        @GridToStringExclude
         private SelectedSelectionKeySet selectedKeys;
 
         /** Worker index. */
@@ -1533,6 +1585,7 @@ public class GridNioServer<T> {
         private volatile long bytesSent0;
 
         /** Sessions assigned to this worker. */
+        @GridToStringExclude
         private final GridConcurrentHashSet<GridSelectorNioSessionImpl> 
workerSessions =
             new GridConcurrentHashSet<>();
 
@@ -1807,12 +1860,28 @@ public class GridNioServer<T> {
                             case DUMP_STATS: {
                                 NioOperationFuture req = 
(NioOperationFuture)req0;
 
-                                try {
-                                    dumpStats();
+                                if (req.msg instanceof IgnitePredicate) {
+                                    StringBuilder sb = new StringBuilder();
+
+                                    try {
+                                        dumpStats(sb, 
(IgnitePredicate<GridNioSession>)req.msg, true);
+                                    }
+                                    finally {
+                                        req.onDone(sb.toString());
+                                    }
                                 }
-                                finally {
-                                    // Complete the request just in case (none 
should wait on this future).
-                                    req.onDone(true);
+                                else {
+                                    try {
+                                        StringBuilder sb = new StringBuilder();
+
+                                        dumpStats(sb, null, false);
+
+                                        U.warn(log, sb.toString());
+                                    }
+                                    finally {
+                                        // Complete the request just in case 
(none should wait on this future).
+                                        req.onDone(true);
+                                    }
                                 }
                             }
                         }
@@ -1920,80 +1989,131 @@ public class GridNioServer<T> {
         }
 
         /**
-         *
+         * @param sb Message builder.
+         * @param keys Keys.
          */
-        private void dumpStats() {
-            StringBuilder sb = new StringBuilder();
-
-            Set<SelectionKey> keys = selector.keys();
-
-            sb.append(U.nl())
-                .append(">> Selector info [idx=").append(idx)
+        private void dumpSelectorInfo(StringBuilder sb, Set<SelectionKey> 
keys) {
+            sb.append(">> Selector info [idx=").append(idx)
                 .append(", keysCnt=").append(keys.size())
                 .append(", bytesRcvd=").append(bytesRcvd)
                 .append(", bytesRcvd0=").append(bytesRcvd0)
                 .append(", bytesSent=").append(bytesSent)
                 .append(", bytesSent0=").append(bytesSent0)
                 .append("]").append(U.nl());
+        }
+
+        /**
+         * @param sb Message builder.
+         * @param p Optional session predicate.
+         * @param shortInfo Short info flag.
+         */
+        private void dumpStats(StringBuilder sb,
+            @Nullable IgnitePredicate<GridNioSession> p,
+            boolean shortInfo) {
+            Set<SelectionKey> keys = selector.keys();
+
+            boolean selInfo = p == null;
+
+            if (selInfo)
+                dumpSelectorInfo(sb, keys);
 
             for (SelectionKey key : keys) {
                 GridSelectorNioSessionImpl ses = 
(GridSelectorNioSessionImpl)key.attachment();
 
-                MessageWriter writer = ses.meta(MSG_WRITER.ordinal());
-                MessageReader reader = 
ses.meta(GridDirectParser.READER_META_KEY);
+                boolean sesInfo = p == null || p.apply(ses);
 
-                sb.append("    Connection info [")
-                    .append("in=").append(ses.accepted())
-                    .append(", rmtAddr=").append(ses.remoteAddress())
-                    .append(", locAddr=").append(ses.localAddress());
+                if (sesInfo) {
+                    if (!selInfo) {
+                        dumpSelectorInfo(sb, keys);
 
-                GridNioRecoveryDescriptor outDesc = 
ses.outRecoveryDescriptor();
+                        selInfo = true;
+                    }
 
-                if (outDesc != null) {
-                    sb.append(", msgsSent=").append(outDesc.sent())
-                        .append(", msgsAckedByRmt=").append(outDesc.acked())
-                        .append(", 
descIdHash=").append(System.identityHashCode(outDesc));
-                }
-                else
-                    sb.append(", outRecoveryDesc=null");
+                    sb.append("    Connection info [")
+                        .append("in=").append(ses.accepted())
+                        .append(", rmtAddr=").append(ses.remoteAddress())
+                        .append(", locAddr=").append(ses.localAddress());
 
-                GridNioRecoveryDescriptor inDesc = ses.inRecoveryDescriptor();
+                    GridNioRecoveryDescriptor outDesc = 
ses.outRecoveryDescriptor();
 
-                if (inDesc != null) {
-                    sb.append(", msgsRcvd=").append(inDesc.received())
-                        .append(", 
lastAcked=").append(inDesc.lastAcknowledged())
-                        .append(", 
descIdHash=").append(System.identityHashCode(inDesc));
-                }
-                else
-                    sb.append(", inRecoveryDesc=null");
+                    if (outDesc != null) {
+                        sb.append(", msgsSent=").append(outDesc.sent())
+                            .append(", 
msgsAckedByRmt=").append(outDesc.acked())
+                            .append(", 
descIdHash=").append(System.identityHashCode(outDesc));
+
+                        if (!outDesc.messagesRequests().isEmpty()) {
+                            int cnt = 0;
+
+                            sb.append(", unackedMsgs=[");
+
+                            for (SessionWriteRequest req : 
outDesc.messagesRequests()) {
+                                if (cnt != 0)
+                                    sb.append(", ");
+
+                                Object msg = req.message();
 
-                sb.append(", bytesRcvd=").append(ses.bytesReceived())
-                    .append(", bytesRcvd0=").append(ses.bytesReceived0())
-                    .append(", bytesSent=").append(ses.bytesSent())
-                    .append(", bytesSent0=").append(ses.bytesSent0())
-                    .append(", opQueueSize=").append(ses.writeQueueSize())
-                    .append(", msgWriter=").append(writer != null ? 
writer.toString() : "null")
-                    .append(", msgReader=").append(reader != null ? 
reader.toString() : "null");
+                                if (shortInfo && msg instanceof GridIoMessage)
+                                    msg = 
((GridIoMessage)msg).message().getClass().getSimpleName();
 
-                int cnt = 0;
+                                sb.append(msg);
 
-                for (SessionWriteRequest req : ses.writeQueue()) {
-                    if (cnt == 0)
-                        sb.append(",\n opQueue=[").append(req);
+                                if (++cnt == 5)
+                                    break;
+                            }
+
+                            sb.append(']');
+                        }
+                    }
                     else
-                        sb.append(',').append(req);
+                        sb.append(", outRecoveryDesc=null");
 
-                    if (++cnt == 5) {
-                        sb.append(']');
+                    GridNioRecoveryDescriptor inDesc = 
ses.inRecoveryDescriptor();
 
-                        break;
+                    if (inDesc != null) {
+                        sb.append(", msgsRcvd=").append(inDesc.received())
+                            .append(", 
lastAcked=").append(inDesc.lastAcknowledged())
+                            .append(", 
descIdHash=").append(System.identityHashCode(inDesc));
                     }
-                }
+                    else
+                        sb.append(", inRecoveryDesc=null");
 
-                sb.append("]").append(U.nl());
-            }
+                    sb.append(", bytesRcvd=").append(ses.bytesReceived())
+                        .append(", bytesRcvd0=").append(ses.bytesReceived0())
+                        .append(", bytesSent=").append(ses.bytesSent())
+                        .append(", bytesSent0=").append(ses.bytesSent0())
+                        .append(", opQueueSize=").append(ses.writeQueueSize());
+
+                    if (!shortInfo) {
+                        MessageWriter writer = ses.meta(MSG_WRITER.ordinal());
+                        MessageReader reader = 
ses.meta(GridDirectParser.READER_META_KEY);
+
+                        sb.append(", msgWriter=").append(writer != null ? 
writer.toString() : "null")
+                            .append(", msgReader=").append(reader != null ? 
reader.toString() : "null");
+                    }
+
+                    int cnt = 0;
+
+                    for (SessionWriteRequest req : ses.writeQueue()) {
+                        Object msg = req.message();
 
-            U.warn(log, sb.toString());
+                        if (shortInfo && msg instanceof GridIoMessage)
+                            msg = 
((GridIoMessage)msg).message().getClass().getSimpleName();
+
+                        if (cnt == 0)
+                            sb.append(",\n opQueue=[").append(msg);
+                        else
+                            sb.append(',').append(msg);
+
+                        if (++cnt == 5) {
+                            sb.append(']');
+
+                            break;
+                        }
+                    }
+
+                    sb.append("]");
+                }
+            }
         }
 
         /**

Reply via email to