IGNITE-950 - Fixing context for async ops. Debug is enabled.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f261704c
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f261704c
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f261704c

Branch: refs/heads/ignite-1753-1282
Commit: f261704c3b83b97d95b2dabecfaf35096a6a4f9f
Parents: ab32d0a
Author: Alexey Goncharuk <[email protected]>
Authored: Tue Nov 3 18:37:55 2015 +0300
Committer: Alexey Goncharuk <[email protected]>
Committed: Tue Nov 3 18:37:55 2015 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheAdapter.java      | 82 ++++++++++++--------
 .../dht/colocated/GridDhtColocatedCache.java    |  2 +-
 .../near/GridNearTransactionalCache.java        |  2 +-
 3 files changed, 50 insertions(+), 36 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/f261704c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 9a61bdb..07a4ac7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -1823,7 +1823,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
                 @Override public IgniteInternalFuture<Map<K1, V1>> op(IgniteTxLocalAdapter tx) {
                     return tx.getAllAsync(ctx, keys, deserializePortable, skipVals, false, !readThrough);
                 }
-            });
+            }, ctx.operationContextPerCall());
         }
     }
 
@@ -3987,11 +3987,11 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
 
         IgniteTxLocalAdapter tx = ctx.tm().threadLocalTx(ctx);
 
+        CacheOperationContext opCtx = ctx.operationContextPerCall();
+
         if (tx == null || tx.implicit()) {
             boolean skipStore = ctx.skipStore(); // Save value of thread-local flag.
 
-            CacheOperationContext opCtx = ctx.operationContextPerCall();
-
             int retries = opCtx != null && opCtx.noRetries() ? 1 : MAX_RETRIES;
 
             if (retries == 1) {
@@ -4005,10 +4005,10 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
                     !skipStore,
                     0);
 
-                return asyncOp(tx, op);
+                return asyncOp(tx, op, opCtx);
             }
             else {
-                AsyncOpRetryFuture<T> fut = new AsyncOpRetryFuture<>(op, skipStore, retries);
+                AsyncOpRetryFuture<T> fut = new AsyncOpRetryFuture<>(op, retries, opCtx);
 
                 fut.execute();
 
@@ -4016,7 +4016,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
             }
         }
         else
-            return asyncOp(tx, op);
+            return asyncOp(tx, op, opCtx);
     }
 
     /**
@@ -4026,7 +4026,11 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
      * @return Future.
      */
     @SuppressWarnings("unchecked")
-    protected <T> IgniteInternalFuture<T> asyncOp(IgniteTxLocalAdapter tx, final AsyncOp<T> op) {
+    protected <T> IgniteInternalFuture<T> asyncOp(
+        IgniteTxLocalAdapter tx,
+        final AsyncOp<T> op,
+        final CacheOperationContext opCtx
+    ) {
         IgniteInternalFuture<T> fail = asyncOpAcquire();
 
         if (fail != null)
@@ -4049,24 +4053,31 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
                                 return new GridFinishedFuture<>(
                                     new IgniteCheckedException("Operation has been cancelled (node is stopping)."));
 
-                            return op.op(tx0).chain(new CX1<IgniteInternalFuture<T>, T>() {
-                                @Override public T applyx(IgniteInternalFuture<T> tFut) throws IgniteCheckedException {
-                                    try {
-                                        return tFut.get();
-                                    }
-                                    catch (IgniteTxRollbackCheckedException e) {
-                                        throw e;
-                                    }
-                                    catch (IgniteCheckedException e1) {
-                                        tx0.rollbackAsync();
+                            ctx.operationContextPerCall(opCtx);
 
-                                        throw e1;
-                                    }
-                                    finally {
-                                        ctx.shared().txContextReset();
+                            try {
+                                return op.op(tx0).chain(new CX1<IgniteInternalFuture<T>, T>() {
+                                    @Override public T applyx(IgniteInternalFuture<T> tFut) throws IgniteCheckedException {
+                                        try {
+                                            return tFut.get();
+                                        }
+                                        catch (IgniteTxRollbackCheckedException e) {
+                                            throw e;
+                                        }
+                                        catch (IgniteCheckedException e1) {
+                                            tx0.rollbackAsync();
+
+                                            throw e1;
+                                        }
+                                        finally {
+                                            ctx.shared().txContextReset();
+                                        }
                                     }
-                                }
-                            });
+                                });
+                            }
+                            finally {
+                                ctx.operationContextPerCall(null);
+                            }
                         }
                     });
 
@@ -4631,28 +4642,31 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
         private AsyncOp<T> op;
 
         /** */
-        private boolean skipStore;
-
-        /** */
         private int retries;
 
         /** */
         private IgniteTxLocalAdapter tx;
 
+        /** */
+        private CacheOperationContext opCtx;
+
         /**
          * @param op Operation.
-         * @param skipStore Skip store flag.
          * @param retries Number of retries.
+         * @param opCtx Operation context per call to save.
          */
-        public AsyncOpRetryFuture(AsyncOp<T> op,
-            boolean skipStore,
-            int retries) {
+        public AsyncOpRetryFuture(
+            AsyncOp<T> op,
+            int retries,
+            CacheOperationContext opCtx
+        ) {
             assert retries > 1 : retries;
 
+            tx = null;
+
             this.op = op;
-            this.tx = null;
-            this.skipStore = skipStore;
             this.retries = retries;
+            this.opCtx = opCtx;
         }
 
         /**
@@ -4666,10 +4680,10 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
                 OPTIMISTIC,
                 READ_COMMITTED,
                 ctx.kernalContext().config().getTransactionConfiguration().getDefaultTxTimeout(),
-                !skipStore,
+                !opCtx.skipStore(),
                 0);
 
-            IgniteInternalFuture<T> fut = asyncOp(tx, op);
+            IgniteInternalFuture<T> fut = asyncOp(tx, op, opCtx);
 
             fut.listen(new IgniteInClosure<IgniteInternalFuture<T>>() {
                 @Override public void apply(IgniteInternalFuture<T> fut) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/f261704c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
index efc10b2..907c68d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
@@ -216,7 +216,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
                         false,
                         opCtx != null && opCtx.skipStore());
                 }
-            });
+            }, opCtx);
         }
 
         AffinityTopologyVersion topVer = tx == null ?

http://git-wip-us.apache.org/repos/asf/ignite/blob/f261704c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
index 8740e44..65a054c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
@@ -147,7 +147,7 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V>
                         false,
                         skipStore);
                 }
-            });
+            }, opCtx);
         }
 
         subjId = ctx.subjectIdPerCall(subjId, opCtx);
