ignite-3336 Added properties IGNITE_THREAD_DUMP_ON_EXCHANGE_TIMEOUT, IGNITE_LONG_OPERATIONS_DUMP_TIMEOUT. Refactored cache message logging. (cherry picked from commit 8ed13e8)
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/ad76dda5 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/ad76dda5 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/ad76dda5 Branch: refs/heads/ignite-1232 Commit: ad76dda5216fd79c1dd54f27b0df83314227801e Parents: 6be26ad Author: sboikov <[email protected]> Authored: Wed Jul 6 10:18:26 2016 +0300 Committer: sboikov <[email protected]> Committed: Wed Jul 6 11:18:49 2016 +0300 ---------------------------------------------------------------------- .../apache/ignite/IgniteSystemProperties.java | 3 + .../apache/ignite/internal/IgniteKernal.java | 21 ++ .../processors/cache/GridCacheAdapter.java | 4 + .../processors/cache/GridCacheIoManager.java | 150 +++++++++- .../processors/cache/GridCacheMessage.java | 9 + .../GridCachePartitionExchangeManager.java | 103 +++++++ .../cache/GridCacheSharedContext.java | 68 +++++ .../processors/cache/GridCacheUtils.java | 21 ++ .../distributed/GridCacheTxRecoveryFuture.java | 70 ++++- .../distributed/GridCacheTxRecoveryRequest.java | 8 + .../GridCacheTxRecoveryResponse.java | 8 + .../distributed/GridDistributedLockRequest.java | 6 + .../GridDistributedLockResponse.java | 6 + .../GridDistributedTxFinishRequest.java | 7 + .../GridDistributedTxFinishResponse.java | 8 + .../GridDistributedTxPrepareRequest.java | 6 + .../GridDistributedTxPrepareResponse.java | 9 +- .../GridDistributedUnlockRequest.java | 6 + .../distributed/dht/GridDhtLockFuture.java | 59 ++-- .../dht/GridDhtTransactionalCacheAdapter.java | 77 ++++- .../distributed/dht/GridDhtTxFinishFuture.java | 92 +++++- .../cache/distributed/dht/GridDhtTxLocal.java | 27 +- .../distributed/dht/GridDhtTxPrepareFuture.java | 124 +++++++- .../dht/atomic/GridDhtAtomicCache.java | 138 ++++++--- .../GridDhtAtomicDeferredUpdateResponse.java | 7 + .../dht/atomic/GridDhtAtomicUpdateFuture.java | 47 ++- .../dht/atomic/GridDhtAtomicUpdateRequest.java | 6 + .../dht/atomic/GridDhtAtomicUpdateResponse.java | 6 + .../dht/atomic/GridNearAtomicUpdateRequest.java | 6 + .../atomic/GridNearAtomicUpdateResponse.java | 6 + .../dht/colocated/GridDhtColocatedCache.java | 10 + .../colocated/GridDhtColocatedLockFuture.java | 71 +++-- .../GridDhtPartitionsExchangeFuture.java | 3 + .../near/GridNearOptimisticTxPrepareFuture.java | 33 +++ .../GridNearPessimisticTxPrepareFuture.java | 34 ++- .../near/GridNearTxFinishFuture.java | 91 +++++- .../near/GridNearTxPrepareFutureAdapter.java | 7 +- .../cache/transactions/IgniteTxHandler.java | 283 ++++++++++++++----- modules/core/src/test/config/log4j-test.xml | 6 + 39 files changed, 1404 insertions(+), 242 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/ad76dda5/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java index d3ba9d9..0c22c9d 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java @@ -380,6 +380,9 @@ public final class IgniteSystemProperties { /** If this property is set to {@code true} then Ignite will log thread dump in case of partition exchange timeout. */ public static final String IGNITE_THREAD_DUMP_ON_EXCHANGE_TIMEOUT = "IGNITE_THREAD_DUMP_ON_EXCHANGE_TIMEOUT"; + /** Cache operations that take more time than value of this property will be output to log. Set to {@code 0} to disable. */ + public static final String IGNITE_LONG_OPERATIONS_DUMP_TIMEOUT = "IGNITE_LONG_OPERATIONS_DUMP_TIMEOUT"; + /** JDBC driver cursor remove delay. */ public static final String IGNITE_JDBC_DRIVER_CURSOR_REMOVE_DELAY = "IGNITE_JDBC_DRIVER_CURSOR_RMV_DELAY"; http://git-wip-us.apache.org/repos/asf/ignite/blob/ad76dda5/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java index c839375..b85692e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java @@ -303,6 +303,10 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { @GridToStringExclude private GridTimeoutProcessor.CancelableTask metricsLogTask; + /** */ + @GridToStringExclude + private GridTimeoutProcessor.CancelableTask longOpDumpTask; + /** Indicate error on grid stop. */ @GridToStringExclude private boolean errOnStop; @@ -1097,6 +1101,20 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { }, metricsLogFreq, metricsLogFreq); } + final long longOpDumpTimeout = + IgniteSystemProperties.getLong(IgniteSystemProperties.IGNITE_LONG_OPERATIONS_DUMP_TIMEOUT, 60_000); + + if (longOpDumpTimeout > 0) { + longOpDumpTask = ctx.timeout().schedule(new Runnable() { + @Override public void run() { + GridKernalContext ctx = IgniteKernal.this.ctx; + + if (ctx != null) + ctx.cache().context().exchange().dumpLongRunningOperations(longOpDumpTimeout); + } + }, longOpDumpTimeout, longOpDumpTimeout); + } + ctx.performance().logSuggestions(log, gridName); U.quietAndInfo(log, "To start Console Management & Monitoring run ignitevisorcmd.{sh|bat}"); @@ -1911,6 +1929,9 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { if (metricsLogTask != null) metricsLogTask.close(); + if (longOpDumpTask != null) + longOpDumpTask.close(); + boolean interrupted = false; while (true) { http://git-wip-us.apache.org/repos/asf/ignite/blob/ad76dda5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index 944a6b0..26730aa 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -261,6 +261,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V /** Logger. */ protected IgniteLogger log; + /** Logger. */ + protected IgniteLogger txLockMsgLog; + /** Affinity impl. */ private Affinity<K> aff; @@ -323,6 +326,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V this.map = map; log = ctx.logger(getClass()); + txLockMsgLog = ctx.shared().txLockMessageLogger(); metrics = new CacheMetricsImpl(ctx); http://git-wip-us.apache.org/repos/asf/ignite/blob/ad76dda5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java index aab1bcc..488a22c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java @@ -39,6 +39,8 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.CacheGetFutur import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtAffinityAssignmentRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLockRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLockResponse; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishRequest; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishResponse; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareResponse; import org.apache.ignite.internal.processors.cache.distributed.dht.GridPartitionedSingleGetFuture; @@ -54,10 +56,13 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLock import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockResponse; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetRequest; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetResponse; +import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishRequest; +import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishResponse; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareRequest; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareResponse; import org.apache.ignite.internal.processors.cache.query.GridCacheQueryRequest; import org.apache.ignite.internal.processors.cache.query.GridCacheQueryResponse; +import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.F0; import org.apache.ignite.internal.util.GridLeanSet; import org.apache.ignite.internal.util.GridSpinReadWriteLock; @@ -113,7 +118,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { /** Message listener. */ private GridMessageListener lsnr = new GridMessageListener() { - @Override public void onMessage(final UUID nodeId, Object msg) { + @Override public void onMessage(final UUID nodeId, final Object msg) { if (log.isDebugEnabled()) log.debug("Received unordered cache communication message [nodeId=" + nodeId + ", locId=" + cctx.localNodeId() + ", msg=" + msg + ']'); @@ -178,9 +183,19 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { AffinityTopologyVersion rmtAffVer = cacheMsg.topologyVersion(); if (locAffVer.compareTo(rmtAffVer) < 0) { - if (log.isDebugEnabled()) - log.debug("Received message has higher affinity topology version [msg=" + msg + - ", locTopVer=" + locAffVer + ", rmtTopVer=" + rmtAffVer + ']'); + IgniteLogger log = cacheMsg.messageLogger(cctx); + + if (log.isDebugEnabled()) { + StringBuilder msg0 = new StringBuilder("Received message has higher affinity topology version ["); + + appendMessageInfo(cacheMsg, nodeId, msg0); + + msg0.append(", locTopVer=").append(locAffVer). + append(", rmtTopVer=").append(rmtAffVer). + append(']'); + + log.debug(msg0.toString()); + } fut = cctx.exchange().affinityReadyFuture(rmtAffVer); } @@ -191,6 +206,17 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { @Override public void apply(IgniteInternalFuture<?> t) { cctx.kernalContext().closure().runLocalSafe(new Runnable() { @Override public void run() { + IgniteLogger log = cacheMsg.messageLogger(cctx); + + if (log.isDebugEnabled()) { + StringBuilder msg0 = new StringBuilder("Process cache message after wait for " + + "affinity topology version ["); + + appendMessageInfo(cacheMsg, nodeId, msg0).append(']'); + + log.debug(msg0.toString()); + } + handleMessage(nodeId, cacheMsg); } }); @@ -225,18 +251,23 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { c = clsHandlers.get(new ListenerKey(cacheMsg.cacheId(), cacheMsg.getClass())); if (c == null) { + IgniteLogger log = cacheMsg.messageLogger(cctx); + + StringBuilder msg0 = new StringBuilder("Received message without registered handler (will ignore) ["); + + appendMessageInfo(cacheMsg, nodeId, msg0); + + msg0.append(", locTopVer=").append(cctx.exchange().readyAffinityVersion()). + append(", msgTopVer=").append(cacheMsg.topologyVersion()). + append(", cacheDesc=").append(cctx.cache().cacheDescriptor(cacheMsg.cacheId())). + append(']'); + if (cctx.kernalContext().isStopping()) { if (log.isDebugEnabled()) - log.debug("Received message without registered handler (will ignore) [msg=" + cacheMsg + - ", nodeId=" + nodeId + ']'); - } - else { - U.warn(log, "Received message without registered handler (will ignore) [msg=" + cacheMsg + - ", nodeId=" + nodeId + - ", locTopVer=" + cctx.exchange().readyAffinityVersion() + - ", msgTopVer=" + cacheMsg.topologyVersion() + - ", cacheDesc=" + cctx.cache().cacheDescriptor(cacheMsg.cacheId()) + ']'); + log.debug(msg0.toString()); } + else + U.warn(log, msg0.toString()); return; } @@ -352,6 +383,99 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { } } + + /** + * @param cacheMsg Cache message. + * @param nodeId Node ID. + * @param builder Message builder. + * @return Message builder. + */ + private StringBuilder appendMessageInfo(GridCacheMessage cacheMsg, UUID nodeId, StringBuilder builder) { + if (txId(cacheMsg) != null) { + builder.append("txId=").append(txId(cacheMsg)). + append(", dhtTxId=").append(dhtTxId(cacheMsg)). + append(", msg=").append(cacheMsg); + } + else if (atomicFututeId(cacheMsg) != null) { + builder.append("futId=").append(atomicFututeId(cacheMsg)). + append(", writeVer=").append(atomicWriteVersion(cacheMsg)). + append(", msg=").append(cacheMsg); + } + else + builder.append("msg=").append(cacheMsg); + + builder.append(", node=").append(nodeId); + + return builder; + } + + /** + * @param cacheMsg Cache message. + * @return Transaction ID if applicable for message. + */ + @Nullable private GridCacheVersion txId(GridCacheMessage cacheMsg) { + if (cacheMsg instanceof GridDhtTxPrepareRequest) + return ((GridDhtTxPrepareRequest)cacheMsg).nearXidVersion(); + else if (cacheMsg instanceof GridNearTxPrepareRequest) + return ((GridNearTxPrepareRequest)cacheMsg).version(); + else if (cacheMsg instanceof GridNearTxPrepareResponse) + return ((GridNearTxPrepareResponse)cacheMsg).version(); + else if (cacheMsg instanceof GridNearTxFinishRequest) + return ((GridNearTxFinishRequest)cacheMsg).version(); + else if (cacheMsg instanceof GridNearTxFinishResponse) + return ((GridNearTxFinishResponse)cacheMsg).xid(); + + return null; + } + + /** + * @param cacheMsg Cache message. + * @return Transaction ID if applicable for message. + */ + @Nullable private GridCacheVersion dhtTxId(GridCacheMessage cacheMsg) { + if (cacheMsg instanceof GridDhtTxPrepareRequest) + return ((GridDhtTxPrepareRequest)cacheMsg).version(); + else if (cacheMsg instanceof GridDhtTxPrepareResponse) + return ((GridDhtTxPrepareResponse)cacheMsg).version(); + else if (cacheMsg instanceof GridDhtTxFinishRequest) + return ((GridDhtTxFinishRequest)cacheMsg).version(); + else if (cacheMsg instanceof GridDhtTxFinishResponse) + return ((GridDhtTxFinishResponse)cacheMsg).xid(); + + return null; + } + + /** + * @param cacheMsg Cache message. + * @return Atomic future ID if applicable for message. + */ + @Nullable private GridCacheVersion atomicFututeId(GridCacheMessage cacheMsg) { + if (cacheMsg instanceof GridNearAtomicUpdateRequest) + return ((GridNearAtomicUpdateRequest)cacheMsg).futureVersion(); + else if (cacheMsg instanceof GridNearAtomicUpdateResponse) + return ((GridNearAtomicUpdateResponse) cacheMsg).futureVersion(); + else if (cacheMsg instanceof GridDhtAtomicUpdateRequest) + return ((GridDhtAtomicUpdateRequest)cacheMsg).futureVersion(); + else if (cacheMsg instanceof GridDhtAtomicUpdateResponse) + return ((GridDhtAtomicUpdateResponse) cacheMsg).futureVersion(); + + return null; + } + + + /** + * @param cacheMsg Cache message. + * @return Atomic future ID if applicable for message. + */ + @Nullable private GridCacheVersion atomicWriteVersion(GridCacheMessage cacheMsg) { + if (cacheMsg instanceof GridNearAtomicUpdateRequest) + return ((GridNearAtomicUpdateRequest)cacheMsg).updateVersion(); + else if (cacheMsg instanceof GridDhtAtomicUpdateRequest) + return ((GridDhtAtomicUpdateRequest)cacheMsg).writeVersion(); + + return null; + } + /** * Processes failed messages. * http://git-wip-us.apache.org/repos/asf/ignite/blob/ad76dda5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java index 3c2ff13..f99d2cd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java @@ -24,6 +24,7 @@ import java.util.List; import java.util.concurrent.atomic.AtomicInteger; import javax.cache.processor.EntryProcessor; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteLogger; import org.apache.ignite.internal.GridDirectTransient; import org.apache.ignite.internal.managers.deployment.GridDeployment; import org.apache.ignite.internal.managers.deployment.GridDeploymentInfo; @@ -613,6 +614,14 @@ public abstract class GridCacheMessage implements Message { return col; } + /** + * @param ctx Context. + * @return Logger. + */ + public IgniteLogger messageLogger(GridCacheSharedContext ctx) { + return ctx.messageLogger(); + } + /** {@inheritDoc} */ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { writer.setBuffer(buf); http://git-wip-us.apache.org/repos/asf/ignite/blob/ad76dda5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index 260a504..e6ab046 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -17,9 +17,12 @@ package org.apache.ignite.internal.processors.cache; +import java.text.DateFormat; +import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Collection; import java.util.Comparator; +import java.util.Date; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -162,6 +165,12 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana /** */ private volatile IgniteCheckedException stopErr; + /** */ + private int longRunningOpsDumpCnt; + + /** */ + private DateFormat dateFormat = new SimpleDateFormat("HH:mm:ss.SSS"); + /** Discovery listener. */ private final GridLocalEventListener discoLsnr = new GridLocalEventListener() { @Override public void onEvent(Event evt) { @@ -1163,6 +1172,100 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana } /** + * @param timeout Operation timeout. + */ + public void dumpLongRunningOperations(long timeout) { + try { + GridDhtPartitionsExchangeFuture lastFut = lastInitializedFut; + + // If exchange is in progress it will dump all hanging operations if any. + if (lastFut != null && !lastFut.isDone()) + return; + + long curTime = U.currentTimeMillis(); + + boolean found = false; + + IgniteTxManager tm = cctx.tm(); + + if (tm != null) { + for (IgniteInternalTx tx : tm.activeTransactions()) { + if (curTime - tx.startTime() > timeout) { + found = true; + + if (longRunningOpsDumpCnt < GridDhtPartitionsExchangeFuture.DUMP_PENDING_OBJECTS_THRESHOLD) { + U.warn(log, "Found long running transaction [startTime=" + formatTime(tx.startTime()) + + ", curTime=" + formatTime(curTime) + ", tx=" + tx + ']'); + } + else + break; + } + } + } + + GridCacheMvccManager mvcc = cctx.mvcc(); + + if (mvcc != null) { + for (GridCacheFuture<?> fut : mvcc.activeFutures()) { + if (curTime - fut.startTime() > timeout) { + found = true; + + if (longRunningOpsDumpCnt < GridDhtPartitionsExchangeFuture.DUMP_PENDING_OBJECTS_THRESHOLD) { + U.warn(log, "Found long running cache future [startTime=" + formatTime(fut.startTime()) + + ", curTime=" + formatTime(curTime) + ", fut=" + fut + ']'); + } + else + break; + } + } + + for (GridCacheFuture<?> fut : mvcc.atomicFutures()) { + if (curTime - fut.startTime() > timeout) { + found = true; + + if (longRunningOpsDumpCnt < GridDhtPartitionsExchangeFuture.DUMP_PENDING_OBJECTS_THRESHOLD) { + U.warn(log, "Found long running cache future [startTime=" + formatTime(fut.startTime()) + + ", curTime=" + formatTime(curTime) + ", fut=" + fut + ']'); + } + else + break; + } + } + } + + if (found) { + if (longRunningOpsDumpCnt < GridDhtPartitionsExchangeFuture.DUMP_PENDING_OBJECTS_THRESHOLD) { + longRunningOpsDumpCnt++; + + if (IgniteSystemProperties.getBoolean(IGNITE_THREAD_DUMP_ON_EXCHANGE_TIMEOUT, false)) { + U.warn(log, "Found long running cache operations, dump threads."); + + U.dumpThreads(log); + } + + U.warn(log, "Found long running cache operations, dump IO statistics."); + + // Dump IO manager statistics. + cctx.gridIO().dumpStats(); + } + } + else + longRunningOpsDumpCnt = 0; + } + catch (Exception e) { + U.error(log, "Failed to dump debug information: " + e, e); + } + } + + /** + * @param time Time. + * @return Time string. + */ + private String formatTime(long time) { + return dateFormat.format(new Date(time)); + } + + /** * @param exchTopVer Exchange topology version. */ private void dumpPendingObjects(@Nullable AffinityTopologyVersion exchTopVer) { http://git-wip-us.apache.org/repos/asf/ignite/blob/ad76dda5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java index f27d114..0cdf0a4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java @@ -51,6 +51,7 @@ import org.apache.ignite.internal.util.future.GridCompoundFuture; import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.lang.IgniteFuture; import org.apache.ignite.marshaller.Marshaller; import org.jetbrains.annotations.Nullable; @@ -107,6 +108,24 @@ public class GridCacheSharedContext<K, V> { /** Indicating whether local store keeps primary only. */ private final boolean locStorePrimaryOnly = IgniteSystemProperties.getBoolean(IGNITE_LOCAL_STORE_KEEPS_PRIMARY_ONLY); + /** */ + private final IgniteLogger msgLog; + + /** */ + private final IgniteLogger atomicMsgLog; + + /** */ + private final IgniteLogger txPrepareMsgLog; + + /** */ + private final IgniteLogger txFinishMsgLog; + + /** */ + private final IgniteLogger txLockMsgLog; + + /** */ + private final IgniteLogger txRecoveryMsgLog; + /** * @param kernalCtx Context. * @param txMgr Transaction manager. @@ -142,6 +161,55 @@ public class GridCacheSharedContext<K, V> { ctxMap = new ConcurrentHashMap<>(); locStoreCnt = new AtomicInteger(); + + msgLog = kernalCtx.log(CU.CACHE_MSG_LOG_CATEGORY); + atomicMsgLog = kernalCtx.log(CU.ATOMIC_MSG_LOG_CATEGORY); + txPrepareMsgLog = kernalCtx.log(CU.TX_MSG_PREPARE_LOG_CATEGORY); + txFinishMsgLog = kernalCtx.log(CU.TX_MSG_FINISH_LOG_CATEGORY); + txLockMsgLog = kernalCtx.log(CU.TX_MSG_LOCK_LOG_CATEGORY); + txRecoveryMsgLog = kernalCtx.log(CU.TX_MSG_RECOVERY_LOG_CATEGORY); + } + + /** + * @return Logger. + */ + public IgniteLogger messageLogger() { + return msgLog; + } + + /** + * @return Logger. + */ + public IgniteLogger atomicMessageLogger() { + return atomicMsgLog; + } + + /** + * @return Logger. + */ + public IgniteLogger txPrepareMessageLogger() { + return txPrepareMsgLog; + } + + /** + * @return Logger. + */ + public IgniteLogger txFinishMessageLogger() { + return txFinishMsgLog; + } + + /** + * @return Logger. + */ + public IgniteLogger txLockMessageLogger() { + return txLockMsgLog; + } + + /** + * @return Logger. + */ + public IgniteLogger txRecoveryMessageLogger() { + return txRecoveryMsgLog; } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/ad76dda5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java index feaa618..1a4ffd5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java @@ -122,6 +122,27 @@ public class GridCacheUtils { /** Marshaller system cache name. */ public static final String MARSH_CACHE_NAME = "ignite-marshaller-sys-cache"; + /** */ + public static final String CACHE_MSG_LOG_CATEGORY = "org.apache.ignite.cache.msg"; + + /** */ + public static final String ATOMIC_MSG_LOG_CATEGORY = CACHE_MSG_LOG_CATEGORY + ".atomic"; + + /** */ + public static final String TX_MSG_LOG_CATEGORY = CACHE_MSG_LOG_CATEGORY + ".tx"; + + /** */ + public static final String TX_MSG_PREPARE_LOG_CATEGORY = TX_MSG_LOG_CATEGORY + ".prepare"; + + /** */ + public static final String TX_MSG_FINISH_LOG_CATEGORY = TX_MSG_LOG_CATEGORY + ".finish"; + + /** */ + public static final String TX_MSG_LOCK_LOG_CATEGORY = TX_MSG_LOG_CATEGORY + ".lock"; + + /** */ + public static final String TX_MSG_RECOVERY_LOG_CATEGORY = TX_MSG_LOG_CATEGORY + ".recovery"; + /** Default mask name. */ private static final String DEFAULT_MASK_NAME = "<default>"; http://git-wip-us.apache.org/repos/asf/ignite/blob/ad76dda5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java index 01673b8..f7b105e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java @@ -59,6 +59,9 @@ public class GridCacheTxRecoveryFuture extends GridCompoundIdentityFuture<Boolea /** Logger. */ private static IgniteLogger log; + /** Logger. */ + private static IgniteLogger msgLog; + /** Trackable flag. */ private boolean trackable = true; @@ -103,8 +106,10 @@ public class GridCacheTxRecoveryFuture extends GridCompoundIdentityFuture<Boolea this.txNodes = txNodes; this.failedNodeIds = failedNodeIds; - if (log == null) + if (log == null) { + msgLog = cctx.txRecoveryMessageLogger(); log = U.logger(cctx.kernalContext(), logRef, GridCacheTxRecoveryFuture.class); + } nodes = new GridLeanMap<>(); @@ -174,11 +179,24 @@ public class GridCacheTxRecoveryFuture extends GridCompoundIdentityFuture<Boolea try { cctx.io().send(nearNodeId, req, tx.ioPolicy()); + + if (msgLog.isDebugEnabled()) { + msgLog.debug("Recovery fut, sent request near tx [txId=" + tx.nearXidVersion() + + ", dhtTxId=" + tx.xidVersion() + + ", node=" + nearNodeId + ']'); + } } catch (ClusterTopologyCheckedException ignore) { fut.onNodeLeft(nearNodeId); } catch (IgniteCheckedException e) { + if (msgLog.isDebugEnabled()) { + msgLog.debug("Recovery fut, failed to send request near tx [txId=" + tx.nearXidVersion() + + ", dhtTxId=" + tx.xidVersion() + + ", node=" + nearNodeId + + ", err=" + e + ']'); + } + fut.onError(e); } @@ -280,11 +298,24 @@ public class GridCacheTxRecoveryFuture extends GridCompoundIdentityFuture<Boolea try { cctx.io().send(id, req, tx.ioPolicy()); + + if (msgLog.isDebugEnabled()) { + msgLog.debug("Recovery fut, sent request to backup [txId=" + tx.nearXidVersion() + + ", dhtTxId=" + tx.xidVersion() + + ", node=" + id + ']'); + } } catch (ClusterTopologyCheckedException ignored) { fut.onNodeLeft(id); } catch (IgniteCheckedException e) { + if (msgLog.isDebugEnabled()) { + msgLog.debug("Recovery fut, failed to send request to backup [txId=" + tx.nearXidVersion() + + ", dhtTxId=" + tx.xidVersion() + + ", node=" + id + + ", err=" + e + ']'); + } + fut.onError(e); break; @@ -306,11 +337,24 @@ public class GridCacheTxRecoveryFuture extends GridCompoundIdentityFuture<Boolea try { cctx.io().send(nodeId, req, tx.ioPolicy()); + + if (msgLog.isDebugEnabled()) { + msgLog.debug("Recovery fut, sent request to primary [txId=" + tx.nearXidVersion() + + ", dhtTxId=" + tx.xidVersion() + + ", node=" + nodeId + ']'); + } } catch (ClusterTopologyCheckedException ignored) { fut.onNodeLeft(nodeId); } catch (IgniteCheckedException e) { + if (msgLog.isDebugEnabled()) { + msgLog.debug("Recovery fut, failed to send request to primary [txId=" + tx.nearXidVersion() + + ", dhtTxId=" + tx.xidVersion() + + ", node=" + nodeId + + ", err=" + e + ']'); + } + fut.onError(e); break; @@ -354,6 +398,22 @@ public class GridCacheTxRecoveryFuture extends GridCompoundIdentityFuture<Boolea mini.onResult(res); } + else { + if (msgLog.isDebugEnabled()) { + msgLog.debug("Tx recovery fut, failed to find mini future [txId=" + tx.nearXidVersion() + + ", dhtTxId=" + tx.xidVersion() + + ", node=" + nodeId + + ", res=" + res + + ", fut=" + this + ']'); + } + } + } + else { + msgLog.debug("Tx recovery fut, response for finished future [txId=" + tx.nearXidVersion() + + ", dhtTxId=" + tx.xidVersion() + + ", node=" + nodeId + + ", res=" + res + + ", fut=" + this + ']'); } } @@ -522,8 +582,12 @@ public class GridCacheTxRecoveryFuture extends GridCompoundIdentityFuture<Boolea * @param nodeId Failed node ID. */ private void onNodeLeft(UUID nodeId) { - if (log.isDebugEnabled()) - log.debug("Transaction node left grid (will ignore) [fut=" + this + ']'); + if (msgLog.isDebugEnabled()) { + msgLog.debug("Tx recovery fut, mini future node left [txId=" + tx.nearXidVersion() + + ", dhtTxId=" + tx.xidVersion() + + ", node=" + nodeId + + ", nearTxCheck=" + nearTxCheck + ']'); + } if (nearTxCheck) { if (tx.state() == PREPARED) { http://git-wip-us.apache.org/repos/asf/ignite/blob/ad76dda5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryRequest.java index e5787d7..6fdb30b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryRequest.java @@ -19,6 +19,9 @@ package org.apache.ignite.internal.processors.cache.distributed; import java.io.Externalizable; import java.nio.ByteBuffer; + +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.typedef.internal.S; @@ -127,6 +130,11 @@ public class GridCacheTxRecoveryRequest extends GridDistributedBaseMessage { } /** {@inheritDoc} */ + @Override public IgniteLogger messageLogger(GridCacheSharedContext ctx) { + return ctx.txRecoveryMessageLogger(); + } + + /** {@inheritDoc} */ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { writer.setBuffer(buf); http://git-wip-us.apache.org/repos/asf/ignite/blob/ad76dda5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryResponse.java index 361d381..265d53b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryResponse.java @@ -19,6 +19,9 @@ package org.apache.ignite.internal.processors.cache.distributed; import java.io.Externalizable; import java.nio.ByteBuffer; + +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.lang.IgniteUuid; @@ -92,6 +95,11 @@ public class GridCacheTxRecoveryResponse extends GridDistributedBaseMessage { } /** {@inheritDoc} */ + @Override public IgniteLogger messageLogger(GridCacheSharedContext ctx) { + return ctx.txRecoveryMessageLogger(); + } + + /** {@inheritDoc} */ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { writer.setBuffer(buf); http://git-wip-us.apache.org/repos/asf/ignite/blob/ad76dda5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockRequest.java index 20cd52c..9639a9a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockRequest.java @@ -22,6 +22,7 @@ import java.util.ArrayList; import java.util.List; import java.util.UUID; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteLogger; import org.apache.ignite.internal.GridDirectCollection; import org.apache.ignite.internal.GridDirectTransient; import org.apache.ignite.internal.processors.cache.GridCacheContext; @@ -316,6 +317,11 @@ public class GridDistributedLockRequest extends GridDistributedBaseMessage { return timeout; } + /** {@inheritDoc} */ + @Override public IgniteLogger messageLogger(GridCacheSharedContext ctx) { + return ctx.txLockMessageLogger(); + } + /** {@inheritDoc} * @param ctx*/ @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException { http://git-wip-us.apache.org/repos/asf/ignite/blob/ad76dda5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockResponse.java index 84a4094..1763ff9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockResponse.java @@ -23,6 +23,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteLogger; import org.apache.ignite.internal.GridDirectCollection; import org.apache.ignite.internal.GridDirectTransient; import org.apache.ignite.internal.processors.cache.CacheObject; @@ -180,6 +181,11 @@ public class GridDistributedLockResponse extends GridDistributedBaseMessage { return null; } + /** {@inheritDoc} */ + @Override public IgniteLogger messageLogger(GridCacheSharedContext ctx) { + return ctx.txLockMessageLogger(); + } + /** {@inheritDoc} * @param ctx*/ @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException { http://git-wip-us.apache.org/repos/asf/ignite/blob/ad76dda5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java index ad69d14..7254ec7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java @@ -20,6 +20,8 @@ package org.apache.ignite.internal.processors.cache.distributed; import java.io.Externalizable; import java.nio.ByteBuffer; import java.util.Collection; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.tostring.GridToStringBuilder; import org.apache.ignite.lang.IgniteUuid; @@ -222,6 +224,11 @@ public class GridDistributedTxFinishRequest extends GridDistributedBaseMessage { } /** {@inheritDoc} */ + @Override public IgniteLogger messageLogger(GridCacheSharedContext ctx) { + return ctx.txFinishMessageLogger(); + } + + /** {@inheritDoc} */ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { writer.setBuffer(buf); http://git-wip-us.apache.org/repos/asf/ignite/blob/ad76dda5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishResponse.java index 4e17e79..c08c5b2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishResponse.java @@ -19,7 +19,10 @@ package org.apache.ignite.internal.processors.cache.distributed; import java.io.Externalizable; import java.nio.ByteBuffer; + +import org.apache.ignite.IgniteLogger; import org.apache.ignite.internal.processors.cache.GridCacheMessage; +import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.tostring.GridToStringBuilder; import org.apache.ignite.lang.IgniteUuid; @@ -79,6 +82,11 @@ public class GridDistributedTxFinishResponse extends GridCacheMessage { } /** {@inheritDoc} */ + @Override public IgniteLogger messageLogger(GridCacheSharedContext ctx) { + return ctx.txFinishMessageLogger(); + } + + /** {@inheritDoc} */ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { writer.setBuffer(buf); http://git-wip-us.apache.org/repos/asf/ignite/blob/ad76dda5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java index d6f4331..72e68db 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java @@ -26,6 +26,7 @@ import java.util.Iterator; import java.util.Map; import java.util.UUID; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteLogger; import org.apache.ignite.internal.GridDirectCollection; import org.apache.ignite.internal.GridDirectMap; import org.apache.ignite.internal.GridDirectTransient; @@ -380,6 +381,11 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage } /** {@inheritDoc} */ + @Override public IgniteLogger messageLogger(GridCacheSharedContext ctx) { + return ctx.txPrepareMessageLogger(); + } + + /** {@inheritDoc} */ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { writer.setBuffer(buf); http://git-wip-us.apache.org/repos/asf/ignite/blob/ad76dda5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareResponse.java index 83f3724..850c095 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareResponse.java @@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache.distributed; import java.io.Externalizable; import java.nio.ByteBuffer; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteLogger; import org.apache.ignite.internal.GridDirectTransient; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; @@ -89,8 +90,12 @@ public class GridDistributedTxPrepareResponse extends GridDistributedBaseMessage return err != null; } - /** {@inheritDoc} - * @param ctx*/ + /** {@inheritDoc} */ + @Override public IgniteLogger messageLogger(GridCacheSharedContext ctx) { + return ctx.txPrepareMessageLogger(); + } + + /** {@inheritDoc} */ @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException { super.prepareMarshal(ctx); http://git-wip-us.apache.org/repos/asf/ignite/blob/ad76dda5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedUnlockRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedUnlockRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedUnlockRequest.java index 7854ace..df6acdd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedUnlockRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedUnlockRequest.java @@ -22,6 +22,7 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteLogger; import org.apache.ignite.internal.GridDirectCollection; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; @@ -111,6 +112,11 @@ public class GridDistributedUnlockRequest extends GridDistributedBaseMessage { } /** {@inheritDoc} */ + @Override public IgniteLogger messageLogger(GridCacheSharedContext ctx) { + return ctx.txLockMessageLogger(); + } + + /** {@inheritDoc} */ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { writer.setBuffer(buf); http://git-wip-us.apache.org/repos/asf/ignite/blob/ad76dda5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java index 38a7e19..64b8745 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java @@ -87,6 +87,9 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean> /** Logger. */ private static IgniteLogger log; + /** Logger. */ + private static IgniteLogger msgLog; + /** Cache registry. */ @GridToStringExclude private GridCacheContext<?, ?> cctx; @@ -235,8 +238,10 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean> entries = new ArrayList<>(cnt); pendingLocks = U.newHashSet(cnt); - if (log == null) + if (log == null) { + msgLog = cctx.shared().txLockMessageLogger(); log = U.logger(cctx.kernalContext(), logRef, GridDhtLockFuture.class); + } if (timeout > 0) { timeoutObj = new LockTimeoutObject(); @@ -304,6 +309,13 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean> return futId; } + /** + * @return Near lock version. + */ + public GridCacheVersion nearLockVersion() { + return nearLockVer; + } + /** {@inheritDoc} */ @Nullable @Override public GridCacheVersion mappedVersion() { return tx == null ? nearLockVer : null; @@ -506,27 +518,21 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean> */ void onResult(UUID nodeId, GridDhtLockResponse res) { if (!isDone()) { - if (log.isDebugEnabled()) - log.debug("Received lock response from node [nodeId=" + nodeId + ", res=" + res + ", fut=" + this + ']'); - MiniFuture mini = miniFuture(res.miniId()); if (mini != null) { assert mini.node().id().equals(nodeId); - if (log.isDebugEnabled()) - log.debug("Found mini future for response [mini=" + mini + ", res=" + res + ']'); - mini.onResult(res); - if (log.isDebugEnabled()) - log.debug("Futures after processed lock response [fut=" + this + ", mini=" + mini + - ", res=" + res + ']'); - return; } - U.warn(log, "Failed to find mini future for response (perhaps due to stale message) [res=" + res + + U.warn(msgLog, "DHT lock fut, failed to find mini future [txId=" + nearLockVer + + ", dhtTxId=" + lockVer + + ", inTx=" + inTx() + + ", node=" + nodeId + + ", res=" + res + ", fut=" + this + ']'); } } @@ -929,18 +935,31 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean> add(fut); // Append new future. - if (log.isDebugEnabled()) - log.debug("Sending DHT lock request to DHT node [node=" + n.id() + ", req=" + req + ']'); - cctx.io().send(n, req, cctx.ioPolicy()); + + if (msgLog.isDebugEnabled()) { + msgLog.debug("DHT lock fut, sent request [txId=" + nearLockVer + + ", dhtTxId=" + lockVer + + ", inTx=" + inTx() + + ", nodeId=" + n.id() + ']'); + } } } catch (IgniteCheckedException e) { // Fail the whole thing. if (e instanceof ClusterTopologyCheckedException) fut.onResult((ClusterTopologyCheckedException)e); - else + else { + if (msgLog.isDebugEnabled()) { + msgLog.debug("DHT lock fut, failed to send request [txId=" + nearLockVer + + ", dhtTxId=" + lockVer + + ", inTx=" + inTx() + + ", node=" + n.id() + + ", err=" + e + ']'); + } + fut.onResult(e); + } } } } @@ -1162,8 +1181,12 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean> * @param e Node failure. */ void onResult(ClusterTopologyCheckedException e) { - if (log.isDebugEnabled()) - log.debug("Remote node left grid while sending or waiting for reply (will ignore): " + this); + if (msgLog.isDebugEnabled()) { + msgLog.debug("DHT lock fut, mini future node left [txId=" + nearLockVer + + ", dhtTxId=" + lockVer + + ", inTx=" + inTx() + + ", node=" + node.id() + ']'); + } if (tx != null) tx.removeMapping(node.id()); http://git-wip-us.apache.org/repos/asf/ignite/blob/ad76dda5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java index d5927bf..6b437b1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java @@ -376,6 +376,13 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach * @param req Request. */ protected final void processDhtLockRequest(final UUID nodeId, final GridDhtLockRequest req) { + if (txLockMsgLog.isDebugEnabled()) { + txLockMsgLog.debug("Received dht lock request [txId=" + req.nearXidVersion() + + ", dhtTxId=" + req.version() + + ", inTx=" + req.inTx() + + ", node=" + nodeId + ']'); + } + IgniteInternalFuture<Object> keyFut = F.isEmpty(req.keys()) ? null : ctx.dht().dhtPreloader().request(req.keys(), req.topologyVersion()); @@ -451,10 +458,6 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach assert req != null; assert !nodeId.equals(locNodeId); - if (log.isDebugEnabled()) - log.debug("Processing dht lock request [locNodeId=" + locNodeId + ", nodeId=" + nodeId + ", req=" + req + - ']'); - int cnt = F.size(req.keys()); GridDhtLockResponse res; @@ -528,15 +531,30 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach try { // Reply back to sender. ctx.io().send(nodeId, res, ctx.ioPolicy()); + + if (txLockMsgLog.isDebugEnabled()) { + txLockMsgLog.debug("Sent dht lock response [txId=" + req.nearXidVersion() + + ", dhtTxId=" + req.version() + + ", inTx=" + req.inTx() + + ", node=" + nodeId + ']'); + } } catch (ClusterTopologyCheckedException ignored) { - U.warn(log, "Failed to send lock reply to remote node because it left grid: " + nodeId); + U.warn(txLockMsgLog, "Failed to send dht lock response, node failed [" + + "txId=" + req.nearXidVersion() + + ", dhtTxId=" + req.version() + + ", inTx=" + req.inTx() + + ", node=" + nodeId + ']'); fail = true; releaseAll = true; } catch (IgniteCheckedException e) { - U.error(log, "Failed to send lock reply to node (lock will not be acquired): " + nodeId, e); + U.error(txLockMsgLog, "Failed to send dht lock response (lock will not be acquired) " + + "txId=" + req.nearXidVersion() + + ", dhtTxId=" + req.version() + + ", inTx=" + req.inTx() + + ", node=" + nodeId + ']', e); fail = true; } @@ -601,14 +619,18 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach assert nodeId != null; assert req != null; - if (log.isDebugEnabled()) - log.debug("Processing near lock request [locNodeId=" + locNodeId + ", nodeId=" + nodeId + ", req=" + req + - ']'); + if (txLockMsgLog.isDebugEnabled()) { + txLockMsgLog.debug("Received near lock request [txId=" + req.version() + + ", inTx=" + req.inTx() + + ", node=" + nodeId + ']'); + } ClusterNode nearNode = ctx.discovery().node(nodeId); if (nearNode == null) { - U.warn(log, "Received lock request from unknown node (will ignore): " + nodeId); + U.warn(txLockMsgLog, "Received near lock request from unknown node (will ignore) [txId=" + req.version() + + ", inTx=" + req.inTx() + + ", node=" + nodeId + ']'); return; } @@ -631,11 +653,18 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach GridDhtLockFuture fut = (GridDhtLockFuture)ctx.mvcc().<Boolean>mvccFuture(res.version(), res.futureId()); if (fut == null) { - if (log.isDebugEnabled()) - log.debug("Received response for unknown future (will ignore): " + res); + if (txLockMsgLog.isDebugEnabled()) + txLockMsgLog.debug("Received dht lock response for unknown future [txId=null" + + ", dhtTxId=" + res.version() + + ", node=" + nodeId + ']'); return; } + else if (txLockMsgLog.isDebugEnabled()) { + txLockMsgLog.debug("Received dht lock response [txId=" + fut.nearLockVersion() + + ", dhtTxId=" + res.version() + + ", node=" + nodeId + ']'); + } fut.onResult(nodeId, res); } @@ -1286,12 +1315,30 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach try { // Don't send reply message to this node or if lock was cancelled. - if (!nearNode.id().equals(ctx.nodeId()) && !X.hasCause(err, GridDistributedLockCancelledException.class)) + if (!nearNode.id().equals(ctx.nodeId()) && !X.hasCause(err, GridDistributedLockCancelledException.class)) { ctx.io().send(nearNode, res, ctx.ioPolicy()); + + if (txLockMsgLog.isDebugEnabled()) { + txLockMsgLog.debug("Sent near lock response [txId=" + req.version() + + ", inTx=" + req.inTx() + + ", node=" + nearNode.id() + ']'); + } + } + else { + if (txLockMsgLog.isDebugEnabled() && !nearNode.id().equals(ctx.nodeId())) { + txLockMsgLog.debug("Skip send near lock response [txId=" + req.version() + + ", inTx=" + req.inTx() + + ", node=" + nearNode.id() + + ", err=" + err + ']'); + } + } } catch (IgniteCheckedException e) { - U.error(log, "Failed to send lock reply to originating node (will rollback transaction) [node=" + - U.toShortString(nearNode) + ", req=" + req + ']', e); + U.error(txLockMsgLog, "Failed to send near lock response (will rollback transaction) [" + + "txId=" + req.version() + + ", inTx=" + req.inTx() + + ", node=" + nearNode.id() + + ", res=" + res + ']', e); if (tx != null) tx.rollbackAsync(); http://git-wip-us.apache.org/repos/asf/ignite/blob/ad76dda5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java index de04782..4ece775 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java @@ -67,6 +67,9 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur /** Logger. */ private static IgniteLogger log; + /** Logger. */ + private static IgniteLogger msgLog; + /** Context. */ private GridCacheSharedContext<K, V> cctx; @@ -111,8 +114,17 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur futId = IgniteUuid.randomUuid(); - if (log == null) + if (log == null) { + msgLog = cctx.txFinishMessageLogger(); log = U.logger(cctx.kernalContext(), logRef, GridDhtTxFinishFuture.class); + } + } + + /** + * @return Transaction. + */ + public GridDhtTxLocalAdapter tx() { + return tx; } /** {@inheritDoc} */ @@ -187,17 +199,40 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur */ public void onResult(UUID nodeId, GridDhtTxFinishResponse res) { if (!isDone()) { + boolean found = false; + for (IgniteInternalFuture<IgniteInternalTx> fut : futures()) { if (isMini(fut)) { MiniFuture f = (MiniFuture)fut; if (f.futureId().equals(res.miniId())) { + found = true; + assert f.node().id().equals(nodeId); f.onResult(res); } } } + + if (!found) { + if (msgLog.isDebugEnabled()) { + msgLog.debug("DHT finish fut, failed to find mini future [txId=" + tx.nearXidVersion() + + ", dhtTxId=" + tx.xidVersion() + + ", node=" + nodeId + + ", res=" + res + + ", fut=" + this + ']'); + } + } + } + else { + if (msgLog.isDebugEnabled()) { + msgLog.debug("DHT finish fut, failed to find mini future [txId=" + tx.nearXidVersion() + + ", dhtTxId=" + tx.xidVersion() + + ", node=" + nodeId + + ", res=" + res + + ", fut=" + this + ']'); + } } } @@ -320,6 +355,12 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur try { cctx.io().send(n, req, tx.ioPolicy()); + if (msgLog.isDebugEnabled()) { + msgLog.debug("DHT finish fut, sent request lock tx [txId=" + tx.nearXidVersion() + + ", dhtTxId=" + tx.xidVersion() + + ", node=" + n.id() + ']'); + } + if (sync) res = true; else @@ -329,8 +370,16 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur // Fail the whole thing. if (e instanceof ClusterTopologyCheckedException) fut.onNodeLeft((ClusterTopologyCheckedException)e); - else + else { + if (msgLog.isDebugEnabled()) { + msgLog.debug("DHT finish fut, failed to send request lock tx [txId=" + tx.nearXidVersion() + + ", dhtTxId=" + tx.xidVersion() + + ", node=" + n.id() + + ", err=" + e + ']'); + } + fut.onResult(e); + } } } @@ -406,6 +455,12 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur try { cctx.io().send(n, req, tx.ioPolicy()); + if (msgLog.isDebugEnabled()) { + msgLog.debug("DHT finish fut, sent request dht [txId=" + tx.nearXidVersion() + + ", dhtTxId=" + tx.xidVersion() + + ", node=" + n.id() + ']'); + } + if (sync) res = true; else @@ -415,8 +470,16 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur // Fail the whole thing. if (e instanceof ClusterTopologyCheckedException) fut.onNodeLeft((ClusterTopologyCheckedException)e); - else + else { + if (msgLog.isDebugEnabled()) { + msgLog.debug("DHT finish fut, failed to send request dht [txId=" + tx.nearXidVersion() + + ", dhtTxId=" + tx.xidVersion() + + ", node=" + n.id() + + ", err=" + e + ']'); + } + fut.onResult(e); + } } } @@ -460,6 +523,12 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur try { cctx.io().send(nearMapping.node(), req, tx.ioPolicy()); + if (msgLog.isDebugEnabled()) { + msgLog.debug("DHT finish fut, sent request near [txId=" + tx.nearXidVersion() + + ", dhtTxId=" + tx.xidVersion() + + ", node=" + nearMapping.node().id() + ']'); + } + if (sync) res = true; else @@ -469,8 +538,16 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur // Fail the whole thing. if (e instanceof ClusterTopologyCheckedException) fut.onNodeLeft((ClusterTopologyCheckedException)e); - else + else { + if (msgLog.isDebugEnabled()) { + msgLog.debug("DHT finish fut, failed to send request near [txId=" + tx.nearXidVersion() + + ", dhtTxId=" + tx.xidVersion() + + ", node=" + nearMapping.node().id() + + ", err=" + e + ']'); + } + fut.onResult(e); + } } } } @@ -573,8 +650,11 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur * @param discoThread {@code True} if executed from discovery thread. */ void onNodeLeft(ClusterTopologyCheckedException e, boolean discoThread) { - if (log.isDebugEnabled()) - log.debug("Remote node left grid while sending or waiting for reply (will ignore): " + this); + if (msgLog.isDebugEnabled()) { + msgLog.debug("DHT finish fut, mini future node left [txId=" + tx.nearXidVersion() + + ", dhtTxId=" + tx.xidVersion() + + ", node=" + node().id() + ']'); + } // If node left, then there is nothing to commit on it. onDone(tx); http://git-wip-us.apache.org/repos/asf/ignite/blob/ad76dda5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java index d1f88d7..c9d4345 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java @@ -672,23 +672,38 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa try { cctx.io().send(nearNodeId, res, ioPolicy()); + + if (cctx.txFinishMessageLogger().isDebugEnabled()) { + cctx.txFinishMessageLogger().debug("Sent near finish response [txId=" + nearXidVersion() + + ", dhtTxId=" + xidVersion() + + ", node=" + nearNodeId + ']'); + } } catch (ClusterTopologyCheckedException ignored) { - if (log.isDebugEnabled()) - log.debug("Node left before sending finish response (transaction was committed) [node=" + - nearNodeId + ", res=" + res + ']'); + if (cctx.txFinishMessageLogger().isDebugEnabled()) { + cctx.txFinishMessageLogger().debug("Failed to send near finish response, node left [txId=" + nearXidVersion() + + ", dhtTxId=" + xidVersion() + + ", node=" + nearNodeId() + ']'); + } } catch (Throwable ex) { U.error(log, "Failed to send finish response to node (transaction was " + - (commit ? "committed" : "rolledback") + ") [node=" + nearNodeId + ", res=" + res + ']', ex); + (commit ? "committed" : "rolledback") + ") [txId=" + nearXidVersion() + + ", dhtTxId=" + xidVersion() + + ", node=" + nearNodeId + + ", res=" + res + ']', ex); if (ex instanceof Error) throw (Error)ex; } } else { - if (log.isDebugEnabled()) - log.debug("Will not send finish reply because sender node has not sent finish request yet: " + this); + if (cctx.txFinishMessageLogger().isDebugEnabled()) { + cctx.txFinishMessageLogger().debug("Will not send finish reply because sender node has not sent finish " + + "request yet [txId=" + nearXidVersion() + + ", dhtTxId=" + xidVersion() + + ", node=" + nearNodeId() + ']'); + } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/ad76dda5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java index 746dbb9..e9805aa 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java @@ -130,6 +130,9 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter /** Logger. */ private static IgniteLogger log; + /** Logger. */ + private static IgniteLogger msgLog; + /** Context. */ private GridCacheSharedContext<?, ?> cctx; @@ -228,8 +231,10 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter this.nearMiniId = nearMiniId; - if (log == null) + if (log == null) { + msgLog = cctx.txPrepareMessageLogger(); log = U.logger(cctx.kernalContext(), logRef, GridDhtTxPrepareFuture.class); + } dhtMap = tx.dhtMap(); nearMap = tx.nearMap(); @@ -284,7 +289,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter /** * @return Transaction. */ - GridDhtTxLocalAdapter tx() { + public GridDhtTxLocalAdapter tx() { return tx; } @@ -479,13 +484,36 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter */ public void onResult(UUID nodeId, GridDhtTxPrepareResponse res) { if (!isDone()) { + boolean found = false; + MiniFuture mini = miniFuture(res.miniId()); if (mini != null) { + found = true; + assert mini.node().id().equals(nodeId); mini.onResult(res); } + + if (!found) { + if (msgLog.isDebugEnabled()) { + msgLog.debug("DHT prepare fut, failed to find mini future [txId=" + tx.nearXidVersion() + + ", dhtTxId=" + tx.xidVersion() + + ", node=" + nodeId + + ", res=" + res + + ", fut=" + this + ']'); + } + } + } + else { + if (msgLog.isDebugEnabled()) { + msgLog.debug("DHT prepare fut, response for finished future [txId=" + tx.nearXidVersion() + + ", dhtTxId=" + tx.xidVersion() + + ", node=" + nodeId + + ", res=" + res + + ", fut=" + this + ']'); + } } } @@ -655,7 +683,12 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter sendPrepareResponse(res); } catch (IgniteCheckedException e) { - U.error(log, "Failed to send prepare response for transaction: " + tx, e); + U.error(log, "Failed to send prepare response [txId=" + tx.nearXidVersion() + "," + + ", dhtTxId=" + tx.xidVersion() + + ", node=" + tx.nearNodeId() + + ", res=" + res, + ", tx=" + tx, + e); } } }; @@ -693,7 +726,12 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter sendPrepareResponse(res); } catch (IgniteCheckedException e) { - U.error(log, "Failed to send prepare response for transaction: " + tx, e); + U.error(log, "Failed to send prepare response [txId=" + tx.nearXidVersion() + "," + + ", dhtTxId=" + tx.xidVersion() + + ", node=" + tx.nearNodeId() + + ", res=" + res, + ", tx=" + tx, + e); } } @@ -707,7 +745,12 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter sendPrepareResponse(res); } catch (IgniteCheckedException e) { - U.error(log, "Failed to send prepare response for transaction: " + tx, e); + U.error(log, "Failed to send prepare response [txId=" + tx.nearXidVersion() + "," + + ", dhtTxId=" + tx.xidVersion() + + ", node=" + tx.nearNodeId() + + ", res=" + res, + ", tx=" + tx, + e); } finally { // Will call super.onDone(). @@ -742,10 +785,26 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter if (!tx.nearNodeId().equals(cctx.localNodeId())) { Throwable err = this.err; - if (err != null && err instanceof IgniteFutureCancelledException) + if (err != null && err instanceof IgniteFutureCancelledException) { + if (msgLog.isDebugEnabled()) { + msgLog.debug("DHT prepare fut, skip send response [txId=" + tx.nearXidVersion() + + ", dhtTxId=" + tx.xidVersion() + + ", node=" + tx.nearNodeId() + + ", err=" + err + + ", res=" + res + ']'); + } + return; + } cctx.io().send(tx.nearNodeId(), res, tx.ioPolicy()); + + if (msgLog.isDebugEnabled()) { + msgLog.debug("DHT prepare fut, sent response [txId=" + tx.nearXidVersion() + + ", dhtTxId=" + tx.xidVersion() + + ", node=" + tx.nearNodeId() + + ", res=" + res + ']'); + } } } @@ -1192,13 +1251,34 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter try { cctx.io().send(n, req, tx.ioPolicy()); + + if (msgLog.isDebugEnabled()) { + msgLog.debug("DHT prepare fut, sent request dht [txId=" + tx.nearXidVersion() + + ", dhtTxId=" + tx.xidVersion() + + ", node=" + n.id() + ']'); + } } catch (ClusterTopologyCheckedException e) { fut.onNodeLeft(e); } catch (IgniteCheckedException e) { - if (!cctx.kernalContext().isStopping()) + if (!cctx.kernalContext().isStopping()) { + if (msgLog.isDebugEnabled()) { + msgLog.debug("DHT prepare fut, failed to send request dht [txId=" + tx.nearXidVersion() + + ", dhtTxId=" + tx.xidVersion() + + ", node=" + n.id() + ']'); + } + fut.onResult(e); + } + else { + if (msgLog.isDebugEnabled()) { + msgLog.debug("DHT prepare fut, failed to send request dht, ignore [txId=" + tx.nearXidVersion() + + ", dhtTxId=" + tx.xidVersion() + + ", node=" + n.id() + + ", err=" + e + ']'); + } + } } } @@ -1249,13 +1329,34 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter try { cctx.io().send(nearMapping.node(), req, tx.ioPolicy()); + + if (msgLog.isDebugEnabled()) { + msgLog.debug("DHT prepare fut, sent request near [txId=" + tx.nearXidVersion() + + ", dhtTxId=" + tx.xidVersion() + + ", node=" + nearMapping.node().id() + ']'); + } } catch (ClusterTopologyCheckedException e) { fut.onNodeLeft(e); } catch (IgniteCheckedException e) { - if (!cctx.kernalContext().isStopping()) + if (!cctx.kernalContext().isStopping()) { + if (msgLog.isDebugEnabled()) { + msgLog.debug("DHT prepare fut, failed to send request near [txId=" + tx.nearXidVersion() + + ", dhtTxId=" + tx.xidVersion() + + ", node=" + nearMapping.node().id() + ']'); + } + fut.onResult(e); + } + else { + if (msgLog.isDebugEnabled()) { + msgLog.debug("DHT prepare fut, failed to send request near, ignore [txId=" + tx.nearXidVersion() + + ", dhtTxId=" + tx.xidVersion() + + ", node=" + nearMapping.node().id() + + ", err=" + e + ']'); + } + } } } } @@ -1471,8 +1572,11 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter * @param e Node failure. */ void onNodeLeft(ClusterTopologyCheckedException e) { - if (log.isDebugEnabled()) - log.debug("Remote node left grid while sending or waiting for reply (will ignore): " + this); + if (msgLog.isDebugEnabled()) { + msgLog.debug("DHT prepare fut, mini future node left [txId=" + tx.nearXidVersion() + + ", dhtTxId=" + tx.xidVersion() + + ", node=" + node().id() + ']'); + } if (tx != null) tx.removeMapping(nodeId);
