http://git-wip-us.apache.org/repos/asf/ignite/blob/ad76dda5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index 9291153..561c6c6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -37,6 +37,7 @@ import javax.cache.processor.EntryProcessor; import javax.cache.processor.EntryProcessorResult; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; +import org.apache.ignite.IgniteLogger; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.NodeStoppingException; @@ -148,6 +149,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { /** */ private GridNearAtomicCache<K, V> near; + /** Logger. */ + private IgniteLogger msgLog; + /** * Empty constructor required by {@link Externalizable}. 
*/ @@ -160,6 +164,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { */ public GridDhtAtomicCache(GridCacheContext<K, V> ctx) { super(ctx); + + msgLog = ctx.shared().atomicMessageLogger(); } /** @@ -168,6 +174,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { */ public GridDhtAtomicCache(GridCacheContext<K, V> ctx, GridCacheConcurrentMap map) { super(ctx, map); + + msgLog = ctx.shared().atomicMessageLogger(); } /** {@inheritDoc} */ @@ -1581,7 +1589,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { ClusterNode node = ctx.discovery().node(nodeId); if (node == null) { - U.warn(log, "Node originated update request left grid: " + nodeId); + U.warn(msgLog, "Skip near update request, node originated update request left [" + + "futId=" + req.futureVersion() + ", node=" + nodeId + ']'); return; } @@ -1596,14 +1605,15 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { if (hasNear) res.nearVersion(ver); + + if (msgLog.isDebugEnabled()) { + msgLog.debug("Assigned update version [futId=" + req.futureVersion() + + ", writeVer=" + ver + ']'); + } } assert ver != null : "Got null version for update request: " + req; - if (log.isDebugEnabled()) - log.debug("Using cache version for update request on primary node [ver=" + ver + - ", req=" + req + ']'); - boolean sndPrevVal = !top.rebalanceFinished(req.topologyVersion()); dhtFut = createDhtFuture(ver, req, res, completionCb, false); @@ -2020,8 +2030,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { old, req.keepBinary()), ctx.unwrapBinaryIfNeeded( - updated, - req.keepBinary(), + updated, + req.keepBinary(), false)); if (val == null) @@ -2929,8 +2939,11 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { * @param req Near atomic update request. 
*/ private void processNearAtomicUpdateRequest(UUID nodeId, GridNearAtomicUpdateRequest req) { - if (log.isDebugEnabled()) - log.debug("Processing near atomic update request [nodeId=" + nodeId + ", req=" + req + ']'); + if (msgLog.isDebugEnabled()) { + msgLog.debug("Received near atomic update request [futId=" + req.futureVersion() + + ", writeVer=" + req.updateVersion() + + ", node=" + nodeId + ']'); + } req.nodeId(ctx.localNodeId()); @@ -2943,8 +2956,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { */ @SuppressWarnings("unchecked") private void processNearAtomicUpdateResponse(UUID nodeId, GridNearAtomicUpdateResponse res) { - if (log.isDebugEnabled()) - log.debug("Processing near atomic update response [nodeId=" + nodeId + ", res=" + res + ']'); + if (msgLog.isDebugEnabled()) + msgLog.debug("Received near atomic update response [futId=" + res.futureVersion() + ", node=" + nodeId + ']'); res.nodeId(ctx.localNodeId()); @@ -2958,8 +2971,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { ((GridNearAtomicUpdateFuture)fut).onResult(nodeId, res, false); } else - U.warn(log, "Failed to find near update future for update response (will ignore) " + - "[nodeId=" + nodeId + ", res=" + res + ']'); + U.warn(msgLog, "Failed to find near update future for update response (will ignore) " + + "[futId=" + res.futureVersion() + ", node=" + nodeId + ", res=" + res + ']'); } /** @@ -2967,8 +2980,10 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { * @param req Dht atomic update request. 
*/ private void processDhtAtomicUpdateRequest(UUID nodeId, GridDhtAtomicUpdateRequest req) { - if (log.isDebugEnabled()) - log.debug("Processing dht atomic update request [nodeId=" + nodeId + ", req=" + req + ']'); + if (msgLog.isDebugEnabled()) { + msgLog.debug("Received DHT atomic update request [futId=" + req.futureVersion() + + ", writeVer=" + req.writeVersion() + ", node=" + nodeId + ']'); + } GridCacheVersion ver = req.writeVersion(); @@ -3066,20 +3081,31 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { ((GridNearAtomicCache<K, V>)near()).processDhtAtomicUpdateRequest(nodeId, req, res); try { - if (res.failedKeys() != null || res.nearEvicted() != null || req.writeSynchronizationMode() == FULL_SYNC) + if (res.failedKeys() != null || res.nearEvicted() != null || req.writeSynchronizationMode() == FULL_SYNC) { ctx.io().send(nodeId, res, ctx.ioPolicy()); + + if (msgLog.isDebugEnabled()) { + msgLog.debug("Sent DHT atomic update response [futId=" + req.futureVersion() + + ", writeVer=" + req.writeVersion() + ", node=" + nodeId + ']'); + } + } else { + if (msgLog.isDebugEnabled()) { + msgLog.debug("Will send deferred DHT atomic update response [futId=" + req.futureVersion() + + ", writeVer=" + req.writeVersion() + ", node=" + nodeId + ']'); + } + // No failed keys and sync mode is not FULL_SYNC, thus sending deferred response. sendDeferredUpdateResponse(nodeId, req.futureVersion()); } } catch (ClusterTopologyCheckedException ignored) { - U.warn(log, "Failed to send DHT atomic update response to node because it left grid: " + - req.nodeId()); + U.warn(msgLog, "Failed to send DHT atomic update response, node left [futId=" + req.futureVersion() + + ", node=" + req.nodeId() + ']'); } catch (IgniteCheckedException e) { - U.error(log, "Failed to send DHT atomic update response (did node leave grid?) 
[nodeId=" + nodeId + - ", req=" + req + ']', e); + U.error(msgLog, "Failed to send DHT atomic update response [futId=" + req.futureVersion() + + ", node=" + nodeId + ", res=" + res + ']', e); } } @@ -3118,16 +3144,20 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { */ @SuppressWarnings("unchecked") private void processDhtAtomicUpdateResponse(UUID nodeId, GridDhtAtomicUpdateResponse res) { - if (log.isDebugEnabled()) - log.debug("Processing dht atomic update response [nodeId=" + nodeId + ", res=" + res + ']'); - GridDhtAtomicUpdateFuture updateFut = (GridDhtAtomicUpdateFuture)ctx.mvcc().atomicFuture(res.futureVersion()); - if (updateFut != null) + if (updateFut != null) { + if (msgLog.isDebugEnabled()) { + msgLog.debug("Received DHT atomic update response [futId=" + res.futureVersion() + + ", writeVer=" + updateFut.writeVersion() + ", node=" + nodeId + ']'); + } + updateFut.onResult(nodeId, res); - else - U.warn(log, "Failed to find DHT update future for update response [nodeId=" + nodeId + - ", res=" + res + ']'); + } + else { + U.warn(msgLog, "Failed to find DHT update future for update response [futId=" + res.futureVersion() + + ", node=" + nodeId + ", res=" + res + ']'); + } } /** @@ -3136,17 +3166,21 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { */ @SuppressWarnings("unchecked") private void processDhtAtomicDeferredUpdateResponse(UUID nodeId, GridDhtAtomicDeferredUpdateResponse res) { - if (log.isDebugEnabled()) - log.debug("Processing deferred dht atomic update response [nodeId=" + nodeId + ", res=" + res + ']'); - for (GridCacheVersion ver : res.futureVersions()) { GridDhtAtomicUpdateFuture updateFut = (GridDhtAtomicUpdateFuture)ctx.mvcc().atomicFuture(ver); - if (updateFut != null) + if (updateFut != null) { + if (msgLog.isDebugEnabled()) { + msgLog.debug("Received DHT atomic deferred update response [futId=" + ver + + ", writeVer=" + updateFut.writeVersion() + ", node=" + nodeId + ']'); + } + updateFut.onResult(nodeId); 
- else - U.warn(log, "Failed to find DHT update future for deferred update response [nodeId=" + - nodeId + ", ver=" + ver + ", res=" + res + ']'); + } + else { + U.warn(msgLog, "Failed to find DHT update future for deferred update response [futId=" + ver + + ", nodeId=" + nodeId + ", res=" + res + ']'); + } } } @@ -3157,14 +3191,19 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { private void sendNearUpdateReply(UUID nodeId, GridNearAtomicUpdateResponse res) { try { ctx.io().send(nodeId, res, ctx.ioPolicy()); + + if (msgLog.isDebugEnabled()) + msgLog.debug("Sent near update response [futId=" + res.futureVersion() + ", node=" + nodeId + ']'); } catch (ClusterTopologyCheckedException ignored) { - U.warn(log, "Failed to send near update reply to node because it left grid: " + - nodeId); + if (msgLog.isDebugEnabled()) { + msgLog.debug("Failed to send near update response [futId=" + res.futureVersion() + + ", node=" + nodeId + ']'); + } } catch (IgniteCheckedException e) { - U.error(log, "Failed to send near update reply (did node leave grid?) 
[nodeId=" + nodeId + - ", res=" + res + ']', e); + U.error(msgLog, "Failed to send near update response [futId=" + res.futureVersion() + + ", node=" + nodeId + ", res=" + res + ']', e); } } @@ -3434,24 +3473,31 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { try { ctx.io().send(nodeId, msg, ctx.ioPolicy()); + + if (msgLog.isDebugEnabled()) { + msgLog.debug("Sent deferred DHT update response [futIds=" + msg.futureVersions() + + ", node=" + nodeId + ']'); + } } finally { ctx.kernalContext().gateway().readUnlock(); } } catch (IllegalStateException ignored) { - if (log.isDebugEnabled()) - log.debug("Failed to send deferred dht update response to remote node (grid is stopping) " + - "[nodeId=" + nodeId + ", msg=" + msg + ']'); + if (msgLog.isDebugEnabled()) { + msgLog.debug("Failed to send deferred DHT update response, node is stopping [" + + "futIds=" + msg.futureVersions() + ", node=" + nodeId + ']'); + } } catch (ClusterTopologyCheckedException ignored) { - if (log.isDebugEnabled()) - log.debug("Failed to send deferred dht update response to remote node (did node leave grid?) " + - "[nodeId=" + nodeId + ", msg=" + msg + ']'); + if (msgLog.isDebugEnabled()) { + msgLog.debug("Failed to send deferred DHT update response, node left [" + + "futIds=" + msg.futureVersions() + ", node=" + nodeId + ']'); + } } catch (IgniteCheckedException e) { - U.error(log, "Failed to send deferred dht update response to remote node [nodeId=" - + nodeId + ", msg=" + msg + ']', e); + U.error(log, "Failed to send deferred DHT update response to remote node [" + + "futIds=" + msg.futureVersions() + ", node=" + nodeId + ']', e); } pendingResponses.remove(nodeId, this);
http://git-wip-us.apache.org/repos/asf/ignite/blob/ad76dda5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicDeferredUpdateResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicDeferredUpdateResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicDeferredUpdateResponse.java index 3a7bf1c..923b220 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicDeferredUpdateResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicDeferredUpdateResponse.java @@ -20,9 +20,11 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.atomic; import java.io.Externalizable; import java.nio.ByteBuffer; import java.util.Collection; +import org.apache.ignite.IgniteLogger; import org.apache.ignite.internal.GridDirectCollection; import org.apache.ignite.internal.processors.cache.GridCacheDeployable; import org.apache.ignite.internal.processors.cache.GridCacheMessage; +import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType; @@ -83,6 +85,11 @@ public class GridDhtAtomicDeferredUpdateResponse extends GridCacheMessage implem } /** {@inheritDoc} */ + @Override public IgniteLogger messageLogger(GridCacheSharedContext ctx) { + return ctx.atomicMessageLogger(); + } + + /** {@inheritDoc} */ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { writer.setBuffer(buf); 
http://git-wip-us.apache.org/repos/asf/ignite/blob/ad76dda5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java index 570f043..4e59d11 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java @@ -64,7 +64,10 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>(); /** Logger. */ - protected static IgniteLogger log; + private static IgniteLogger log; + + /** Logger. */ + private static IgniteLogger msgLog; /** Cache context. 
*/ private final GridCacheContext cctx; @@ -116,8 +119,7 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> */ public GridDhtAtomicUpdateFuture( GridCacheContext cctx, - CI2<GridNearAtomicUpdateRequest, - GridNearAtomicUpdateResponse> completionCb, + CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> completionCb, GridCacheVersion writeVer, GridNearAtomicUpdateRequest updateReq, GridNearAtomicUpdateResponse updateRes @@ -130,8 +132,10 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> this.completionCb = completionCb; this.updateRes = updateRes; - if (log == null) + if (log == null) { + msgLog = cctx.shared().atomicMessageLogger(); log = U.logger(cctx.kernalContext(), logRef, GridDhtAtomicUpdateFuture.class); + } keys = new ArrayList<>(updateReq.keys().size()); mappings = U.newHashMap(updateReq.keys().size()); @@ -139,6 +143,13 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> waitForExchange = !(updateReq.topologyLocked() || (updateReq.fastMap() && !updateReq.clientRequest())); } + /** + * @return Write version. 
+ */ + GridCacheVersion writeVersion() { + return writeVer; + } + /** {@inheritDoc} */ @Override public IgniteUuid futureId() { return futVer.asGridUuid(); @@ -151,10 +162,14 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> /** {@inheritDoc} */ @Override public boolean onNodeLeft(UUID nodeId) { - if (log.isDebugEnabled()) - log.debug("Processing node leave event [fut=" + this + ", nodeId=" + nodeId + ']'); + boolean res = registerResponse(nodeId); + + if (res && msgLog.isDebugEnabled()) { + msgLog.debug("DHT update fut, node left [futId=" + futVer + ", writeVer=" + writeVer + + ", node=" + nodeId + ']'); + } - return registerResponse(nodeId); + return res; } /** @@ -385,20 +400,24 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> if (!mappings.isEmpty()) { for (GridDhtAtomicUpdateRequest req : mappings.values()) { try { - if (log.isDebugEnabled()) - log.debug("Sending DHT atomic update request [nodeId=" + req.nodeId() + ", req=" + req + ']'); - cctx.io().send(req.nodeId(), req, cctx.ioPolicy()); + + if (msgLog.isDebugEnabled()) { + msgLog.debug("DHT update fut, sent request [futId=" + futVer + + ", writeVer=" + writeVer + ", node=" + req.nodeId() + ']'); + } } catch (ClusterTopologyCheckedException ignored) { - U.warn(log, "Failed to send update request to backup node because it left grid: " + - req.nodeId()); + if (msgLog.isDebugEnabled()) { + msgLog.debug("DHT update fut, failed to send request, node left [futId=" + futVer + + ", writeVer=" + writeVer + ", node=" + req.nodeId() + ']'); + } registerResponse(req.nodeId()); } catch (IgniteCheckedException e) { - U.error(log, "Failed to send update request to backup node (did node leave the grid?): " - + req.nodeId(), e); + U.error(msgLog, "Failed to send request [futId=" + futVer + + ", writeVer=" + writeVer + ", node=" + req.nodeId() + ']', e); registerResponse(req.nodeId()); } 
http://git-wip-us.apache.org/repos/asf/ignite/blob/ad76dda5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java index b1a94de..55f7560 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java @@ -25,6 +25,7 @@ import java.util.List; import java.util.UUID; import javax.cache.processor.EntryProcessor; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteLogger; import org.apache.ignite.cache.CacheWriteSynchronizationMode; import org.apache.ignite.internal.GridDirectCollection; import org.apache.ignite.internal.GridDirectTransient; @@ -715,6 +716,11 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid } /** {@inheritDoc} */ + @Override public IgniteLogger messageLogger(GridCacheSharedContext ctx) { + return ctx.atomicMessageLogger(); + } + + /** {@inheritDoc} */ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { writer.setBuffer(buf); http://git-wip-us.apache.org/repos/asf/ignite/blob/ad76dda5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java index 383e515..1334819 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java @@ -23,6 +23,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteLogger; import org.apache.ignite.internal.GridDirectCollection; import org.apache.ignite.internal.GridDirectTransient; import org.apache.ignite.internal.processors.cache.GridCacheContext; @@ -190,6 +191,11 @@ public class GridDhtAtomicUpdateResponse extends GridCacheMessage implements Gri } /** {@inheritDoc} */ + @Override public IgniteLogger messageLogger(GridCacheSharedContext ctx) { + return ctx.atomicMessageLogger(); + } + + /** {@inheritDoc} */ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { writer.setBuffer(buf); http://git-wip-us.apache.org/repos/asf/ignite/blob/ad76dda5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java index e847c7c..eb9be4d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java @@ -26,6 +26,7 @@ 
import java.util.UUID; import javax.cache.expiry.ExpiryPolicy; import javax.cache.processor.EntryProcessor; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteLogger; import org.apache.ignite.cache.CacheWriteSynchronizationMode; import org.apache.ignite.internal.GridDirectCollection; import org.apache.ignite.internal.GridDirectTransient; @@ -674,6 +675,11 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri } /** {@inheritDoc} */ + @Override public IgniteLogger messageLogger(GridCacheSharedContext ctx) { + return ctx.atomicMessageLogger(); + } + + /** {@inheritDoc} */ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { writer.setBuffer(buf); http://git-wip-us.apache.org/repos/asf/ignite/blob/ad76dda5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java index f47bb75..ff4008e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java @@ -25,6 +25,7 @@ import java.util.List; import java.util.UUID; import java.util.concurrent.ConcurrentLinkedQueue; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteLogger; import org.apache.ignite.internal.GridDirectCollection; import org.apache.ignite.internal.GridDirectTransient; import org.apache.ignite.internal.processors.cache.CacheObject; @@ -435,6 +436,11 @@ public class 
GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr } /** {@inheritDoc} */ + @Override public IgniteLogger messageLogger(GridCacheSharedContext ctx) { + return ctx.atomicMessageLogger(); + } + + /** {@inheritDoc} */ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { writer.setBuffer(buf); http://git-wip-us.apache.org/repos/asf/ignite/blob/ad76dda5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java index 3376510..176a90f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java @@ -1079,6 +1079,9 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte * @param res Response. 
*/ private void processLockResponse(UUID nodeId, GridNearLockResponse res) { + if (txLockMsgLog.isDebugEnabled()) + txLockMsgLog.debug("Received near lock response [txId=" + res.version() + ", node=" + nodeId + ']'); + assert nodeId != null; assert res != null; @@ -1087,6 +1090,13 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte if (fut != null) fut.onResult(nodeId, res); + else { + if (txLockMsgLog.isDebugEnabled()) { + txLockMsgLog.debug("Received near lock response for unknown future [txId=" + res.version() + + ", node=" + nodeId + + ", res=" + res + ']'); + } + } } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/ad76dda5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java index 3b8b9b8..f77efee 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java @@ -93,6 +93,9 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture /** Logger. */ private static IgniteLogger log; + /** Logger. */ + private static IgniteLogger msgLog; + /** Cache registry. 
*/ @GridToStringExclude private final GridCacheContext<?, ?> cctx; @@ -199,8 +202,10 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture futId = IgniteUuid.randomUuid(); - if (log == null) + if (log == null) { + msgLog = cctx.shared().txLockMessageLogger(); log = U.logger(cctx.kernalContext(), logRef, GridDhtColocatedLockFuture.class); + } if (timeout > 0) { timeoutObj = new LockTimeoutObject(); @@ -390,10 +395,6 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture MiniFuture f = (MiniFuture)fut; if (f.node().id().equals(nodeId)) { - if (log.isDebugEnabled()) - log.debug("Found mini-future for left node [nodeId=" + nodeId + ", mini=" + f + ", fut=" + - this + ']'); - f.onResult(newTopologyException(null, nodeId)); found = true; @@ -414,33 +415,29 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture */ void onResult(UUID nodeId, GridNearLockResponse res) { if (!isDone()) { - if (log.isDebugEnabled()) - log.debug("Received lock response from node [nodeId=" + nodeId + ", res=" + res + ", fut=" + - this + ']'); - MiniFuture mini = miniFuture(res.miniId()); if (mini != null) { assert mini.node().id().equals(nodeId); - if (log.isDebugEnabled()) - log.debug("Found mini future for response [mini=" + mini + ", res=" + res + ']'); - mini.onResult(res); - if (log.isDebugEnabled()) - log.debug("Future after processed lock response [fut=" + this + ", mini=" + mini + - ", res=" + res + ']'); - return; } - U.warn(log, "Failed to find mini future for response (perhaps due to stale message) [res=" + res + + U.warn(msgLog, "Collocated lock fut, failed to find mini future [txId=" + lockVer + + ", inTx=" + inTx() + + ", node=" + nodeId + + ", res=" + res + ", fut=" + this + ']'); } - else if (log.isDebugEnabled()) - log.debug("Ignoring lock response from node (future is done) [nodeId=" + nodeId + ", res=" + res + - ", fut=" + this + ']'); + else { + if (msgLog.isDebugEnabled()) { + 
msgLog.debug("Collocated lock fut, response for finished future [txId=" + lockVer + + ", inTx=" + inTx() + + ", node=" + nodeId + ']'); + } + } } /** @@ -1046,10 +1043,13 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture if (txSync == null || txSync.isDone()) { try { - if (log.isDebugEnabled()) - log.debug("Sending near lock request [node=" + node.id() + ", req=" + req + ']'); - cctx.io().send(node, req, cctx.ioPolicy()); + + if (msgLog.isDebugEnabled()) { + msgLog.debug("Collocated lock fut, sent request [txId=" + lockVer + + ", inTx=" + inTx() + + ", node=" + node.id() + ']'); + } } catch (ClusterTopologyCheckedException ex) { assert fut != null; @@ -1061,10 +1061,13 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture txSync.listen(new CI1<IgniteInternalFuture<?>>() { @Override public void apply(IgniteInternalFuture<?> t) { try { - if (log.isDebugEnabled()) - log.debug("Sending near lock request [node=" + node.id() + ", req=" + req + ']'); - cctx.io().send(node, req, cctx.ioPolicy()); + + if (msgLog.isDebugEnabled()) { + msgLog.debug("Collocated lock fut, sent request [txId=" + lockVer + + ", inTx=" + inTx() + + ", node=" + node.id() + ']'); + } } catch (ClusterTopologyCheckedException ex) { assert fut != null; @@ -1072,6 +1075,13 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture fut.onResult(ex); } catch (IgniteCheckedException e) { + if (msgLog.isDebugEnabled()) { + msgLog.debug("Collocated lock fut, failed to send request [txId=" + lockVer + + ", inTx=" + inTx() + + ", node=" + node.id() + + ", err=" + e + ']'); + } + onError(e); } } @@ -1404,6 +1414,12 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture * @param e Node left exception. 
*/ void onResult(ClusterTopologyCheckedException e) { + if (msgLog.isDebugEnabled()) { + msgLog.debug("Collocated lock fut, mini future node left [txId=" + lockVer + + ", inTx=" + inTx() + + ", nodeId=" + node.id() + ']'); + } + if (isDone()) return; @@ -1414,9 +1430,6 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture rcvRes = true; } - if (log.isDebugEnabled()) - log.debug("Remote node left grid while sending or waiting for reply (will fail): " + this); - if (tx != null) tx.removeMapping(node.id()); http://git-wip-us.apache.org/repos/asf/ignite/blob/ad76dda5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index bea1957..80b3768 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -807,6 +807,9 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT U.warn(log, "Awaited locked entry [key=" + e.getKey() + ", mvcc=" + e.getValue() + ']'); dumpedObjects++; + + if (IgniteSystemProperties.getBoolean(IGNITE_THREAD_DUMP_ON_EXCHANGE_TIMEOUT, false)) + U.dumpThreads(log); } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/ad76dda5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java ---------------------------------------------------------------------- diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java index 5d3f604..1ea99c4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java @@ -162,6 +162,22 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa mini.onResult(nodeId, res); } + else { + if (msgLog.isDebugEnabled()) { + msgLog.debug("Near optimistic prepare fut, failed to find mini future [txId=" + tx.nearXidVersion() + + ", node=" + nodeId + + ", res=" + res + + ", fut=" + this + ']'); + } + } + } + else { + if (msgLog.isDebugEnabled()) { + msgLog.debug("Near optimistic prepare fut, response for finished future [txId=" + tx.nearXidVersion() + + ", node=" + nodeId + + ", res=" + res + + ", fut=" + this + ']'); + } } } @@ -483,6 +499,11 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa else { try { cctx.io().send(n, req, tx.ioPolicy()); + + if (msgLog.isDebugEnabled()) { + msgLog.debug("Near optimistic prepare fut, sent request [txId=" + tx.nearXidVersion() + + ", node=" + n.id() + ']'); + } } catch (ClusterTopologyCheckedException e) { e.retryReadyFuture(cctx.nextAffinityReadyFuture(tx.topologyVersion())); @@ -490,6 +511,12 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa fut.onNodeLeft(e, false); } catch (IgniteCheckedException e) { + if (msgLog.isDebugEnabled()) { + msgLog.debug("Near optimistic prepare fut, failed to sent request [txId=" + tx.nearXidVersion() + + ", node=" + n.id() + + ", err=" + e + ']'); + } + fut.onResult(e); } } @@ -696,8 +723,14 @@ public class 
GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa /** * @param e Node failure. + * @param discoThread {@code True} if executed from discovery thread. */ void onNodeLeft(ClusterTopologyCheckedException e, boolean discoThread) { + if (msgLog.isDebugEnabled()) { + msgLog.debug("Near optimistic prepare fut, mini future node left [txId=" + parent.tx.nearXidVersion() + + ", node=" + m.node().id() + ']'); + } + if (isDone()) return; http://git-wip-us.apache.org/repos/asf/ignite/blob/ad76dda5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java index 9adf580..5d347d7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java @@ -102,11 +102,24 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA if (f != null) { assert f.node().id().equals(nodeId); - if (log.isDebugEnabled()) - log.debug("Remote node left grid while sending or waiting for reply (will not retry): " + f); - f.onResult(res); } + else { + if (msgLog.isDebugEnabled()) { + msgLog.debug("Near pessimistic prepare, failed to find mini future [txId=" + tx.nearXidVersion() + + ", node=" + nodeId + + ", res=" + res + + ", fut=" + this + ']'); + } + } + } + else { + if (msgLog.isDebugEnabled()) { + msgLog.debug("Near pessimistic prepare, response for finished future [txId=" + tx.nearXidVersion() + + ", node=" + nodeId + + ", res=" + res + + ", fut=" 
+ this + ']'); + } } } @@ -260,6 +273,11 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA else { try { cctx.io().send(node, req, tx.ioPolicy()); + + if (msgLog.isDebugEnabled()) { + msgLog.debug("Near pessimistic prepare, sent request [txId=" + tx.nearXidVersion() + + ", node=" + node.id() + ']'); + } } catch (ClusterTopologyCheckedException e) { e.retryReadyFuture(cctx.nextAffinityReadyFuture(topVer)); @@ -267,6 +285,11 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA fut.onNodeLeft(e); } catch (IgniteCheckedException e) { + if (msgLog.isDebugEnabled()) { + msgLog.debug("Near pessimistic prepare, failed send request [txId=" + tx.nearXidVersion() + + ", node=" + node.id() + ", err=" + e + ']'); + } + fut.onError(e); } } @@ -365,6 +388,11 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA * @param e Error. */ void onNodeLeft(ClusterTopologyCheckedException e) { + if (msgLog.isDebugEnabled()) { + msgLog.debug("Near pessimistic prepare, mini future node left [txId=" + tx.nearXidVersion() + + ", nodeId=" + m.node().id() + ']'); + } + if (tx.onePhaseCommit()) { tx.markForBackupCheck(); http://git-wip-us.apache.org/repos/asf/ignite/blob/ad76dda5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java index fe6290d..39f3ff3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java @@ -84,6 +84,9 @@ public final 
class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu /** Logger. */ private static IgniteLogger log; + /** Logger. */ + protected static IgniteLogger msgLog; + /** Context. */ private GridCacheSharedContext<K, V> cctx; @@ -124,9 +127,6 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu futId = IgniteUuid.randomUuid(); - if (log == null) - log = U.logger(cctx.kernalContext(), logRef, GridNearTxFinishFuture.class); - CacheWriteSynchronizationMode syncMode; if (tx.explicitLock()) @@ -135,6 +135,11 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu syncMode = tx.syncMode(); tx.syncMode(syncMode); + + if (log == null) { + msgLog = cctx.txFinishMessageLogger(); + log = U.logger(cctx.kernalContext(), logRef, GridNearTxFinishFuture.class); + } } /** {@inheritDoc} */ @@ -162,6 +167,13 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu return found; } + /** + * @return Transaction. + */ + public GridNearTxLocal tx() { + return tx; + } + /** {@inheritDoc} */ @Override public boolean trackable() { return trackable; @@ -203,6 +215,22 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu if (finishFut != null) finishFut.onNearFinishResponse(res); + else { + if (msgLog.isDebugEnabled()) { + msgLog.debug("Near finish fut, failed to find mini future [txId=" + tx.nearXidVersion() + + ", node=" + nodeId + + ", res=" + res + + ", fut=" + this + ']'); + } + } + } + else { + if (msgLog.isDebugEnabled()) { + msgLog.debug("Near finish fut, response for finished future [txId=" + tx.nearXidVersion() + + ", node=" + nodeId + + ", res=" + res + + ", fut=" + this + ']'); + } } } @@ -211,12 +239,16 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu * @param res Result. 
*/ public void onResult(UUID nodeId, GridDhtTxFinishResponse res) { - if (!isDone()) + if (!isDone()) { + boolean found = false; + for (IgniteInternalFuture<IgniteInternalTx> fut : futures()) { if (fut.getClass() == CheckBackupMiniFuture.class) { CheckBackupMiniFuture f = (CheckBackupMiniFuture)fut; if (f.futureId().equals(res.miniId())) { + found = true; + assert f.node().id().equals(nodeId); f.onDhtFinishResponse(res); @@ -229,6 +261,22 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu f.onDhtFinishResponse(nodeId, false); } } + + if (!found && msgLog.isDebugEnabled()) { + msgLog.debug("Near finish fut, failed to find mini future [txId=" + tx.nearXidVersion() + + ", node=" + nodeId + + ", res=" + res + + ", fut=" + this + ']'); + } + } + else { + if (msgLog.isDebugEnabled()) { + msgLog.debug("Near finish fut, response for finished future [txId=" + tx.nearXidVersion() + + ", node=" + nodeId + + ", res=" + res + + ", fut=" + this + ']'); + } + } } /** {@inheritDoc} */ @@ -458,8 +506,15 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu finishReq.syncCommit(true); try { - if (FINISH_NEAR_ONE_PHASE_SINCE.compareTo(backup.version()) <= 0) + if (FINISH_NEAR_ONE_PHASE_SINCE.compareTo(backup.version()) <= 0) { cctx.io().send(backup, finishReq, tx.ioPolicy()); + + if (msgLog.isDebugEnabled()) { + msgLog.debug("Near finish fut, sent check committed request [" + + "txId=" + tx.nearXidVersion() + + ", node=" + backup.id() + ']'); + } + } else { mini.onDone(new IgniteTxHeuristicCheckedException("Failed to check for tx commit on " + "the backup node (node has an old Ignite version) [rmtNodeId=" + backup.id() + @@ -470,6 +525,13 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu mini.onNodeLeft(backupId, false); } catch (IgniteCheckedException e) { + if (msgLog.isDebugEnabled()) { + msgLog.debug("Near finish fut, failed to send check committed request [" + + "txId=" + 
tx.nearXidVersion() + + ", node=" + backup.id() + + ", err=" + e + ']'); + } + mini.onDone(e); } } @@ -607,6 +669,12 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu try { cctx.io().send(n, req, tx.ioPolicy()); + if (msgLog.isDebugEnabled()) { + msgLog.debug("Near finish fut, sent request [" + + "txId=" + tx.nearXidVersion() + + ", node=" + n.id() + ']'); + } + boolean wait; if (syncMode == PRIMARY_SYNC) @@ -625,6 +693,13 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu fut.onNodeLeft(n.id(), false); } catch (IgniteCheckedException e) { + if (msgLog.isDebugEnabled()) { + msgLog.debug("Near finish fut, failed to send request [" + + "txId=" + tx.nearXidVersion() + + ", node=" + n.id() + + ", err=" + e + ']'); + } + // Fail the whole thing. fut.onDone(e); } @@ -770,8 +845,10 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu /** {@inheritDoc} */ boolean onNodeLeft(UUID nodeId, boolean discoThread) { if (nodeId.equals(m.node().id())) { - if (log.isDebugEnabled()) - log.debug("Remote node left grid while sending or waiting for reply: " + this); + if (msgLog.isDebugEnabled()) { + msgLog.debug("Near finish fut, mini future node left [txId=" + tx.nearXidVersion() + + ", node=" + m.node().id() + ']'); + } if (tx.syncMode() == FULL_SYNC) { Map<UUID, Collection<UUID>> txNodes = tx.transactionNodes(); http://git-wip-us.apache.org/repos/asf/ignite/blob/ad76dda5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java index fd959b4..3260d39 100644 --- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java @@ -74,6 +74,9 @@ public abstract class GridNearTxPrepareFutureAdapter extends /** Logger. */ protected static IgniteLogger log; + /** Logger. */ + protected static IgniteLogger msgLog; + /** Context. */ protected GridCacheSharedContext<?, ?> cctx; @@ -110,8 +113,10 @@ public abstract class GridNearTxPrepareFutureAdapter extends futId = IgniteUuid.randomUuid(); - if (log == null) + if (log == null) { + msgLog = cctx.txFinishMessageLogger(); log = U.logger(cctx.kernalContext(), logRef, GridNearTxPrepareFutureAdapter.class); + } } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/ad76dda5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java index 7039399..ba30e10 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java @@ -95,6 +95,15 @@ public class IgniteTxHandler { /** Logger. */ private IgniteLogger log; + /** */ + private final IgniteLogger txPrepareMsgLog; + + /** */ + private final IgniteLogger txFinishMsgLog; + + /** */ + private final IgniteLogger txRecoveryMsgLog; + /** Shared cache context. 
*/ private GridCacheSharedContext<?, ?> ctx; @@ -105,6 +114,11 @@ public class IgniteTxHandler { */ public IgniteInternalFuture<?> processNearTxPrepareRequest(final UUID nearNodeId, final GridNearTxPrepareRequest req) { + if (txPrepareMsgLog.isDebugEnabled()) { + txPrepareMsgLog.debug("Received near prepare request [txId=" + req.version() + + ", node=" + nearNodeId + ']'); + } + return prepareTx(nearNodeId, null, req); } @@ -116,6 +130,10 @@ public class IgniteTxHandler { log = ctx.logger(IgniteTxHandler.class); + txRecoveryMsgLog = ctx.logger(CU.TX_MSG_RECOVERY_LOG_CATEGORY); + txPrepareMsgLog = ctx.logger(CU.TX_MSG_PREPARE_LOG_CATEGORY); + txFinishMsgLog = ctx.logger(CU.TX_MSG_FINISH_LOG_CATEGORY); + ctx.io().addHandler(0, GridNearTxPrepareRequest.class, new CI2<UUID, GridCacheMessage>() { @Override public void apply(UUID nodeId, GridCacheMessage msg) { processNearTxPrepareRequest(nodeId, (GridNearTxPrepareRequest)msg); @@ -268,8 +286,11 @@ public class IgniteTxHandler { ClusterNode nearNode = ctx.node(nearNodeId); if (nearNode == null) { - if (log.isDebugEnabled()) - log.debug("Received transaction request from node that left grid (will ignore): " + nearNodeId); + if (txPrepareMsgLog.isDebugEnabled()) { + txPrepareMsgLog.debug("Received near prepare from node that left grid (will ignore) [" + + "txId=" + req.version() + + ", node=" + nearNodeId + ']'); + } return null; } @@ -319,9 +340,11 @@ public class IgniteTxHandler { try { if (top != null && needRemap(req.topologyVersion(), top.topologyVersion(), req)) { - if (log.isDebugEnabled()) { - log.debug("Client topology version mismatch, need remap transaction [" + - "reqTopVer=" + req.topologyVersion() + + if (txPrepareMsgLog.isDebugEnabled()) { + txPrepareMsgLog.debug("Topology version mismatch for near prepare, need remap transaction [" + + "txId=" + req.version() + + ", node=" + nearNodeId + + ", reqTopVer=" + req.topologyVersion() + ", locTopVer=" + top.topologyVersion() + ", req=" + req + ']'); } @@ -339,15 
+362,24 @@ public class IgniteTxHandler { try { ctx.io().send(nearNode, res, req.policy()); + + if (txPrepareMsgLog.isDebugEnabled()) { + txPrepareMsgLog.debug("Sent remap response for near prepare [txId=" + req.version() + + ", node=" + nearNodeId + ']'); + } } catch (ClusterTopologyCheckedException ignored) { - if (log.isDebugEnabled()) - log.debug("Failed to send client tx remap response, client node failed " + - "[node=" + nearNode + ", req=" + req + ']'); + if (txPrepareMsgLog.isDebugEnabled()) { + txPrepareMsgLog.debug("Failed to send remap response for near prepare, node failed [" + + "txId=" + req.version() + + ", node=" + nearNodeId + ']'); + } } catch (IgniteCheckedException e) { - U.error(log, "Failed to send client tx remap response " + - "[node=" + nearNode + ", req=" + req + ']', e); + U.error(txPrepareMsgLog, "Failed to send remap response for near prepare " + + "[txId=" + req.version() + + ", node=" + nearNodeId + + ", req=" + req + ']', e); } return new GridFinishedFuture<>(res); @@ -494,11 +526,16 @@ public class IgniteTxHandler { * @param res Response. */ private void processNearTxPrepareResponse(UUID nodeId, GridNearTxPrepareResponse res) { + if (txPrepareMsgLog.isDebugEnabled()) + txPrepareMsgLog.debug("Received near prepare response [txId=" + res.version() + ", node=" + nodeId + ']'); + GridNearTxPrepareFutureAdapter fut = (GridNearTxPrepareFutureAdapter)ctx.mvcc() .<IgniteInternalTx>mvccFuture(res.version(), res.futureId()); if (fut == null) { - U.warn(log, "Failed to find future for prepare response [sender=" + nodeId + ", res=" + res + ']'); + U.warn(log, "Failed to find future for near prepare response [txId=" + res.version() + + ", node=" + nodeId + + ", res=" + res + ']'); return; } @@ -511,13 +548,19 @@ public class IgniteTxHandler { * @param res Response. 
*/ private void processNearTxFinishResponse(UUID nodeId, GridNearTxFinishResponse res) { + if (txFinishMsgLog.isDebugEnabled()) + txFinishMsgLog.debug("Received near finish response [txId=" + res.xid() + ", node=" + nodeId + ']'); + ctx.tm().onFinishedRemote(nodeId, res.threadId()); GridNearTxFinishFuture fut = (GridNearTxFinishFuture)ctx.mvcc().<IgniteInternalTx>future(res.futureId()); if (fut == null) { - if (log.isDebugEnabled()) - log.debug("Failed to find future for finish response [sender=" + nodeId + ", res=" + res + ']'); + if (txFinishMsgLog.isDebugEnabled()) { + txFinishMsgLog.debug("Failed to find future for near finish response [txId=" + res.xid() + + ", node=" + nodeId + + ", res=" + res + ']'); + } return; } @@ -533,11 +576,17 @@ public class IgniteTxHandler { GridDhtTxPrepareFuture fut = (GridDhtTxPrepareFuture)ctx.mvcc().mvccFuture(res.version(), res.futureId()); if (fut == null) { - if (log.isDebugEnabled()) - log.debug("Received response for unknown future (will ignore): " + res); + if (txPrepareMsgLog.isDebugEnabled()) { + txPrepareMsgLog.debug("Failed to find future for dht prepare response [txId=null" + + ", dhtTxId=" + res.version() + + ", node=" + nodeId + + ", res=" + res + ']'); + } return; } + else if (txPrepareMsgLog.isDebugEnabled()) + txPrepareMsgLog.debug("Received dht prepare response [txId=" + fut.tx().nearXidVersion() + ", node=" + nodeId + ']'); fut.onResult(nodeId, res); } @@ -554,11 +603,20 @@ public class IgniteTxHandler { GridNearTxFinishFuture fut = (GridNearTxFinishFuture)ctx.mvcc().<IgniteInternalTx>future(res.futureId()); if (fut == null) { - if (log.isDebugEnabled()) - log.debug("Received response for unknown future (will ignore): " + res); + if (txFinishMsgLog.isDebugEnabled()) { + txFinishMsgLog.debug("Failed to find future for dht finish check committed response [txId=null" + + ", dhtTxId=" + res.xid() + + ", node=" + nodeId + + ", res=" + res + ']'); + } return; } + else if (txFinishMsgLog.isDebugEnabled()) { + 
txFinishMsgLog.debug("Received dht finish check committed response [txId=" + fut.tx().nearXidVersion() + + ", dhtTxId=" + res.xid() + + ", node=" + nodeId + ']'); + } fut.onResult(nodeId, res); } @@ -566,11 +624,21 @@ public class IgniteTxHandler { GridDhtTxFinishFuture fut = (GridDhtTxFinishFuture)ctx.mvcc().<IgniteInternalTx>future(res.futureId()); if (fut == null) { - if (log.isDebugEnabled()) - log.debug("Received response for unknown future (will ignore): " + res); + if (txFinishMsgLog.isDebugEnabled()) { + txFinishMsgLog.debug("Failed to find future for dht finish response [txId=null" + + ", dhtTxId=" + res.xid() + + ", node=" + nodeId + + ", res=" + res); + } return; } + else if (txFinishMsgLog.isDebugEnabled()) { + txFinishMsgLog.debug("Received dht finish response [txId=" + fut.tx().nearXidVersion() + + ", dhtTxId=" + res.xid() + + ", node=" + nodeId + ']'); + } + fut.onResult(nodeId, res); } @@ -583,6 +651,9 @@ public class IgniteTxHandler { */ @Nullable public IgniteInternalFuture<IgniteInternalTx> processNearTxFinishRequest(UUID nodeId, GridNearTxFinishRequest req) { + if (txFinishMsgLog.isDebugEnabled()) + txFinishMsgLog.debug("Received near finish request [txId=" + req.version() + ", node=" + nodeId + ']'); + return finish(nodeId, null, req); } @@ -660,9 +731,11 @@ public class IgniteTxHandler { if (tx == null && !req.explicitLock()) { assert locTx == null : "DHT local tx should never be lost for near local tx: " + locTx; - U.warn(log, "Received finish request for completed transaction (the message may be too late " + - "and transaction could have been DGCed by now) [commit=" + req.commit() + - ", xid=" + req.version() + ']'); + U.warn(txFinishMsgLog, "Received finish request for completed transaction (the message may be too late) [" + + "txId=" + req.version() + + ", dhtTxId=" + dhtVer + + ", node=" + nodeId + + ", commit=" + req.commit() + ']'); // Always send finish response. 
GridCacheMessage res = new GridNearTxFinishResponse(req.version(), req.threadId(), req.futureId(), @@ -670,17 +743,31 @@ public class IgniteTxHandler { try { ctx.io().send(nodeId, res, req.policy()); + + if (txFinishMsgLog.isDebugEnabled()) { + txFinishMsgLog.debug("Sent near finish response for completed tx [txId=" + req.version() + + ", dhtTxId=" + dhtVer + + ", node=" + nodeId + ']'); + } } catch (Throwable e) { // Double-check. if (ctx.discovery().node(nodeId) == null) { - if (log.isDebugEnabled()) - log.debug("Node left while sending finish response [nodeId=" + nodeId + ", res=" + res + - ']'); + if (txFinishMsgLog.isDebugEnabled()) { + txFinishMsgLog.debug("Failed to send near finish response for completed tx, node failed [" + + "txId=" + req.version() + + ", dhtTxId=" + dhtVer + + ", node=" + nodeId + ']'); + } + } + else { + U.error(txFinishMsgLog, "Failed to send near finish response for completed tx, node failed [" + + "txId=" + req.version() + + ", dhtTxId=" + dhtVer + + ", node=" + nodeId + + ", req=" + req + + ", res=" + res + ']', e); } - else - U.error(log, "Failed to send finish response to node [nodeId=" + nodeId + ", " + - "res=" + res + ']', e); if (e instanceof Error) throw (Error)e; @@ -794,15 +881,17 @@ public class IgniteTxHandler { * @param req Request. */ protected final void processDhtTxPrepareRequest(UUID nodeId, GridDhtTxPrepareRequest req) { + if (txPrepareMsgLog.isDebugEnabled()) { + txPrepareMsgLog.debug("Received dht prepare request [txId=" + req.nearXidVersion() + + ", dhtTxId=" + req.version() + + ", node=" + nodeId + ']'); + } + assert nodeId != null; assert req != null; assert req.transactionNodes() != null; - if (log.isDebugEnabled()) - log.debug("Processing dht tx prepare request [locNodeId=" + ctx.localNodeId() + - ", nodeId=" + nodeId + ", req=" + req + ']'); - GridDhtTxRemote dhtTx = null; GridNearTxRemote nearTx = null; @@ -862,16 +951,28 @@ public class IgniteTxHandler { try { // Reply back to sender. 
ctx.io().send(nodeId, res, req.policy()); + + if (txPrepareMsgLog.isDebugEnabled()) { + txPrepareMsgLog.debug("Sent dht prepare response [txId=" + req.nearXidVersion() + + ", dhtTxId=" + req.version() + + ", node=" + nodeId + ']'); + } } catch (IgniteCheckedException e) { if (e instanceof ClusterTopologyCheckedException) { - if (log.isDebugEnabled()) - log.debug("Failed to send tx response to remote node (node left grid) [node=" + nodeId + - ", xid=" + req.version()); + if (txPrepareMsgLog.isDebugEnabled()) { + txPrepareMsgLog.debug("Failed to send dht prepare response, node left [txId=" + req.nearXidVersion() + + ", dhtTxId=" + req.version() + + ", node=" + nodeId + ']'); + } + } + else { + U.warn(log, "Failed to send tx response to remote node (will rollback transaction) [" + + "txId=" + req.nearXidVersion() + + ", dhtTxId=" + req.version() + + ", node=" + nodeId + + ", err=" + e.getMessage() + ']'); } - else - U.warn(log, "Failed to send tx response to remote node (will rollback transaction) [node=" + nodeId + - ", xid=" + req.version() + ", err=" + e.getMessage() + ']'); if (nearTx != null) nearTx.rollback(); @@ -890,20 +991,17 @@ public class IgniteTxHandler { assert nodeId != null; assert req != null; - if (log.isDebugEnabled()) - log.debug("Processing dht tx finish request [nodeId=" + nodeId + ", req=" + req + ']'); - if (req.checkCommitted()) { boolean committed = req.waitRemoteTransactions() || !ctx.tm().addRolledbackTx(null, req.version()); if (!committed || !req.syncCommit()) - sendReply(nodeId, req, committed); + sendReply(nodeId, req, committed, null); else { IgniteInternalFuture<?> fut = ctx.tm().remoteTxFinishFuture(req.version()); fut.listen(new CI1<IgniteInternalFuture<?>>() { @Override public void apply(IgniteInternalFuture<?> fut) { - sendReply(nodeId, req, true); + sendReply(nodeId, req, true, null); } }); } @@ -911,9 +1009,18 @@ public class IgniteTxHandler { return; } - GridDhtTxRemote dhtTx = ctx.tm().tx(req.version()); + final 
GridDhtTxRemote dhtTx = ctx.tm().tx(req.version()); GridNearTxRemote nearTx = ctx.tm().nearTx(req.version()); + final GridCacheVersion nearTxId = + (dhtTx != null ? dhtTx.nearXidVersion() : (nearTx != null ? nearTx.nearXidVersion() : null)); + + if (txFinishMsgLog.isDebugEnabled()) { + txFinishMsgLog.debug("Received dht finish request [txId=" + nearTxId + + ", dhtTxId=" + req.version() + + ", node=" + nodeId + ']'); + } + // Safety - local transaction will finish explicitly. if (nearTx != null && nearTx.local()) nearTx = null; @@ -929,7 +1036,7 @@ public class IgniteTxHandler { IgniteInternalFuture<IgniteInternalTx> dhtFin = dhtTx == null ? null : dhtTx.done() ? null : dhtTx.finishFuture(); - IgniteInternalFuture<IgniteInternalTx> nearFin = nearTx == null ? + final IgniteInternalFuture<IgniteInternalTx> nearFin = nearTx == null ? null : nearTx.done() ? null : nearTx.finishFuture(); if (dhtFin != null && nearFin != null) { @@ -948,15 +1055,15 @@ public class IgniteTxHandler { if (completeFut != null) { completeFut.listen(new CI1<IgniteInternalFuture<IgniteInternalTx>>() { @Override public void apply(IgniteInternalFuture<IgniteInternalTx> igniteTxIgniteFuture) { - sendReply(nodeId, req, true); + sendReply(nodeId, req, true, nearTxId); } }); } else - sendReply(nodeId, req, true); + sendReply(nodeId, req, true, nearTxId); } else - sendReply(nodeId, req, true); + sendReply(nodeId, req, true, null); } /** @@ -1073,8 +1180,9 @@ public class IgniteTxHandler { * @param nodeId Node id that originated finish request. * @param req Request. * @param committed {@code True} if transaction committed on this node. + * @param nearTxId Near tx version. 
*/ - protected void sendReply(UUID nodeId, GridDhtTxFinishRequest req, boolean committed) { + protected void sendReply(UUID nodeId, GridDhtTxFinishRequest req, boolean committed, GridCacheVersion nearTxId) { if (req.replyRequired() || req.checkCommitted()) { GridDhtTxFinishResponse res = new GridDhtTxFinishResponse(req.version(), req.futureId(), req.miniId()); @@ -1092,20 +1200,41 @@ public class IgniteTxHandler { try { ctx.io().send(nodeId, res, req.policy()); + + if (txFinishMsgLog.isDebugEnabled()) { + txFinishMsgLog.debug("Sent dht tx finish response [txId=" + nearTxId + + ", dhtTxId=" + req.version() + + ", node=" + nodeId + + ", checkCommitted=" + req.checkCommitted() + ']'); + } } catch (Throwable e) { // Double-check. if (ctx.discovery().node(nodeId) == null) { - if (log.isDebugEnabled()) - log.debug("Node left while sending finish response [nodeId=" + nodeId + ", res=" + res + ']'); + if (txFinishMsgLog.isDebugEnabled()) { + txFinishMsgLog.debug("Node left while send dht tx finish response [txId=" + nearTxId + + ", dhtTxId=" + req.version() + + ", node=" + nodeId + ']'); + } + } + else { + U.error(log, "Failed to send finish response to node [txId=" + nearTxId + + ", dhtTxId=" + req.version() + + ", nodeId=" + nodeId + + ", res=" + res + ']', e); } - else - U.error(log, "Failed to send finish response to node [nodeId=" + nodeId + ", res=" + res + ']', e); if (e instanceof Error) throw (Error)e; } } + else { + if (txFinishMsgLog.isDebugEnabled()) { + txFinishMsgLog.debug("Skip send dht tx finish response [txId=" + nearTxId + + ", dhtTxId=" + req.version() + + ", node=" + nodeId + ']'); + } + } } /** @@ -1363,8 +1492,10 @@ public class IgniteTxHandler { protected void processCheckPreparedTxRequest(final UUID nodeId, final GridCacheTxRecoveryRequest req) { - if (log.isDebugEnabled()) - log.debug("Processing check prepared transaction requests [nodeId=" + nodeId + ", req=" + req + ']'); + if (txRecoveryMsgLog.isDebugEnabled()) { + 
txRecoveryMsgLog.debug("Received tx recovery request [txId=" + req.nearXidVersion() + + ", node=" + nodeId + ']'); + } IgniteInternalFuture<Boolean> fut = req.nearTxCheck() ? ctx.tm().txCommitted(req.nearXidVersion()) : ctx.tm().txsPreparedOrCommitted(req.nearXidVersion(), req.transactions()); @@ -1411,23 +1542,32 @@ public class IgniteTxHandler { private void sendCheckPreparedResponse(UUID nodeId, GridCacheTxRecoveryRequest req, boolean prepared) { - GridCacheTxRecoveryResponse res = - new GridCacheTxRecoveryResponse(req.version(), req.futureId(), req.miniId(), prepared, - req.deployInfo() != null); + GridCacheTxRecoveryResponse res = new GridCacheTxRecoveryResponse(req.version(), + req.futureId(), + req.miniId(), + prepared, + req.deployInfo() != null); try { - if (log.isDebugEnabled()) - log.debug("Sending check prepared transaction response [nodeId=" + nodeId + ", res=" + res + ']'); - ctx.io().send(nodeId, res, req.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL); + + if (txRecoveryMsgLog.isDebugEnabled()) { + txRecoveryMsgLog.debug("Sent tx recovery response [txId=" + req.nearXidVersion() + + ", node=" + nodeId + ", res=" + res + ']'); + } } catch (ClusterTopologyCheckedException ignored) { - if (log.isDebugEnabled()) - log.debug("Failed to send check prepared transaction response (did node leave grid?) [nodeId=" + - nodeId + ", res=" + res + ']'); + if (txRecoveryMsgLog.isDebugEnabled()) + txRecoveryMsgLog.debug("Failed to send tx recovery response, node failed [" + + ", txId=" + req.nearXidVersion() + + ", node=" + nodeId + + ", res=" + res + ']'); } catch (IgniteCheckedException e) { - U.error(log, "Failed to send response to node [nodeId=" + nodeId + ", res=" + res + ']', e); + U.error(txRecoveryMsgLog, "Failed to send tx recovery response [txId=" + req.nearXidVersion() + + ", node=" + nodeId + + ", req=" + req + + ", res=" + res + ']', e); } } @@ -1436,14 +1576,19 @@ public class IgniteTxHandler { * @param res Response. 
*/ protected void processCheckPreparedTxResponse(UUID nodeId, GridCacheTxRecoveryResponse res) { - if (log.isDebugEnabled()) - log.debug("Processing check prepared transaction response [nodeId=" + nodeId + ", res=" + res + ']'); + if (txRecoveryMsgLog.isDebugEnabled()) { + txRecoveryMsgLog.debug("Received tx recovery response [txId=" + res.version() + + ", node=" + nodeId + + ", res=" + res + ']'); + } GridCacheTxRecoveryFuture fut = (GridCacheTxRecoveryFuture)ctx.mvcc().future(res.futureId()); if (fut == null) { - if (log.isDebugEnabled()) - log.debug("Received response for unknown future (will ignore): " + res); + if (txRecoveryMsgLog.isDebugEnabled()) { + txRecoveryMsgLog.debug("Failed to find future for tx recovery response [txId=" + res.version() + + ", node=" + nodeId + ", res=" + res + ']'); + } return; } http://git-wip-us.apache.org/repos/asf/ignite/blob/ad76dda5/modules/core/src/test/config/log4j-test.xml ---------------------------------------------------------------------- diff --git a/modules/core/src/test/config/log4j-test.xml b/modules/core/src/test/config/log4j-test.xml index 29ae7b6..276de8c 100644 --- a/modules/core/src/test/config/log4j-test.xml +++ b/modules/core/src/test/config/log4j-test.xml @@ -96,6 +96,12 @@ </category> --> + <!-- + <category name="org.apache.ignite.cache.msg"> + <level value="DEBUG"/> + </category> + --> + <!-- Disable all open source debugging. --> <category name="org"> <level value="INFO"/>
