Repository: ignite Updated Branches: refs/heads/ignite-3414 464cd9c26 -> 08328b8e6
http://git-wip-us.apache.org/repos/asf/ignite/blob/8ed13e84/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java index 5b09760..5422672 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java @@ -92,6 +92,15 @@ public class IgniteTxHandler { /** Logger. */ private IgniteLogger log; + /** */ + private final IgniteLogger txPrepareMsgLog; + + /** */ + private final IgniteLogger txFinishMsgLog; + + /** */ + private final IgniteLogger txRecoveryMsgLog; + /** Shared cache context. */ private GridCacheSharedContext<?, ?> ctx; @@ -102,6 +111,11 @@ public class IgniteTxHandler { */ public IgniteInternalFuture<?> processNearTxPrepareRequest(final UUID nearNodeId, final GridNearTxPrepareRequest req) { + if (txPrepareMsgLog.isDebugEnabled()) { + txPrepareMsgLog.debug("Received near prepare request [txId=" + req.version() + + ", node=" + nearNodeId + ']'); + } + return prepareTx(nearNodeId, null, req); } @@ -113,6 +127,10 @@ public class IgniteTxHandler { log = ctx.logger(IgniteTxHandler.class); + txRecoveryMsgLog = ctx.logger(CU.TX_MSG_RECOVERY_LOG_CATEGORY); + txPrepareMsgLog = ctx.logger(CU.TX_MSG_PREPARE_LOG_CATEGORY); + txFinishMsgLog = ctx.logger(CU.TX_MSG_FINISH_LOG_CATEGORY); + ctx.io().addHandler(0, GridNearTxPrepareRequest.class, new CI2<UUID, GridCacheMessage>() { @Override public void apply(UUID nodeId, GridCacheMessage msg) { processNearTxPrepareRequest(nodeId, (GridNearTxPrepareRequest)msg); @@ -265,8 +283,11 @@ public class IgniteTxHandler { ClusterNode nearNode = ctx.node(nearNodeId); if (nearNode == null) { - if (log.isDebugEnabled()) - log.debug("Received transaction request from node that left grid (will ignore): " + nearNodeId); + if (txPrepareMsgLog.isDebugEnabled()) { + txPrepareMsgLog.debug("Received near prepare from node that left grid (will ignore) [" + + "txId=" + req.version() + + ", node=" + nearNodeId + ']'); + } return null; } @@ -316,9 +337,11 @@ public class IgniteTxHandler { try { if (top != null && needRemap(req.topologyVersion(), top.topologyVersion(), req)) { - if (log.isDebugEnabled()) { - log.debug("Client topology version mismatch, need remap transaction [" + - "reqTopVer=" + req.topologyVersion() + + if (txPrepareMsgLog.isDebugEnabled()) { + txPrepareMsgLog.debug("Topology version mismatch for near prepare, need remap transaction [" + + "txId=" + req.version() + + ", node=" + nearNodeId + + ", reqTopVer=" + req.topologyVersion() + ", locTopVer=" + top.topologyVersion() + ", req=" + req + ']'); } @@ -336,15 +359,24 @@ public class IgniteTxHandler { try { ctx.io().send(nearNode, res, req.policy()); + + if (txPrepareMsgLog.isDebugEnabled()) { + txPrepareMsgLog.debug("Sent remap response for near prepare [txId=" + req.version() + + ", node=" + nearNodeId + ']'); + } } catch (ClusterTopologyCheckedException ignored) { - if (log.isDebugEnabled()) - log.debug("Failed to send client tx remap response, client node failed " + - "[node=" + nearNode + ", req=" + req + ']'); + if (txPrepareMsgLog.isDebugEnabled()) { + txPrepareMsgLog.debug("Failed to send remap response for near prepare, node failed [" + + "txId=" + req.version() + + ", node=" + nearNodeId + ']'); + } } catch (IgniteCheckedException e) { - U.error(log, "Failed to send client tx remap response " + - "[node=" + nearNode + ", req=" + req + ']', e); + U.error(txPrepareMsgLog, "Failed to send remap response for near prepare " + + "[txId=" + req.version() + + ", node=" + nearNodeId + + ", req=" + req + ']', e); } return new GridFinishedFuture<>(res); @@ -479,11 +511,16 @@ public class IgniteTxHandler { * @param res Response. */ private void processNearTxPrepareResponse(UUID nodeId, GridNearTxPrepareResponse res) { + if (txPrepareMsgLog.isDebugEnabled()) + txPrepareMsgLog.debug("Received near prepare response [txId=" + res.version() + ", node=" + nodeId + ']'); + GridNearTxPrepareFutureAdapter fut = (GridNearTxPrepareFutureAdapter)ctx.mvcc() .<IgniteInternalTx>mvccFuture(res.version(), res.futureId()); if (fut == null) { - U.warn(log, "Failed to find future for prepare response [sender=" + nodeId + ", res=" + res + ']'); + U.warn(log, "Failed to find future for near prepare response [txId=" + res.version() + + ", node=" + nodeId + + ", res=" + res + ']'); return; } @@ -496,13 +533,19 @@ public class IgniteTxHandler { * @param res Response. */ private void processNearTxFinishResponse(UUID nodeId, GridNearTxFinishResponse res) { + if (txFinishMsgLog.isDebugEnabled()) + txFinishMsgLog.debug("Received near finish response [txId=" + res.xid() + ", node=" + nodeId + ']'); + ctx.tm().onFinishedRemote(nodeId, res.threadId()); GridNearTxFinishFuture fut = (GridNearTxFinishFuture)ctx.mvcc().<IgniteInternalTx>future(res.futureId()); if (fut == null) { - if (log.isDebugEnabled()) - log.debug("Failed to find future for finish response [sender=" + nodeId + ", res=" + res + ']'); + if (txFinishMsgLog.isDebugEnabled()) { + txFinishMsgLog.debug("Failed to find future for near finish response [txId=" + res.xid() + + ", node=" + nodeId + + ", res=" + res + ']'); + } return; } @@ -518,11 +561,17 @@ public class IgniteTxHandler { GridDhtTxPrepareFuture fut = (GridDhtTxPrepareFuture)ctx.mvcc().mvccFuture(res.version(), res.futureId()); if (fut == null) { - if (log.isDebugEnabled()) - log.debug("Received response for unknown future (will ignore): " + res); + if (txPrepareMsgLog.isDebugEnabled()) { + txPrepareMsgLog.debug("Failed to find future for dht prepare response [txId=null" + + ", dhtTxId=" + res.version() + + ", node=" + nodeId + + ", res=" + res + ']'); + } return; } + else if (txPrepareMsgLog.isDebugEnabled()) + txPrepareMsgLog.debug("Received dht prepare response [txId=" + fut.tx().nearXidVersion() + ", node=" + nodeId + ']'); fut.onResult(nodeId, res); } @@ -539,11 +588,20 @@ public class IgniteTxHandler { GridNearTxFinishFuture fut = (GridNearTxFinishFuture)ctx.mvcc().<IgniteInternalTx>future(res.futureId()); if (fut == null) { - if (log.isDebugEnabled()) - log.debug("Received response for unknown future (will ignore): " + res); + if (txFinishMsgLog.isDebugEnabled()) { + txFinishMsgLog.debug("Failed to find future for dht finish check committed response [txId=null" + + ", dhtTxId=" + res.xid() + + ", node=" + nodeId + + ", res=" + res + ']'); + } return; } + else if (txFinishMsgLog.isDebugEnabled()) { + txFinishMsgLog.debug("Received dht finish check committed response [txId=" + fut.tx().nearXidVersion() + + ", dhtTxId=" + res.xid() + + ", node=" + nodeId + ']'); + } fut.onResult(nodeId, res); } @@ -551,11 +609,21 @@ public class IgniteTxHandler { GridDhtTxFinishFuture fut = (GridDhtTxFinishFuture)ctx.mvcc().<IgniteInternalTx>future(res.futureId()); if (fut == null) { - if (log.isDebugEnabled()) - log.debug("Received response for unknown future (will ignore): " + res); + if (txFinishMsgLog.isDebugEnabled()) { + txFinishMsgLog.debug("Failed to find future for dht finish response [txId=null" + + ", dhtTxId=" + res.xid() + + ", node=" + nodeId + + ", res=" + res); + } return; } + else if (txFinishMsgLog.isDebugEnabled()) { + txFinishMsgLog.debug("Received dht finish response [txId=" + fut.tx().nearXidVersion() + + ", dhtTxId=" + res.xid() + + ", node=" + nodeId + ']'); + } + fut.onResult(nodeId, res); } @@ -568,6 +636,9 @@ public class IgniteTxHandler { */ @Nullable public IgniteInternalFuture<IgniteInternalTx> processNearTxFinishRequest(UUID nodeId, GridNearTxFinishRequest req) { + if (txFinishMsgLog.isDebugEnabled()) + txFinishMsgLog.debug("Received near finish request [txId=" + req.version() + ", node=" + nodeId + ']'); + return finish(nodeId, null, req); } @@ -645,9 +716,11 @@ public class IgniteTxHandler { if (tx == null && !req.explicitLock()) { assert locTx == null : "DHT local tx should never be lost for near local tx: " + locTx; - U.warn(log, "Received finish request for completed transaction (the message may be too late " + - "and transaction could have been DGCed by now) [commit=" + req.commit() + - ", xid=" + req.version() + ']'); + U.warn(txFinishMsgLog, "Received finish request for completed transaction (the message may be too late) [" + + "txId=" + req.version() + + ", dhtTxId=" + dhtVer + + ", node=" + nodeId + + ", commit=" + req.commit() + ']'); // Always send finish response. GridCacheMessage res = new GridNearTxFinishResponse(req.version(), req.threadId(), req.futureId(), @@ -655,17 +728,31 @@ public class IgniteTxHandler { try { ctx.io().send(nodeId, res, req.policy()); + + if (txFinishMsgLog.isDebugEnabled()) { + txFinishMsgLog.debug("Sent near finish response for completed tx [txId=" + req.version() + + ", dhtTxId=" + dhtVer + + ", node=" + nodeId + ']'); + } } catch (Throwable e) { // Double-check. if (ctx.discovery().node(nodeId) == null) { - if (log.isDebugEnabled()) - log.debug("Node left while sending finish response [nodeId=" + nodeId + ", res=" + res + - ']'); + if (txFinishMsgLog.isDebugEnabled()) { + txFinishMsgLog.debug("Failed to send near finish response for completed tx, node failed [" + + "txId=" + req.version() + + ", dhtTxId=" + dhtVer + + ", node=" + nodeId + ']'); + } + } + else { + U.error(txFinishMsgLog, "Failed to send near finish response for completed tx, node failed [" + + "txId=" + req.version() + + ", dhtTxId=" + dhtVer + + ", node=" + nodeId + + ", req=" + req + + ", res=" + res + ']', e); } - else - U.error(log, "Failed to send finish response to node [nodeId=" + nodeId + ", " + - "res=" + res + ']', e); if (e instanceof Error) throw (Error)e; @@ -778,15 +865,17 @@ public class IgniteTxHandler { * @param req Request. */ protected final void processDhtTxPrepareRequest(UUID nodeId, GridDhtTxPrepareRequest req) { + if (txPrepareMsgLog.isDebugEnabled()) { + txPrepareMsgLog.debug("Received dht prepare request [txId=" + req.nearXidVersion() + + ", dhtTxId=" + req.version() + + ", node=" + nodeId + ']'); + } + assert nodeId != null; assert req != null; assert req.transactionNodes() != null; - if (log.isDebugEnabled()) - log.debug("Processing dht tx prepare request [locNodeId=" + ctx.localNodeId() + - ", nodeId=" + nodeId + ", req=" + req + ']'); - GridDhtTxRemote dhtTx = null; GridNearTxRemote nearTx = null; @@ -846,16 +935,28 @@ public class IgniteTxHandler { try { // Reply back to sender. ctx.io().send(nodeId, res, req.policy()); + + if (txPrepareMsgLog.isDebugEnabled()) { + txPrepareMsgLog.debug("Sent dht prepare response [txId=" + req.nearXidVersion() + + ", dhtTxId=" + req.version() + + ", node=" + nodeId + ']'); + } } catch (IgniteCheckedException e) { if (e instanceof ClusterTopologyCheckedException) { - if (log.isDebugEnabled()) - log.debug("Failed to send tx response to remote node (node left grid) [node=" + nodeId + - ", xid=" + req.version()); + if (txPrepareMsgLog.isDebugEnabled()) { + txPrepareMsgLog.debug("Failed to send dht prepare response, node left [txId=" + req.nearXidVersion() + + ", dhtTxId=" + req.version() + + ", node=" + nodeId + ']'); + } + } + else { + U.warn(log, "Failed to send tx response to remote node (will rollback transaction) [" + + "txId=" + req.nearXidVersion() + + ", dhtTxId=" + req.version() + + ", node=" + nodeId + + ", err=" + e.getMessage() + ']'); } - else - U.warn(log, "Failed to send tx response to remote node (will rollback transaction) [node=" + nodeId + - ", xid=" + req.version() + ", err=" + e.getMessage() + ']'); if (nearTx != null) nearTx.rollback(); @@ -874,18 +975,24 @@ public class IgniteTxHandler { assert nodeId != null; assert req != null; - if (log.isDebugEnabled()) - log.debug("Processing dht tx finish request [nodeId=" + nodeId + ", req=" + req + ']'); - if (req.checkCommitted()) { - sendReply(nodeId, req, !ctx.tm().addRolledbackTx(null, req.version())); + sendReply(nodeId, req, !ctx.tm().addRolledbackTx(null, req.version()), null); return; } - GridDhtTxRemote dhtTx = ctx.tm().tx(req.version()); + final GridDhtTxRemote dhtTx = ctx.tm().tx(req.version()); GridNearTxRemote nearTx = ctx.tm().nearTx(req.version()); + final GridCacheVersion nearTxId = + (dhtTx != null ? dhtTx.nearXidVersion() : (nearTx != null ? nearTx.nearXidVersion() : null)); + + if (txFinishMsgLog.isDebugEnabled()) { + txFinishMsgLog.debug("Received dht finish request [txId=" + nearTxId + + ", dhtTxId=" + req.version() + + ", node=" + nodeId + ']'); + } + // Safety - local transaction will finish explicitly. if (nearTx != null && nearTx.local()) nearTx = null; @@ -901,7 +1008,7 @@ public class IgniteTxHandler { IgniteInternalFuture<IgniteInternalTx> dhtFin = dhtTx == null ? null : dhtTx.done() ? null : dhtTx.finishFuture(); - IgniteInternalFuture<IgniteInternalTx> nearFin = nearTx == null ? + final IgniteInternalFuture<IgniteInternalTx> nearFin = nearTx == null ? null : nearTx.done() ? null : nearTx.finishFuture(); if (dhtFin != null && nearFin != null) { @@ -920,15 +1027,15 @@ public class IgniteTxHandler { if (completeFut != null) { completeFut.listen(new CI1<IgniteInternalFuture<IgniteInternalTx>>() { @Override public void apply(IgniteInternalFuture<IgniteInternalTx> igniteTxIgniteFuture) { - sendReply(nodeId, req, true); + sendReply(nodeId, req, true, nearTxId); } }); } else - sendReply(nodeId, req, true); + sendReply(nodeId, req, true, nearTxId); } else - sendReply(nodeId, req, true); + sendReply(nodeId, req, true, null); } /** @@ -1045,8 +1152,9 @@ public class IgniteTxHandler { * @param nodeId Node id that originated finish request. * @param req Request. * @param committed {@code True} if transaction committed on this node. + * @param nearTxId Near tx version. */ - protected void sendReply(UUID nodeId, GridDhtTxFinishRequest req, boolean committed) { + protected void sendReply(UUID nodeId, GridDhtTxFinishRequest req, boolean committed, GridCacheVersion nearTxId) { if (req.replyRequired()) { GridDhtTxFinishResponse res = new GridDhtTxFinishResponse(req.version(), req.futureId(), req.miniId()); @@ -1064,20 +1172,41 @@ public class IgniteTxHandler { try { ctx.io().send(nodeId, res, req.policy()); + + if (txFinishMsgLog.isDebugEnabled()) { + txFinishMsgLog.debug("Sent dht tx finish response [txId=" + nearTxId + + ", dhtTxId=" + req.version() + + ", node=" + nodeId + + ", checkCommitted=" + req.checkCommitted() + ']'); + } } catch (Throwable e) { // Double-check. if (ctx.discovery().node(nodeId) == null) { - if (log.isDebugEnabled()) - log.debug("Node left while sending finish response [nodeId=" + nodeId + ", res=" + res + ']'); + if (txFinishMsgLog.isDebugEnabled()) { + txFinishMsgLog.debug("Node left while send dht tx finish response [txId=" + nearTxId + + ", dhtTxId=" + req.version() + + ", node=" + nodeId + ']'); + } + } + else { + U.error(log, "Failed to send finish response to node [txId=" + nearTxId + + ", dhtTxId=" + req.version() + + ", nodeId=" + nodeId + + ", res=" + res + ']', e); } - else - U.error(log, "Failed to send finish response to node [nodeId=" + nodeId + ", res=" + res + ']', e); if (e instanceof Error) throw (Error)e; } } + else { + if (txFinishMsgLog.isDebugEnabled()) { + txFinishMsgLog.debug("Skip send dht tx finish response [txId=" + nearTxId + + ", dhtTxId=" + req.version() + + ", node=" + nodeId + ']'); + } + } } /** @@ -1334,8 +1463,10 @@ public class IgniteTxHandler { protected void processCheckPreparedTxRequest(final UUID nodeId, final GridCacheTxRecoveryRequest req) { - if (log.isDebugEnabled()) - log.debug("Processing check prepared transaction requests [nodeId=" + nodeId + ", req=" + req + ']'); + if (txRecoveryMsgLog.isDebugEnabled()) { + txRecoveryMsgLog.debug("Received tx recovery request [txId=" + req.nearXidVersion() + + ", node=" + nodeId + ']'); + } IgniteInternalFuture<Boolean> fut = req.nearTxCheck() ? ctx.tm().txCommitted(req.nearXidVersion()) : ctx.tm().txsPreparedOrCommitted(req.nearXidVersion(), req.transactions()); @@ -1382,23 +1513,32 @@ public class IgniteTxHandler { private void sendCheckPreparedResponse(UUID nodeId, GridCacheTxRecoveryRequest req, boolean prepared) { - GridCacheTxRecoveryResponse res = - new GridCacheTxRecoveryResponse(req.version(), req.futureId(), req.miniId(), prepared, - req.deployInfo() != null); + GridCacheTxRecoveryResponse res = new GridCacheTxRecoveryResponse(req.version(), + req.futureId(), + req.miniId(), + prepared, + req.deployInfo() != null); try { - if (log.isDebugEnabled()) - log.debug("Sending check prepared transaction response [nodeId=" + nodeId + ", res=" + res + ']'); - ctx.io().send(nodeId, res, req.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL); + + if (txRecoveryMsgLog.isDebugEnabled()) { + txRecoveryMsgLog.debug("Sent tx recovery response [txId=" + req.nearXidVersion() + + ", node=" + nodeId + ", res=" + res + ']'); + } } catch (ClusterTopologyCheckedException ignored) { - if (log.isDebugEnabled()) - log.debug("Failed to send check prepared transaction response (did node leave grid?) [nodeId=" + - nodeId + ", res=" + res + ']'); + if (txRecoveryMsgLog.isDebugEnabled()) + txRecoveryMsgLog.debug("Failed to send tx recovery response, node failed [" + + ", txId=" + req.nearXidVersion() + + ", node=" + nodeId + + ", res=" + res + ']'); } catch (IgniteCheckedException e) { - U.error(log, "Failed to send response to node [nodeId=" + nodeId + ", res=" + res + ']', e); + U.error(txRecoveryMsgLog, "Failed to send tx recovery response [txId=" + req.nearXidVersion() + + ", node=" + nodeId + + ", req=" + req + + ", res=" + res + ']', e); } } @@ -1407,14 +1547,19 @@ public class IgniteTxHandler { * @param res Response. */ protected void processCheckPreparedTxResponse(UUID nodeId, GridCacheTxRecoveryResponse res) { - if (log.isDebugEnabled()) - log.debug("Processing check prepared transaction response [nodeId=" + nodeId + ", res=" + res + ']'); + if (txRecoveryMsgLog.isDebugEnabled()) { + txRecoveryMsgLog.debug("Received tx recovery response [txId=" + res.version() + + ", node=" + nodeId + + ", res=" + res + ']'); + } GridCacheTxRecoveryFuture fut = (GridCacheTxRecoveryFuture)ctx.mvcc().future(res.futureId()); if (fut == null) { - if (log.isDebugEnabled()) - log.debug("Received response for unknown future (will ignore): " + res); + if (txRecoveryMsgLog.isDebugEnabled()) { + txRecoveryMsgLog.debug("Failed to find future for tx recovery response [txId=" + res.version() + + ", node=" + nodeId + ", res=" + res + ']'); + } return; } http://git-wip-us.apache.org/repos/asf/ignite/blob/8ed13e84/modules/core/src/test/config/log4j-test.xml ---------------------------------------------------------------------- diff --git a/modules/core/src/test/config/log4j-test.xml b/modules/core/src/test/config/log4j-test.xml index 29ae7b6..276de8c 100644 --- a/modules/core/src/test/config/log4j-test.xml +++ b/modules/core/src/test/config/log4j-test.xml @@ -96,6 +96,12 @@ </category> --> + <!-- + <category name="org.apache.ignite.cache.msg"> + <level value="DEBUG"/> + </category> + --> + <!-- Disable all open source debugging. --> <category name="org"> <level value="INFO"/>
