ignite-5155 Added possibility to receive diagnostic information from remote nodes for hanging cache futures
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5d98ccec Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5d98ccec Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5d98ccec Branch: refs/heads/ignite-5272 Commit: 5d98ccecbd43ad155f1394356c5569604556f158 Parents: 3c5ffd1 Author: sboikov <[email protected]> Authored: Sat Jun 10 11:51:10 2017 +0300 Committer: sboikov <[email protected]> Committed: Sat Jun 10 11:51:10 2017 +0300 ---------------------------------------------------------------------- .../apache/ignite/IgniteSystemProperties.java | 6 + .../org/apache/ignite/internal/GridTopic.java | 6 +- .../ignite/internal/IgniteDiagnosticAware.java | 28 ++ .../ignite/internal/IgniteDiagnosticInfo.java | 45 ++ .../internal/IgniteDiagnosticMessage.java | 467 +++++++++++++++++++ .../IgniteDiagnosticPrepareContext.java | 279 +++++++++++ .../apache/ignite/internal/IgniteKernal.java | 7 +- .../managers/communication/GridIoManager.java | 2 +- .../communication/GridIoMessageFactory.java | 8 +- .../processors/GridProcessorAdapter.java | 10 + .../processors/cache/GridCacheIoManager.java | 34 ++ .../GridCachePartitionExchangeManager.java | 136 ++++-- .../cache/GridCacheSharedManagerAdapter.java | 9 + .../distributed/dht/GridDhtTxPrepareFuture.java | 31 +- .../dht/GridPartitionedSingleGetFuture.java | 25 +- .../colocated/GridDhtColocatedLockFuture.java | 45 +- .../GridDhtPartitionsExchangeFuture.java | 48 +- .../processors/cluster/ClusterProcessor.java | 305 +++++++++++- .../ignite/internal/util/nio/GridNioServer.java | 212 ++++++--- .../communication/tcp/TcpCommunicationSpi.java | 185 +++++--- .../managers/IgniteDiagnosticMessagesTest.java | 255 ++++++++++ .../ignite/testsuites/IgniteBasicTestSuite.java | 6 +- 22 files changed, 1970 insertions(+), 179 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/ignite/blob/5d98ccec/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java index 84f3732..539f288 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java @@ -448,6 +448,12 @@ public final class IgniteSystemProperties { /** If this property is set to {@code true} then Ignite will log thread dump in case of partition exchange timeout. */ public static final String IGNITE_THREAD_DUMP_ON_EXCHANGE_TIMEOUT = "IGNITE_THREAD_DUMP_ON_EXCHANGE_TIMEOUT"; + /** */ + public static final String IGNITE_IO_DUMP_ON_TIMEOUT = "IGNITE_IO_DUMP_ON_TIMEOUT"; + + /** */ + public static final String IGNITE_DIAGNOSTIC_ENABLED = "IGNITE_DIAGNOSTIC_ENABLED"; + /** Cache operations that take more time than value of this property will be output to log. Set to {@code 0} to disable. 
*/ public static final String IGNITE_LONG_OPERATIONS_DUMP_TIMEOUT = "IGNITE_LONG_OPERATIONS_DUMP_TIMEOUT"; http://git-wip-us.apache.org/repos/asf/ignite/blob/5d98ccec/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java b/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java index c382999..abdbf95 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java @@ -96,6 +96,7 @@ public enum GridTopic { /** */ TOPIC_TX, + /** */ TOPIC_SNAPSHOT, /** */ @@ -111,7 +112,10 @@ public enum GridTopic { TOPIC_METADATA_REQ, /** */ - TOPIC_SCHEMA; + TOPIC_SCHEMA, + + /** */ + TOPIC_INTERNAL_DIAGNOSTIC; /** Enum values. */ private static final GridTopic[] VALS = values(); http://git-wip-us.apache.org/repos/asf/ignite/blob/5d98ccec/modules/core/src/main/java/org/apache/ignite/internal/IgniteDiagnosticAware.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteDiagnosticAware.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteDiagnosticAware.java new file mode 100644 index 0000000..45a5f3a --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteDiagnosticAware.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal; + +/** + * + */ +public interface IgniteDiagnosticAware { + /** + * @param ctx Context. + */ + public void addDiagnosticRequest(IgniteDiagnosticPrepareContext ctx); +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/5d98ccec/modules/core/src/main/java/org/apache/ignite/internal/IgniteDiagnosticInfo.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteDiagnosticInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteDiagnosticInfo.java new file mode 100644 index 0000000..f82f600 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteDiagnosticInfo.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal; + +import java.io.Serializable; + +/** + * + */ +public class IgniteDiagnosticInfo implements Serializable { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private String msg; + + /** + * @param msg Message. + */ + public IgniteDiagnosticInfo(String msg) { + this.msg = msg; + } + + /** + * @return Message. + */ + public String message() { + return msg; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/5d98ccec/modules/core/src/main/java/org/apache/ignite/internal/IgniteDiagnosticMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteDiagnosticMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteDiagnosticMessage.java new file mode 100644 index 0000000..4f37f53 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteDiagnosticMessage.java @@ -0,0 +1,467 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal; + +import java.nio.ByteBuffer; +import java.text.DateFormat; +import java.text.SimpleDateFormat; +import java.util.Collection; +import java.util.Date; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.UUID; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.managers.communication.GridIoMessageFactory; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.GridCacheMapEntry; +import org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager; +import org.apache.ignite.internal.processors.cache.KeyCacheObject; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture; +import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; +import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; +import org.apache.ignite.internal.util.future.GridFinishedFuture; +import org.apache.ignite.internal.util.typedef.T2; +import org.apache.ignite.internal.util.typedef.T3; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteBiInClosure; +import org.apache.ignite.lang.IgniteClosure; +import org.apache.ignite.marshaller.Marshaller; +import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.plugin.extensions.communication.MessageReader; +import org.apache.ignite.plugin.extensions.communication.MessageWriter; +import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; +import org.jetbrains.annotations.Nullable; + +/** + * + */ +public class IgniteDiagnosticMessage implements Message { 
+ /** */ + private static final long serialVersionUID = 0L; + + /** */ + private static final int REQUEST_FLAG_MASK = 0x01; + + /** */ + private static final ThreadLocal<DateFormat> dateFormat = new ThreadLocal<DateFormat>() { + @Override protected DateFormat initialValue() { + return new SimpleDateFormat("HH:mm:ss.SSS"); + } + }; + + /** */ + private byte flags; + + /** */ + private long futId; + + /** */ + private byte[] bytes; + + /** + * Required by {@link GridIoMessageFactory}. + */ + public IgniteDiagnosticMessage() { + // No-op. + } + + /** + * @param marsh Marshaller. + * @param c Closure to run. + * @param futId Future ID. + * @return Request message. + * @throws IgniteCheckedException If failed. + */ + public static IgniteDiagnosticMessage createRequest(Marshaller marsh, + IgniteClosure<GridKernalContext, IgniteDiagnosticInfo> c, + long futId) + throws IgniteCheckedException + { + byte[] cBytes = U.marshal(marsh, c); + + IgniteDiagnosticMessage msg = new IgniteDiagnosticMessage(); + + msg.futId = futId; + msg.bytes = cBytes; + msg.flags |= REQUEST_FLAG_MASK; + + return msg; + } + + /** + * @param resBytes Marshalled result. + * @param futId Future ID. + * @return Response message. + */ + public static IgniteDiagnosticMessage createResponse(byte[] resBytes, long futId) { + IgniteDiagnosticMessage msg = new IgniteDiagnosticMessage(); + + msg.futId = futId; + msg.bytes = resBytes; + + return msg; + } + + /** + * @param marsh Marshaller. + * @return Unmarshalled payload. + * @throws IgniteCheckedException If failed. + */ + @Nullable public <T> T unmarshal(Marshaller marsh) + throws IgniteCheckedException { + if (bytes == null) + return null; + + return U.unmarshal(marsh, bytes, null); + } + + /** + * @return Future ID. + */ + public long futureId() { + return futId; + } + + /** + * @return {@code True} if this is request message. 
+ */ + public boolean request() { + return (flags & REQUEST_FLAG_MASK) != 0; + } + + /** {@inheritDoc} */ + @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { + writer.setBuffer(buf); + + if (!writer.isHeaderWritten()) { + if (!writer.writeHeader(directType(), fieldsCount())) + return false; + + writer.onHeaderWritten(); + } + + switch (writer.state()) { + case 0: + if (!writer.writeByteArray("bytes", bytes)) + return false; + + writer.incrementState(); + + case 1: + if (!writer.writeByte("flags", flags)) + return false; + + writer.incrementState(); + + case 2: + if (!writer.writeLong("futId", futId)) + return false; + + writer.incrementState(); + + } + + return true; + } + + /** {@inheritDoc} */ + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { + reader.setBuffer(buf); + + if (!reader.beforeMessageRead()) + return false; + + switch (reader.state()) { + case 0: + bytes = reader.readByteArray("bytes"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 1: + flags = reader.readByte("flags"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 2: + futId = reader.readLong("futId"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + } + + return reader.afterMessageRead(IgniteDiagnosticMessage.class); + } + + /** {@inheritDoc} */ + @Override public short directType() { + return -55; + } + + /** {@inheritDoc} */ + @Override public byte fieldsCount() { + return 3; + } + + /** {@inheritDoc} */ + @Override public void onAckReceived() { + // No-op. + } + + /** + * + */ + public static abstract class DiagnosticBaseClosure implements IgniteBiInClosure<StringBuilder, GridKernalContext> { + /** + * @return Key to group similar messages. + */ + public Object mergeKey() { + return getClass(); + } + + /** + * @param other Another closure of the same type. + */ + public void merge(DiagnosticBaseClosure other) { + // No-op. 
+ } + } + + /** + * + */ + public final static class TxEntriesInfoClosure extends DiagnosticBaseClosure { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private final int cacheId; + + /** */ + private final Set<KeyCacheObject> keys; + + /** + * @param cacheId Cache ID. + * @param keys Keys. + */ + TxEntriesInfoClosure(int cacheId, Collection<KeyCacheObject> keys) { + this.cacheId = cacheId; + this.keys = new HashSet<>(keys); + } + + /** {@inheritDoc} */ + @Override public void apply(StringBuilder sb, GridKernalContext ctx) { + sb.append(U.nl()); + + GridCacheContext cctx = ctx.cache().context().cacheContext(cacheId); + + if (cctx == null) { + sb.append("Failed to find cache with id: ").append(cacheId); + + return; + } + + try { + for (KeyCacheObject key : keys) + key.finishUnmarshal(cctx.cacheObjectContext(), null); + } + catch (IgniteCheckedException e) { + ctx.cluster().diagnosticLog().error("Failed to unmarshal key: " + e, e); + + sb.append("Failed to unmarshal key: ").append(e).append(U.nl()); + } + + sb.append("Cache entries [cacheId=").append(cacheId) + .append(", cacheName=").append(cctx.name()).append("]: "); + + for (KeyCacheObject key : keys) { + GridCacheMapEntry e = (GridCacheMapEntry)cctx.cache().peekEx(key); + + sb.append(U.nl()).append(" Key [key=").append(key).append(", entry=").append(e).append("]"); + } + } + + /** {@inheritDoc} */ + @Override public Object mergeKey() { + return new T2<>(getClass(), cacheId); + } + + /** {@inheritDoc} */ + @Override public void merge(DiagnosticBaseClosure other) { + TxEntriesInfoClosure other0 = (TxEntriesInfoClosure)other; + + assert other0 != null && cacheId == other0.cacheId : other; + + this.keys.addAll(other0.keys); + } + } + + /** + * + */ + public final static class ExchangeInfoClosure extends DiagnosticBaseClosure { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private final AffinityTopologyVersion topVer; + + /** + * @param topVer Exchange version. 
+ */ + ExchangeInfoClosure(AffinityTopologyVersion topVer) { + this.topVer = topVer; + } + + /** {@inheritDoc} */ + @Override public void apply(StringBuilder sb, GridKernalContext ctx) { + sb.append(U.nl()); + + List<GridDhtPartitionsExchangeFuture> futs = ctx.cache().context().exchange().exchangeFutures(); + + for (GridDhtPartitionsExchangeFuture fut : futs) { + if (topVer.equals(fut.topologyVersion())) { + sb.append("Exchange future: ").append(fut); + + return; + } + } + + sb.append("Failed to find exchange future: ").append(topVer); + } + + /** {@inheritDoc} */ + @Override public Object mergeKey() { + return new T2<>(getClass(), topVer); + } + } + + /** + * + */ + public final static class TxInfoClosure extends DiagnosticBaseClosure { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private final GridCacheVersion dhtVer; + + /** */ + private final GridCacheVersion nearVer; + + /** + * @param dhtVer Tx dht version. + * @param nearVer Tx near version. + */ + TxInfoClosure(GridCacheVersion dhtVer, GridCacheVersion nearVer) { + this.dhtVer = dhtVer; + this.nearVer = nearVer; + } + + /** {@inheritDoc} */ + @Override public void apply(StringBuilder sb, GridKernalContext ctx) { + sb.append(U.nl()) + .append("Related transactions [dhtVer=").append(dhtVer) + .append(", nearVer=").append(nearVer).append("]: "); + + boolean found = false; + + for (IgniteInternalTx tx : ctx.cache().context().tm().activeTransactions()) { + if (dhtVer.equals(tx.xidVersion()) || nearVer.equals(tx.nearXidVersion())) { + sb.append(U.nl()) + .append(" [ver=").append(tx.xidVersion()) + .append(", nearVer=").append(tx.nearXidVersion()) + .append(", topVer=").append(tx.topologyVersion()) + .append(", state=").append(tx.state()) + .append(", fullTx=").append(tx).append(']'); + + found = true; + } + } + + if (!found) + sb.append(U.nl()).append("Failed to find related transactions."); + } + + /** {@inheritDoc} */ + @Override public Object mergeKey() { + return new 
T3<>(getClass(), nearVer, dhtVer); + } + } + + /** + * + * @param sb String builder. + * @param ctx Context. + */ + static void dumpNodeBasicInfo(StringBuilder sb, GridKernalContext ctx) { + sb.append("General node info [id=").append(ctx.localNodeId()) + .append(", client=").append(ctx.clientNode()) + .append(", discoTopVer=").append(ctx.discovery().topologyVersionEx()) + .append(", time=").append(formatTime(U.currentTimeMillis())).append(']'); + } + + /** + * @param sb String builder. + * @param ctx Context. + */ + static void dumpExchangeInfo(StringBuilder sb, GridKernalContext ctx) { + GridCachePartitionExchangeManager exchMgr = ctx.cache().context().exchange(); + GridDhtTopologyFuture fut = exchMgr.lastTopologyFuture(); + + sb.append("Partitions exchange info [readyVer=").append(exchMgr.readyAffinityVersion()).append(']').append(U.nl()) + .append("Last initialized exchange future: ").append(fut); + } + + /** + * @param ctx Context. + * @param nodeId Target node ID. + * @return Communication information future. + */ + public static IgniteInternalFuture<String> dumpCommunicationInfo(GridKernalContext ctx, UUID nodeId) { + if (ctx.config().getCommunicationSpi() instanceof TcpCommunicationSpi) + return ((TcpCommunicationSpi) ctx.config().getCommunicationSpi()).dumpNodeStatistics(nodeId); + else + return new GridFinishedFuture<>("Unexpected communication SPI: " + ctx.config().getCommunicationSpi()); + } + + /** + * @param time Time. + * @return Time string. 
+ */ + private static String formatTime(long time) { + return dateFormat.get().format(new Date(time)); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(IgniteDiagnosticMessage.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/5d98ccec/modules/core/src/main/java/org/apache/ignite/internal/IgniteDiagnosticPrepareContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteDiagnosticPrepareContext.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteDiagnosticPrepareContext.java new file mode 100644 index 0000000..b55199a --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteDiagnosticPrepareContext.java @@ -0,0 +1,279 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.KeyCacheObject; +import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; +import org.apache.ignite.internal.util.typedef.CI1; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteClosure; +import org.apache.ignite.lang.IgniteInClosure; +import org.jetbrains.annotations.Nullable; + +import static org.apache.ignite.internal.IgniteDiagnosticMessage.DiagnosticBaseClosure; +import static org.apache.ignite.internal.IgniteDiagnosticMessage.ExchangeInfoClosure; +import static org.apache.ignite.internal.IgniteDiagnosticMessage.TxEntriesInfoClosure; +import static org.apache.ignite.internal.IgniteDiagnosticMessage.TxInfoClosure; +import static org.apache.ignite.internal.IgniteDiagnosticMessage.dumpCommunicationInfo; +import static org.apache.ignite.internal.IgniteDiagnosticMessage.dumpExchangeInfo; +import static org.apache.ignite.internal.IgniteDiagnosticMessage.dumpNodeBasicInfo; + +/** + * Groups diagnostic closures by node/closure type. + */ +public class IgniteDiagnosticPrepareContext { + /** */ + private final UUID locNodeId; + + /** */ + private final Map<UUID, CompoundInfoClosure> cls = new HashMap<>(); + + /** + * @param nodeId Local node ID. + */ + public IgniteDiagnosticPrepareContext(UUID nodeId) { + locNodeId = nodeId; + } + + /** + * @param nodeId Remote node ID. + * @param topVer Topology version. + * @param msg Initial message. 
+ */ + public void exchangeInfo(UUID nodeId, AffinityTopologyVersion topVer, String msg) { + closure(nodeId).add(msg, new ExchangeInfoClosure(topVer)); + } + + /** + * @param nodeId Remote node ID. + * @param cacheId Cache ID. + * @param keys Entry keys. + * @param msg Initial message. + */ + public void txKeyInfo(UUID nodeId, int cacheId, Collection<KeyCacheObject> keys, String msg) { + closure(nodeId).add(msg, new TxEntriesInfoClosure(cacheId, keys)); + } + + /** + * @param nodeId Remote node ID. + * @param dhtVer Tx dht version. + * @param nearVer Tx near version. + * @param msg Initial message. + */ + public void remoteTxInfo(UUID nodeId, GridCacheVersion dhtVer, GridCacheVersion nearVer, String msg) { + closure(nodeId).add(msg, new TxInfoClosure(dhtVer, nearVer)); + } + + /** + * @param nodeId Remote node ID. + * @param msg Initial message. + */ + public void basicInfo(UUID nodeId, String msg) { + closure(nodeId).add(msg, null); + } + + /** + * @param nodeId Remote node ID. + * @return Compound closure + */ + private CompoundInfoClosure closure(UUID nodeId) { + CompoundInfoClosure cl = cls.get(nodeId); + + if (cl == null) + cls.put(nodeId, cl = new CompoundInfoClosure(locNodeId)); + + return cl; + } + + /** + * @return {@code True} if there are no added closures. + */ + public boolean empty() { + return cls.isEmpty(); + } + + /** + * @param ctx Grid context. + * @param lsnr Optional listener (used in test). + */ + public void send(GridKernalContext ctx, @Nullable IgniteInClosure<IgniteInternalFuture<String>> lsnr) { + for (Map.Entry<UUID, CompoundInfoClosure> entry : cls.entrySet()) { + UUID rmtNodeId = entry.getKey(); + + CompoundInfoClosure c = entry.getValue(); + + IgniteInternalFuture<String> fut = + ctx.cluster().requestDiagnosticInfo(rmtNodeId, c, c.message()); + + if (lsnr != null) + fut.listen(lsnr); + + listenAndLog(ctx.cluster().diagnosticLog(), fut); + } + } + + /** + * @param log Logger. + * @param fut Future. 
+ */ + private void listenAndLog(final IgniteLogger log, IgniteInternalFuture<String> fut) { + fut.listen(new CI1<IgniteInternalFuture<String>>() { + @Override public void apply(IgniteInternalFuture<String> fut) { + synchronized (IgniteDiagnosticPrepareContext.class) { + try { + log.info(fut.get()); + } + catch (Exception e) { + U.error(log, "Failed to dump diagnostic info: " + e, e); + } + } + } + }); + } + + /** + * + */ + private final static class CompoundInfoClosure implements IgniteClosure<GridKernalContext, IgniteDiagnosticInfo> { + /** */ + private static final long serialVersionUID = 0L; + + /** ID of node sent closure. */ + protected final UUID nodeId; + + /** Closures to send on remote node. */ + private Map<Object, IgniteDiagnosticMessage.DiagnosticBaseClosure> cls = new LinkedHashMap<>(); + + /** Local message related to remote closures. */ + private transient Map<Object, List<String>> msgs = new LinkedHashMap<>(); + + /** + * @param nodeId Node sent closure. + */ + CompoundInfoClosure(UUID nodeId) { + this.nodeId = nodeId; + } + + /** {@inheritDoc} */ + @Override public final IgniteDiagnosticInfo apply(GridKernalContext ctx) { + try { + IgniteInternalFuture<String> commInfo = dumpCommunicationInfo(ctx, nodeId); + + StringBuilder sb = new StringBuilder(); + + dumpNodeBasicInfo(sb, ctx); + + sb.append(U.nl()); + + dumpExchangeInfo(sb, ctx); + + sb.append(U.nl()); + + sb.append(commInfo.get(10_000)); + + moreInfo(sb, ctx); + + return new IgniteDiagnosticInfo(sb.toString()); + } + catch (Exception e) { + ctx.cluster().diagnosticLog().error("Failed to execute diagnostic message closure: " + e, e); + + return new IgniteDiagnosticInfo("Failed to execute diagnostic message closure: " + e); + } + } + + /** + * @param sb String builder. + * @param ctx Grid context. 
+ */ + private void moreInfo(StringBuilder sb, GridKernalContext ctx) { + for (DiagnosticBaseClosure c : cls.values()) { + try { + c.apply(sb, ctx); + } + catch (Exception e) { + ctx.cluster().diagnosticLog().error( + "Failed to populate diagnostic with additional information: " + e, e); + + sb.append(U.nl()).append("Failed to populate diagnostic with additional information: ").append(e); + } + } + } + + /** + * @return Node ID. + */ + public UUID nodeId() { + return nodeId; + } + + /** + * @return Initial message. + */ + public String message() { + StringBuilder sb = new StringBuilder(); + + for (List<String> msgs0 : msgs.values()) { + for (String msg : msgs0) { + if (sb.length() > 0) + sb.append('\n'); + + sb.append(msg); + } + } + + return sb.toString(); + } + + /** + * @param msg Message. + * @param c Closure or {@code null} if only basic info is needed. + */ + public void add(String msg, @Nullable DiagnosticBaseClosure c) { + Object key = c != null ? c.mergeKey() : getClass(); + + List<String> msgs0 = msgs.get(key); + + if (msgs0 == null) { + msgs0 = new ArrayList<>(); + + msgs.put(key, msgs0); + } + + msgs0.add(msg); + + if (c != null) { + DiagnosticBaseClosure c0 = cls.get(c.mergeKey()); + + if (c0 == null) + cls.put(c.mergeKey(), c); + else + c0.merge(c); + } + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/5d98ccec/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java index 6d05147..fec1892 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java @@ -950,7 +950,12 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { // Start platform plugins. 
if (ctx.config().getPlatformConfiguration() != null) - startProcessor(new PlatformPluginProcessor(ctx));fillNodeAttributes(clusterProc.updateNotifierEnabled());} + startProcessor(new PlatformPluginProcessor(ctx)); + + ctx.cluster().initDiagnosticListeners(); + + fillNodeAttributes(clusterProc.updateNotifierEnabled()); + } catch (Throwable e) { U.error( log, "Exception during start processors, node will be stopped and close connections", e); http://git-wip-us.apache.org/repos/asf/ignite/blob/5d98ccec/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java index 1efc4aa..81692da 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java @@ -2290,7 +2290,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa } /** - * Dumps SPI stats to logs in case TcpCommunicationSpi is used, no-op otherwise. + * Dumps SPI stats to diagnostic logs in case TcpCommunicationSpi is used, no-op otherwise. 
*/ public void dumpStats() { CommunicationSpi spi = getSpi(); http://git-wip-us.apache.org/repos/asf/ignite/blob/5d98ccec/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java index 753d8af..12f160b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java @@ -26,6 +26,7 @@ import org.apache.ignite.internal.GridJobSiblingsRequest; import org.apache.ignite.internal.GridJobSiblingsResponse; import org.apache.ignite.internal.GridTaskCancelRequest; import org.apache.ignite.internal.GridTaskSessionRequest; +import org.apache.ignite.internal.IgniteDiagnosticMessage; import org.apache.ignite.internal.binary.BinaryEnumObjectImpl; import org.apache.ignite.internal.binary.BinaryObjectImpl; import org.apache.ignite.internal.managers.checkpoint.GridCheckpointRequest; @@ -178,6 +179,11 @@ public class GridIoMessageFactory implements MessageFactory { Message msg = null; switch (type) { + case -55: + msg = new IgniteDiagnosticMessage(); + + break; + // -54 is reserved for SQL. 
case -53: @@ -875,7 +881,7 @@ public class GridIoMessageFactory implements MessageFactory { break; - // [-3..119] [124..127] [-23..-27] [-36..-47]- this + // [-3..119] [124..127] [-23..-27] [-36..-55]- this // [120..123] - DR // [-4..-22, -30..-35] - SQL default: http://git-wip-us.apache.org/repos/asf/ignite/blob/5d98ccec/modules/core/src/main/java/org/apache/ignite/internal/processors/GridProcessorAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/GridProcessorAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/GridProcessorAdapter.java index cd97aea..b9d7260 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/GridProcessorAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/GridProcessorAdapter.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; +import org.apache.ignite.IgniteSystemProperties; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.IgniteInternalFuture; @@ -35,6 +36,9 @@ import org.jetbrains.annotations.Nullable; * Advanced parent adapter for all processor. */ public abstract class GridProcessorAdapter implements GridProcessor { + /** */ + private static final String DIAGNOSTIC_LOG_CATEGORY = "org.apache.ignite.internal.diagnostic"; + /** Kernal context. */ @GridToStringExclude protected final GridKernalContext ctx; @@ -43,6 +47,10 @@ public abstract class GridProcessorAdapter implements GridProcessor { @GridToStringExclude protected final IgniteLogger log; + /** Diagnostic logger. */ + @GridToStringExclude + protected final IgniteLogger diagnosticLog; + /** * @param ctx Kernal context. 
*/ @@ -52,6 +60,8 @@ public abstract class GridProcessorAdapter implements GridProcessor { this.ctx = ctx; log = ctx.log(getClass()); + + diagnosticLog = ctx.log(DIAGNOSTIC_LOG_CATEGORY); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/5d98ccec/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java index a251047..946d256 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java @@ -17,10 +17,12 @@ package org.apache.ignite.internal.processors.cache; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.HashMap; import java.util.Iterator; +import java.util.List; import java.util.Map; import java.util.UUID; import java.util.concurrent.ConcurrentMap; @@ -104,6 +106,9 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { /** Message ID generator. */ private static final AtomicLong idGen = new AtomicLong(); + /** */ + private static final int MAX_STORED_PENDING_MESSAGES = 100; + /** Delay in milliseconds between retries. */ private long retryDelay; @@ -126,6 +131,26 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { /** Deployment enabled. 
*/ private boolean depEnabled; + /** */ + private final List<GridCacheMessage> pendingMsgs = new ArrayList<>(MAX_STORED_PENDING_MESSAGES); + + /** + * + */ + public void dumpPendingMessages() { + synchronized (pendingMsgs) { + if (pendingMsgs.isEmpty()) + return; + + diagnosticLog.info("Pending cache messages waiting for exchange [" + + "readyVer=" + cctx.exchange().readyAffinityVersion() + + ", discoVer=" + cctx.discovery().topologyVersion() + ']'); + + for (GridCacheMessage msg : pendingMsgs) + diagnosticLog.info("Message [waitVer=" + msg.topologyVersion() + ", msg=" + msg + ']'); + } + } + /** Message listener. */ private GridMessageListener lsnr = new GridMessageListener() { @Override public void onMessage(final UUID nodeId, final Object msg) { @@ -214,6 +239,11 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { } if (fut != null && !fut.isDone()) { + synchronized (pendingMsgs) { + if (pendingMsgs.size() < MAX_STORED_PENDING_MESSAGES) + pendingMsgs.add(cacheMsg); + } + Thread curThread = Thread.currentThread(); final int stripe = curThread instanceof IgniteThread ? 
((IgniteThread)curThread).stripe() : -1; @@ -222,6 +252,10 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { @Override public void apply(IgniteInternalFuture<?> t) { Runnable c = new Runnable() { @Override public void run() { + synchronized (pendingMsgs) { + pendingMsgs.remove(cacheMsg); + } + IgniteLogger log = cacheMsg.messageLogger(cctx); if (log.isDebugEnabled()) { http://git-wip-us.apache.org/repos/asf/ignite/blob/5d98ccec/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index 2be4d0f..fdf8a2f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -49,6 +49,8 @@ import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.events.DiscoveryEvent; import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException; +import org.apache.ignite.internal.IgniteDiagnosticAware; +import org.apache.ignite.internal.IgniteDiagnosticPrepareContext; import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteInterruptedCheckedException; @@ -101,6 +103,7 @@ import org.jetbrains.annotations.Nullable; import org.jsr166.ConcurrentHashMap8; import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static org.apache.ignite.IgniteSystemProperties.IGNITE_IO_DUMP_ON_TIMEOUT; import static 
org.apache.ignite.IgniteSystemProperties.IGNITE_PRELOAD_RESEND_TIMEOUT; import static org.apache.ignite.IgniteSystemProperties.IGNITE_THREAD_DUMP_ON_EXCHANGE_TIMEOUT; import static org.apache.ignite.IgniteSystemProperties.getLong; @@ -110,7 +113,7 @@ import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; import static org.apache.ignite.internal.GridTopic.TOPIC_CACHE; import static org.apache.ignite.internal.events.DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT; import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL; -import static org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture.*; +import static org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture.nextDumpTimeout; import static org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader.DFLT_PRELOAD_RESEND_TIMEOUT; /** @@ -1364,47 +1367,63 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana } /** - * @param exchTopVer Optional current exchange topology version. + * @param exchFut Optional current exchange future. * @throws Exception If failed. */ - public void dumpDebugInfo(@Nullable AffinityTopologyVersion exchTopVer) throws Exception { - U.warn(log, "Ready affinity version: " + readyTopVer.get()); + public void dumpDebugInfo(@Nullable GridDhtPartitionsExchangeFuture exchFut) throws Exception { + AffinityTopologyVersion exchTopVer = exchFut != null ? 
exchFut.topologyVersion() : null; - U.warn(log, "Last exchange future: " + lastInitializedFut); + U.warn(diagnosticLog, "Ready affinity version: " + readyTopVer.get()); + + U.warn(diagnosticLog, "Last exchange future: " + lastInitializedFut); exchWorker.dumpExchangeDebugInfo(); if (!readyFuts.isEmpty()) { - U.warn(log, "Pending affinity ready futures:"); + U.warn(diagnosticLog, "Pending affinity ready futures:"); for (AffinityReadyFuture fut : readyFuts.values()) - U.warn(log, ">>> " + fut); + U.warn(diagnosticLog, ">>> " + fut); } + IgniteDiagnosticPrepareContext diagCtx = cctx.kernalContext().cluster().diagnosticEnabled() ? + new IgniteDiagnosticPrepareContext(cctx.localNodeId()) : null; + + if (diagCtx != null && exchFut != null) + exchFut.addDiagnosticRequest(diagCtx); + ExchangeFutureSet exchFuts = this.exchFuts; if (exchFuts != null) { - U.warn(log, "Last 10 exchange futures (total: " + exchFuts.size() + "):"); + U.warn(diagnosticLog, "Last 10 exchange futures (total: " + exchFuts.size() + "):"); int cnt = 0; for (GridDhtPartitionsExchangeFuture fut : exchFuts.values()) { - U.warn(log, ">>> " + fut); + U.warn(diagnosticLog, ">>> " + fut.shortInfo()); if (++cnt == 10) break; } } - dumpPendingObjects(exchTopVer); + dumpPendingObjects(exchTopVer, diagCtx); for (CacheGroupContext grp : cctx.cache().cacheGroups()) grp.preloader().dumpDebugInfo(); cctx.affinity().dumpDebugInfo(); - // Dump IO manager statistics. 
- cctx.gridIO().dumpStats(); + cctx.io().dumpPendingMessages(); + + if (IgniteSystemProperties.getBoolean(IGNITE_IO_DUMP_ON_TIMEOUT, false)) + cctx.gridIO().dumpStats(); + + if (IgniteSystemProperties.getBoolean(IGNITE_THREAD_DUMP_ON_EXCHANGE_TIMEOUT, false)) + U.dumpThreads(diagnosticLog); + + if (diagCtx != null) + diagCtx.send(cctx.kernalContext(), null); } /** @@ -1420,12 +1439,15 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana GridCacheMvccManager mvcc = cctx.mvcc(); + final IgniteDiagnosticPrepareContext diagCtx = cctx.kernalContext().cluster().diagnosticEnabled() ? + new IgniteDiagnosticPrepareContext(cctx.localNodeId()) : null; + if (tm != null) { for (IgniteInternalTx tx : tm.activeTransactions()) { if (curTime - tx.startTime() > timeout) { found = true; - U.warn(log, "Found long running transaction [startTime=" + formatTime(tx.startTime()) + + U.warn(diagnosticLog, "Found long running transaction [startTime=" + formatTime(tx.startTime()) + ", curTime=" + formatTime(curTime) + ", tx=" + tx + ']'); } } @@ -1436,8 +1458,11 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana if (curTime - fut.startTime() > timeout) { found = true; - U.warn(log, "Found long running cache future [startTime=" + formatTime(fut.startTime()) + + U.warn(diagnosticLog, "Found long running cache future [startTime=" + formatTime(fut.startTime()) + ", curTime=" + formatTime(curTime) + ", fut=" + fut + ']'); + + if (diagCtx != null && fut instanceof IgniteDiagnosticAware) + ((IgniteDiagnosticAware)fut).addDiagnosticRequest(diagCtx); } } @@ -1445,12 +1470,28 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana if (curTime - fut.startTime() > timeout) { found = true; - U.warn(log, "Found long running cache future [startTime=" + formatTime(fut.startTime()) + + U.warn(diagnosticLog, "Found long running cache future [startTime=" + formatTime(fut.startTime()) + ", curTime=" + formatTime(curTime) 
+ ", fut=" + fut + ']'); + + if (diagCtx != null && fut instanceof IgniteDiagnosticAware) + ((IgniteDiagnosticAware)fut).addDiagnosticRequest(diagCtx); } } } + if (diagCtx != null && !diagCtx.empty()) { + try { + cctx.kernalContext().closure().runLocal(new Runnable() { + @Override public void run() { + diagCtx.send(cctx.kernalContext(), null); + } + }, SYSTEM_POOL); + } + catch (IgniteCheckedException e) { + U.error(diagnosticLog, "Failed to submit diagnostic closure: " + e, e); + } + } + return found; } @@ -1472,15 +1513,17 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana nextLongRunningOpsDumpTime = U.currentTimeMillis() + nextDumpTimeout(longRunningOpsDumpStep++, timeout); if (IgniteSystemProperties.getBoolean(IGNITE_THREAD_DUMP_ON_EXCHANGE_TIMEOUT, false)) { - U.warn(log, "Found long running cache operations, dump threads."); + U.warn(diagnosticLog, "Found long running cache operations, dump threads."); - U.dumpThreads(log); + U.dumpThreads(diagnosticLog); } - U.warn(log, "Found long running cache operations, dump IO statistics."); + if (IgniteSystemProperties.getBoolean(IGNITE_IO_DUMP_ON_TIMEOUT, false)) { + U.warn(diagnosticLog, "Found long running cache operations, dump IO statistics."); - // Dump IO manager statistics. - cctx.gridIO().dumpStats(); + // Dump IO manager statistics. + cctx.gridIO().dumpStats(); + } } else { nextLongRunningOpsDumpTime = 0; @@ -1488,7 +1531,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana } } catch (Exception e) { - U.error(log, "Failed to dump debug information: " + e, e); + U.error(diagnosticLog, "Failed to dump debug information: " + e, e); } } @@ -1512,52 +1555,54 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana /** * @param exchTopVer Exchange topology version. + * @param diagCtx Diagnostic request. 
*/ - private void dumpPendingObjects(@Nullable AffinityTopologyVersion exchTopVer) { + private void dumpPendingObjects(@Nullable AffinityTopologyVersion exchTopVer, + @Nullable IgniteDiagnosticPrepareContext diagCtx) { IgniteTxManager tm = cctx.tm(); if (tm != null) { - U.warn(log, "Pending transactions:"); + U.warn(diagnosticLog, "Pending transactions:"); for (IgniteInternalTx tx : tm.activeTransactions()) { if (exchTopVer != null) { - U.warn(log, ">>> [txVer=" + tx.topologyVersionSnapshot() + + U.warn(diagnosticLog, ">>> [txVer=" + tx.topologyVersionSnapshot() + ", exchWait=" + tm.needWaitTransaction(tx, exchTopVer) + ", tx=" + tx + ']'); } else - U.warn(log, ">>> [txVer=" + tx.topologyVersionSnapshot() + ", tx=" + tx + ']'); + U.warn(diagnosticLog, ">>> [txVer=" + tx.topologyVersionSnapshot() + ", tx=" + tx + ']'); } } GridCacheMvccManager mvcc = cctx.mvcc(); if (mvcc != null) { - U.warn(log, "Pending explicit locks:"); + U.warn(diagnosticLog, "Pending explicit locks:"); for (GridCacheExplicitLockSpan lockSpan : mvcc.activeExplicitLocks()) - U.warn(log, ">>> " + lockSpan); + U.warn(diagnosticLog, ">>> " + lockSpan); - U.warn(log, "Pending cache futures:"); + U.warn(diagnosticLog, "Pending cache futures:"); for (GridCacheFuture<?> fut : mvcc.activeFutures()) - U.warn(log, ">>> " + fut); + dumpDiagnosticInfo(fut, diagCtx); - U.warn(log, "Pending atomic cache futures:"); + U.warn(diagnosticLog, "Pending atomic cache futures:"); for (GridCacheFuture<?> fut : mvcc.atomicFutures()) - U.warn(log, ">>> " + fut); + dumpDiagnosticInfo(fut, diagCtx); - U.warn(log, "Pending data streamer futures:"); + U.warn(diagnosticLog, "Pending data streamer futures:"); for (IgniteInternalFuture<?> fut : mvcc.dataStreamerFutures()) - U.warn(log, ">>> " + fut); + dumpDiagnosticInfo(fut, diagCtx); if (tm != null) { - U.warn(log, "Pending transaction deadlock detection futures:"); + U.warn(diagnosticLog, "Pending transaction deadlock detection futures:"); for (IgniteInternalFuture<?> fut : 
tm.deadlockDetectionFutures()) - U.warn(log, ">>> " + fut); + dumpDiagnosticInfo(fut, diagCtx); } } @@ -1578,6 +1623,20 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana } /** + * Logs the future and add diagnostic info closure. + * + * @param fut Future. + * @param ctx Diagnostic prepare context. + */ + private void dumpDiagnosticInfo(IgniteInternalFuture<?> fut, + @Nullable IgniteDiagnosticPrepareContext ctx) { + U.warn(diagnosticLog, ">>> " + fut); + + if (ctx != null && fut instanceof IgniteDiagnosticAware) + ((IgniteDiagnosticAware)fut).addDiagnosticRequest(ctx); + } + + /** * @param deque Deque to poll from. * @param time Time to wait. * @param w Worker. @@ -1698,7 +1757,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana for (CachePartitionExchangeWorkerTask task: futQ) { if (isExchangeTask(task)) - U.warn(log, ">>> " + task); + U.warn(log, ">>> " + ((GridDhtPartitionsExchangeFuture)task).shortInfo()); } } @@ -1797,15 +1856,12 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana if (nextDumpTime <= U.currentTimeMillis()) { try { - dumpDebugInfo(exchFut.topologyVersion()); + dumpDebugInfo(exchFut); } catch (Exception e) { U.error(log, "Failed to dump debug information: " + e, e); } - if (IgniteSystemProperties.getBoolean(IGNITE_THREAD_DUMP_ON_EXCHANGE_TIMEOUT, false)) - U.dumpThreads(log); - nextDumpTime = U.currentTimeMillis() + nextDumpTimeout(dumpCnt++, futTimeout); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/5d98ccec/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedManagerAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedManagerAdapter.java index 3194ac6..c98a8f1 100644 --- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedManagerAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedManagerAdapter.java @@ -27,12 +27,19 @@ import org.apache.ignite.lang.IgniteFuture; * Convenience adapter for cache managers. */ public class GridCacheSharedManagerAdapter<K, V> implements GridCacheSharedManager<K, V> { + + /** */ + private static final String DIAGNOSTIC_LOG_CATEGORY = "org.apache.ignite.internal.diagnostic"; + /** Context. */ protected GridCacheSharedContext<K, V> cctx; /** Logger. */ protected IgniteLogger log; + /** Diagnostic logger. */ + protected IgniteLogger diagnosticLog; + /** Starting flag. */ private final AtomicBoolean starting = new AtomicBoolean(false); @@ -50,6 +57,8 @@ public class GridCacheSharedManagerAdapter<K, V> implements GridCacheSharedManag log = cctx.logger(getClass()); + diagnosticLog = cctx.logger(DIAGNOSTIC_LOG_CATEGORY); + start0(); if (log.isDebugEnabled()) http://git-wip-us.apache.org/repos/asf/ignite/blob/5d98ccec/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java index 75f8366..609e4cf 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java @@ -36,6 +36,8 @@ import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteInterruptedException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.cluster.ClusterNode; +import 
org.apache.ignite.internal.IgniteDiagnosticAware; +import org.apache.ignite.internal.IgniteDiagnosticPrepareContext; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; @@ -100,7 +102,7 @@ import static org.apache.ignite.transactions.TransactionState.PREPARED; */ @SuppressWarnings("unchecked") public final class GridDhtTxPrepareFuture extends GridCacheCompoundFuture<IgniteInternalTx, GridNearTxPrepareResponse> - implements GridCacheMvccFuture<GridNearTxPrepareResponse> { + implements GridCacheMvccFuture<GridNearTxPrepareResponse>, IgniteDiagnosticAware { /** */ private static final long serialVersionUID = 0L; @@ -1567,6 +1569,33 @@ public final class GridDhtTxPrepareFuture extends GridCacheCompoundFuture<Ignite } /** {@inheritDoc} */ + @Override public void addDiagnosticRequest(IgniteDiagnosticPrepareContext req) { + if (!isDone()) { + for (IgniteInternalFuture fut : futures()) { + if (!fut.isDone() && fut instanceof MiniFuture) { + MiniFuture f = (MiniFuture)fut; + + if (!f.node().isLocal()) { + GridCacheVersion dhtVer = tx.xidVersion(); + GridCacheVersion nearVer = tx.nearXidVersion(); + + req.remoteTxInfo(f.nodeId, dhtVer, nearVer, "GridDhtTxPrepareFuture " + + "waiting for response [node=" + f.nodeId + + ", topVer=" + tx.topologyVersion() + + ", dhtVer=" + dhtVer + + ", nearVer=" + nearVer + + ", futId=" + futId + + ", miniId=" + f.futId + + ", tx=" + tx + ']'); + + return; + } + } + } + } + } + + /** {@inheritDoc} */ @Override public String toString() { Collection<String> futs = F.viewReadOnly(futures(), new C1<IgniteInternalFuture<?>, String>() { @Override public String apply(IgniteInternalFuture<?> f) { http://git-wip-us.apache.org/repos/asf/ignite/blob/5d98ccec/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java 
---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java index 3f612f7..d3bfb3a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java @@ -26,6 +26,9 @@ import java.util.concurrent.atomic.AtomicReference; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.internal.IgniteDiagnosticAware; +import org.apache.ignite.internal.IgniteDiagnosticMessage; +import org.apache.ignite.internal.IgniteDiagnosticPrepareContext; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException; @@ -63,7 +66,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh * */ public class GridPartitionedSingleGetFuture extends GridCacheFutureAdapter<Object> implements GridCacheFuture<Object>, - CacheGetFuture { + CacheGetFuture, IgniteDiagnosticAware { /** Logger reference. */ private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>(); @@ -767,6 +770,26 @@ public class GridPartitionedSingleGetFuture extends GridCacheFutureAdapter<Objec } /** {@inheritDoc} */ + @Override public void addDiagnosticRequest(IgniteDiagnosticPrepareContext ctx) { + if (!isDone()) { + UUID nodeId; + AffinityTopologyVersion topVer; + + synchronized (this) { + nodeId = node != null ? 
node.id() : null; + topVer = this.topVer; + } + + if (nodeId != null) + ctx.basicInfo(nodeId, "GridPartitionedSingleGetFuture waiting for " + + "response [node=" + nodeId + + ", key=" + key + + ", futId=" + futId + + ", topVer=" + topVer + ']'); + } + } + + /** {@inheritDoc} */ @Override public String toString() { return S.toString(GridPartitionedSingleGetFuture.class, this); } http://git-wip-us.apache.org/repos/asf/ignite/blob/5d98ccec/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java index 4442b3a..b88eb47 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java @@ -30,6 +30,8 @@ import java.util.concurrent.atomic.AtomicReference; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.internal.IgniteDiagnosticAware; +import org.apache.ignite.internal.IgniteDiagnosticPrepareContext; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException; @@ -83,7 +85,7 @@ import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_READ; * Colocated cache lock future. 
*/ public final class GridDhtColocatedLockFuture extends GridCacheCompoundIdentityFuture<Boolean> - implements GridCacheMvccFuture<Boolean> { + implements GridCacheMvccFuture<Boolean>, IgniteDiagnosticAware { /** */ private static final long serialVersionUID = 0L; @@ -596,13 +598,51 @@ public final class GridDhtColocatedLockFuture extends GridCacheCompoundIdentityF } /** {@inheritDoc} */ + @Override public void addDiagnosticRequest(IgniteDiagnosticPrepareContext ctx) { + if (!isDone()) { + for (IgniteInternalFuture fut : futures()) { + if (!fut.isDone() && isMini(fut)) { + MiniFuture m = (MiniFuture)fut; + + AffinityTopologyVersion topVer = null; + UUID rmtNodeId = null; + + synchronized (m) { + if (!m.rcvRes && !m.node.isLocal()) { + rmtNodeId = m.node.id(); + + topVer = this.topVer; + } + } + + if (rmtNodeId != null) { + ctx.txKeyInfo(rmtNodeId, cctx.cacheId(), m.keys, + "GridDhtColocatedLockFuture waiting for response [node=" + rmtNodeId + + ", cache=" + cctx.name() + + ", miniId=" + m.futId + + ", topVer=" + topVer + + ", keys=" + m.keys + ']'); + + return; + } + } + } + } + } + + /** {@inheritDoc} */ @Override public String toString() { Collection<String> futs = F.viewReadOnly(futures(), new C1<IgniteInternalFuture<?>, String>() { @Override public String apply(IgniteInternalFuture<?> f) { if (isMini(f)) { MiniFuture m = (MiniFuture)f; - return "[node=" + m.node().id() + ", loc=" + m.node().isLocal() + ", done=" + f.isDone() + "]"; + synchronized (m) { + return "[node=" + m.node().id() + + ", rcvRes=" + m.rcvRes + + ", loc=" + m.node().isLocal() + + ", done=" + f.isDone() + "]"; + } } else return "[loc=true, done=" + f.isDone() + "]"; @@ -610,6 +650,7 @@ public final class GridDhtColocatedLockFuture extends GridCacheCompoundIdentityF }); return S.toString(GridDhtColocatedLockFuture.class, this, + "topVer", topVer, "innerFuts", futs, "inTx", inTx(), "super", super.toString()); 
http://git-wip-us.apache.org/repos/asf/ignite/blob/5d98ccec/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index 26406c6..2271a85 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -42,6 +42,8 @@ import org.apache.ignite.events.DiscoveryEvent; import org.apache.ignite.events.Event; import org.apache.ignite.events.EventType; import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException; +import org.apache.ignite.internal.IgniteDiagnosticAware; +import org.apache.ignite.internal.IgniteDiagnosticPrepareContext; import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteInterruptedCheckedException; @@ -108,7 +110,7 @@ import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYS */ @SuppressWarnings({"TypeMayBeWeakened", "unchecked"}) public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityTopologyVersion> - implements Comparable<GridDhtPartitionsExchangeFuture>, GridDhtTopologyFuture, CachePartitionExchangeWorkerTask { + implements Comparable<GridDhtPartitionsExchangeFuture>, GridDhtTopologyFuture, CachePartitionExchangeWorkerTask, IgniteDiagnosticAware { /** Dummy flag. 
*/ private final boolean dummy; @@ -1023,14 +1025,11 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT ", node=" + cctx.localNodeId() + "]. Dumping pending objects that might be the cause: "); try { - cctx.exchange().dumpDebugInfo(topologyVersion()); + cctx.exchange().dumpDebugInfo(this); } catch (Exception e) { U.error(log, "Failed to dump debug information: " + e, e); } - - if (getBoolean(IGNITE_THREAD_DUMP_ON_EXCHANGE_TIMEOUT, false)) - U.dumpThreads(log); } /** @@ -2157,19 +2156,52 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT } /** {@inheritDoc} */ + @Override public void addDiagnosticRequest(IgniteDiagnosticPrepareContext diagCtx) { + if (!isDone()) { + ClusterNode crd; + Set<UUID> remaining; + + synchronized (this) { + crd = this.crd; + remaining = new HashSet<>(this.remaining); + } + + if (crd != null) { + if (!crd.isLocal()) { + diagCtx.exchangeInfo(crd.id(), topologyVersion(), "Exchange future waiting for coordinator " + + "response [crd=" + crd.id() + ", topVer=" + topologyVersion() + ']'); + } + else if (!remaining.isEmpty()){ + UUID nodeId = remaining.iterator().next(); + + diagCtx.exchangeInfo(crd.id(), topologyVersion(), "Exchange future on coordinator waiting for " + + "server response [node=" + nodeId + ", topVer=" + topologyVersion() + ']'); + } + } + } + } + + /** + * @return Short information string. + */ + public String shortInfo() { + return "GridDhtPartitionsExchangeFuture [topVer=" + topologyVersion() + + ", evt=" + (discoEvt != null ? discoEvt.type() : -1) + + ", evtNode=" + (discoEvt != null ? discoEvt.eventNode() : null) + + ", done=" + isDone() + ']'; + } + + /** {@inheritDoc} */ @Override public String toString() { Set<UUID> remaining; - List<ClusterNode> srvNodes; synchronized (this) { remaining = new HashSet<>(this.remaining); - srvNodes = this.srvNodes != null ? 
new ArrayList<>(this.srvNodes) : null; } return S.toString(GridDhtPartitionsExchangeFuture.class, this, "evtLatch", evtLatch == null ? "null" : evtLatch.getCount(), "remaining", remaining, - "srvNodes", srvNodes, "super", super.toString()); } http://git-wip-us.apache.org/repos/asf/ignite/blob/5d98ccec/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java index 542ee43..ed83650 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java @@ -24,25 +24,47 @@ import java.util.HashMap; import java.util.Map; import java.util.Timer; import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; -import org.apache.ignite.IgniteSystemProperties; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.events.DiscoveryEvent; +import org.apache.ignite.events.Event; import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.IgniteDiagnosticInfo; +import org.apache.ignite.internal.IgniteDiagnosticMessage; +import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteKernal; -import org.apache.ignite.internal.IgniteProperties; +import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.cluster.IgniteClusterImpl; +import
org.apache.ignite.internal.managers.communication.GridIoPolicy; +import org.apache.ignite.internal.managers.communication.GridMessageListener; +import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; import org.apache.ignite.internal.processors.GridProcessorAdapter; import org.apache.ignite.internal.util.GridTimerTask; +import org.apache.ignite.internal.util.future.GridFinishedFuture; +import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.future.IgniteFinishedFutureImpl; import org.apache.ignite.internal.util.tostring.GridToStringExclude; +import org.apache.ignite.internal.util.typedef.CI1; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteClosure; import org.apache.ignite.lang.IgniteFuture; +import org.apache.ignite.marshaller.jdk.JdkMarshaller; import org.apache.ignite.spi.discovery.DiscoveryDataBag; import org.apache.ignite.spi.discovery.DiscoveryDataBag.GridDiscoveryData; import org.jetbrains.annotations.Nullable; -import static org.apache.ignite.IgniteSystemProperties.IGNITE_UPDATE_NOTIFIER; +import static org.apache.ignite.IgniteSystemProperties.IGNITE_DIAGNOSTIC_ENABLED; +import static org.apache.ignite.IgniteSystemProperties.getBoolean; +import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; +import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; import static org.apache.ignite.internal.GridComponent.DiscoveryDataExchangeType.CLUSTER_PROC; +import static org.apache.ignite.internal.GridTopic.TOPIC_INTERNAL_DIAGNOSTIC; import static org.apache.ignite.internal.IgniteVersionUtils.VER_STR; /** @@ -72,6 +94,13 @@ public class ClusterProcessor extends GridProcessorAdapter { @GridToStringExclude private GridUpdateNotifier verChecker; + /** Pending diagnostic request futures by future ID (map is created lazily). */ + private final AtomicReference<ConcurrentHashMap<Long, InternalDiagnosticFuture>> diagnosticFutMap = + new
AtomicReference<>(); + + /** Generator of diagnostic request future IDs. */ + private final AtomicLong diagFutId = new AtomicLong(); + /** * @param ctx Kernal context. */ @@ -82,6 +111,135 @@ public class ClusterProcessor extends GridProcessorAdapter { } /** + * @return Value of {@code IGNITE_DIAGNOSTIC_ENABLED} system property ({@code true} by default). + */ + public boolean diagnosticEnabled() { + return getBoolean(IGNITE_DIAGNOSTIC_ENABLED, true); + } + + /** Marshaller for diagnostic closures and diagnostic results. */ + private final JdkMarshaller marsh = new JdkMarshaller(); + + /** Registers listeners for node left/failed events and for diagnostic request/response messages. + * @throws IgniteCheckedException If failed. + */ + public void initDiagnosticListeners() throws IgniteCheckedException { + ctx.event().addLocalEventListener(new GridLocalEventListener() { + @Override public void onEvent(Event evt) { + assert evt instanceof DiscoveryEvent; + assert evt.type() == EVT_NODE_FAILED || evt.type() == EVT_NODE_LEFT; + + DiscoveryEvent discoEvt = (DiscoveryEvent)evt; + + UUID nodeId = discoEvt.eventNode().id(); + + ConcurrentHashMap<Long, InternalDiagnosticFuture> futs = diagnosticFutMap.get(); + + if (futs != null) { + for (InternalDiagnosticFuture fut : futs.values()) { + if (fut.nodeId.equals(nodeId)) + fut.onDone(new IgniteDiagnosticInfo("Target node failed: " + nodeId)); + } + } + } + }, + EVT_NODE_FAILED, EVT_NODE_LEFT); + + ctx.io().addMessageListener(TOPIC_INTERNAL_DIAGNOSTIC, new GridMessageListener() { + @Override public void onMessage(UUID nodeId, Object msg) { + if (msg instanceof IgniteDiagnosticMessage) { + IgniteDiagnosticMessage msg0 = (IgniteDiagnosticMessage)msg; + + if (msg0.request()) { + ClusterNode node = ctx.discovery().node(nodeId); + + if (node == null) { + if (diagnosticLog.isDebugEnabled()) { + diagnosticLog.debug("Skip diagnostic request, sender node left " + + "[node=" + nodeId + ", msg=" + msg + ']'); + } + + return; + } + + byte[] diagRes; + + IgniteClosure<GridKernalContext, IgniteDiagnosticInfo> c; + + try { + c = msg0.unmarshal(marsh); + + diagRes = marsh.marshal(c.apply(ctx)); + } + catch (Exception e) { + U.error(diagnosticLog, "Failed to run diagnostic closure: " + e, e); + + try { +
IgniteDiagnosticInfo errInfo = + new IgniteDiagnosticInfo("Failed to run diagnostic closure: " + e); + + diagRes = marsh.marshal(errInfo); + } + catch (Exception e0) { + U.error(diagnosticLog, "Failed to marshal diagnostic closure result: " + e0, e0); + + diagRes = null; // Still send a response so that the requesting future completes. + } + } + + IgniteDiagnosticMessage res = IgniteDiagnosticMessage.createResponse(diagRes, msg0.futureId()); + + try { + ctx.io().sendToGridTopic(node, TOPIC_INTERNAL_DIAGNOSTIC, res, GridIoPolicy.SYSTEM_POOL); + } + catch (ClusterTopologyCheckedException e) { + if (diagnosticLog.isDebugEnabled()) { + diagnosticLog.debug("Failed to send diagnostic response, node left " + + "[node=" + nodeId + ", msg=" + msg + ']'); + } + } + catch (IgniteCheckedException e) { + U.error(diagnosticLog, "Failed to send diagnostic response [msg=" + msg0 + "]", e); + } + } + else { + InternalDiagnosticFuture fut = diagnosticFuturesMap().get(msg0.futureId()); + + if (fut != null) { + IgniteDiagnosticInfo res; + + try { + res = msg0.unmarshal(marsh); + + if (res == null) + res = new IgniteDiagnosticInfo("Remote node failed to marshal response."); + } + catch (Exception e) { + U.error(diagnosticLog, "Failed to unmarshal diagnostic response: " + e, e); + + res = new IgniteDiagnosticInfo("Failed to unmarshal diagnostic response: " + e); + } + + fut.onResponse(res); + } + else + U.warn(diagnosticLog, "Failed to find diagnostic message future [msg=" + msg0 + ']'); + } + } + else + U.warn(diagnosticLog, "Received unexpected message: " + msg); + } + }); + } + + /** + * @return Logger for diagnostic category. + */ + public IgniteLogger diagnosticLog() { + return diagnosticLog; + } + + /** * @return Cluster.
*/ public IgniteClusterImpl get() { @@ -188,6 +346,7 @@ public class ClusterProcessor extends GridProcessorAdapter { if (verChecker != null) verChecker.stop(); + ctx.io().removeMessageListener(TOPIC_INTERNAL_DIAGNOSTIC); } /** @@ -212,6 +371,103 @@ public class ClusterProcessor extends GridProcessorAdapter { } /** + * Sends diagnostic message closure to remote node. When response is received (or request fails), combines + * remote node information with local communication info about connection(s) with remote node. + * + * @param nodeId Target node ID. + * @param c Closure to send. + * @param baseMsg Local message placed at the beginning of the resulting string. + * @return Future completed with the combined diagnostic message. + */ + public IgniteInternalFuture<String> requestDiagnosticInfo(final UUID nodeId, + IgniteClosure<GridKernalContext, IgniteDiagnosticInfo> c, + final String baseMsg) { + final GridFutureAdapter<String> infoFut = new GridFutureAdapter<>(); + + final IgniteInternalFuture<IgniteDiagnosticInfo> rmtFut = sendDiagnosticMessage(nodeId, c); + + rmtFut.listen(new CI1<IgniteInternalFuture<IgniteDiagnosticInfo>>() { + @Override public void apply(IgniteInternalFuture<IgniteDiagnosticInfo> fut) { + String rmtMsg; + + try { + rmtMsg = fut.get().message(); + } + catch (Exception e) { + rmtMsg = "Diagnostic processing error: " + e; + } + + final String rmtMsg0 = rmtMsg; + + IgniteInternalFuture<String> locFut = IgniteDiagnosticMessage.dumpCommunicationInfo(ctx, nodeId); + + locFut.listen(new CI1<IgniteInternalFuture<String>>() { + @Override public void apply(IgniteInternalFuture<String> locFut) { + String locMsg; + + try { + locMsg = locFut.get(); + } + catch (Exception e) { + locMsg = "Failed to get info for local node: " + e; + } + + String msg = baseMsg + U.nl() + + "Remote node information:" + U.nl() + rmtMsg0 + + U.nl() + "Local communication statistics:" + U.nl() + + locMsg; + + infoFut.onDone(msg); + } + }); + } + }); + + return infoFut; + } + + /** Sends diagnostic request to remote node and registers a future for the response. + * @param nodeId Target node ID. + * @param c Message closure. + * @return Future completed with remote diagnostic info, or with an error description if sending failed or node left.
+ */ + private IgniteInternalFuture<IgniteDiagnosticInfo> sendDiagnosticMessage(UUID nodeId, + IgniteClosure<GridKernalContext, IgniteDiagnosticInfo> c) { + InternalDiagnosticFuture fut = null; + try { + IgniteDiagnosticMessage msg = IgniteDiagnosticMessage.createRequest(marsh, c, diagFutId.getAndIncrement()); + fut = new InternalDiagnosticFuture(nodeId, msg.futureId()); + diagnosticFuturesMap().put(msg.futureId(), fut); + + ctx.io().sendToGridTopic(nodeId, TOPIC_INTERNAL_DIAGNOSTIC, msg, GridIoPolicy.SYSTEM_POOL); + + return fut; + } + catch (Exception e) { + U.error(diagnosticLog, "Failed to send diagnostic message: " + e, e); + + IgniteDiagnosticInfo errInfo = new IgniteDiagnosticInfo("Failed to send diagnostic message: " + e); + + // Request was not sent: complete the registered future (this also removes it from the futures map). + return fut != null && fut.onDone(errInfo) ? fut : new GridFinishedFuture<>(errInfo); + } + } + + /** + * @return Diagnostic messages futures map. + */ + private ConcurrentHashMap<Long, InternalDiagnosticFuture> diagnosticFuturesMap() { + ConcurrentHashMap<Long, InternalDiagnosticFuture> map = diagnosticFutMap.get(); + + if (map == null) { + if (!diagnosticFutMap.compareAndSet(null, map = new ConcurrentHashMap<>())) + map = diagnosticFutMap.get(); + } + + return map; + } + + /** * Update notifier timer task. */ private static class UpdateNotifierTimerTask extends GridTimerTask { @@ -279,4 +535,47 @@ public class ClusterProcessor extends GridProcessorAdapter { } } } + + /** + * Future for a diagnostic request sent to a remote node. + */ + class InternalDiagnosticFuture extends GridFutureAdapter<IgniteDiagnosticInfo> { + /** Future ID (key in the pending futures map). */ + private final long id; + + /** Target node ID. */ + private final UUID nodeId; + + /** + * @param nodeId Target node ID. + * @param id Future ID. + */ + InternalDiagnosticFuture(UUID nodeId, long id) { + this.nodeId = nodeId; + this.id = id; + } + + /** + * @param res Response.
+ */ + public void onResponse(IgniteDiagnosticInfo res) { + onDone(res); + } + + /** {@inheritDoc} */ + @Override public boolean onDone(@Nullable IgniteDiagnosticInfo res, @Nullable Throwable err) { + if (super.onDone(res, err)) { + diagnosticFuturesMap().remove(id); // A late response for this ID is then only logged as a warning. + + return true; + } + + return false; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(InternalDiagnosticFuture.class, this); + } + } } \ No newline at end of file
