Repository: ignite Updated Branches: refs/heads/master feba95348 -> b2fb9be1d
IGNITE-4985 - Do not acquire asyncOp semaphore for retry operations Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b2fb9be1 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b2fb9be1 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b2fb9be1 Branch: refs/heads/master Commit: b2fb9be1d0be6ec765ce706c9264116b90b3ce3a Parents: 6e12daa Author: Alexey Goncharuk <[email protected]> Authored: Tue Apr 18 12:56:33 2017 +0300 Committer: Alexey Goncharuk <[email protected]> Committed: Tue Apr 18 12:57:03 2017 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheAdapter.java | 41 ++++++++++---------- .../dht/atomic/GridDhtAtomicCache.java | 6 +-- .../dht/colocated/GridDhtColocatedCache.java | 6 +-- .../near/GridNearTransactionalCache.java | 2 +- .../local/atomic/GridLocalAtomicCache.java | 6 +-- ...ridCacheReplicatedSynchronousCommitTest.java | 2 +- 6 files changed, 31 insertions(+), 32 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/b2fb9be1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index b9fa6c9..b38e481 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -2214,7 +2214,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V recovery, needVer); } - }, ctx.operationContextPerCall()); + }, ctx.operationContextPerCall(), /*retry*/false); } } @@ -3971,14 +3971,14 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V } }); - saveFuture(holder, f); + saveFuture(holder, f, /*retry*/false); return f; } IgniteInternalFuture<IgniteInternalTx> f = tx.commitNearTxLocalAsync(); - saveFuture(holder, f); + saveFuture(holder, f, /*retry*/false); ctx.tm().resetContext(); @@ -4139,18 +4139,18 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V !skipStore, 0); - return asyncOp(tx, op, opCtx); + return asyncOp(tx, op, opCtx, /*retry*/false); } else { AsyncOpRetryFuture<T> fut = new AsyncOpRetryFuture<>(op, retries, opCtx); - fut.execute(); + fut.execute(/*retry*/false); return fut; } } else - return asyncOp(tx, op, opCtx); + return asyncOp(tx, op, opCtx, /*retry*/false); } /** @@ -4164,9 +4164,10 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V protected <T> IgniteInternalFuture<T> asyncOp( GridNearTxLocal tx, final AsyncOp<T> op, - final CacheOperationContext opCtx + final CacheOperationContext opCtx, + final boolean retry ) { - IgniteInternalFuture<T> fail = asyncOpAcquire(); + IgniteInternalFuture<T> fail = asyncOpAcquire(retry); if (fail != null) return fail; @@ -4209,7 +4210,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V } }); - saveFuture(holder, f); + saveFuture(holder, f, retry); return f; } @@ -4233,7 +4234,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V } }); - saveFuture(holder, f); + saveFuture(holder, f, retry); if (tx.implicit()) ctx.tm().resetContext(); @@ -4252,7 +4253,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V * @param holder Future holder. * @param fut Future to save. */ - protected void saveFuture(final FutureHolder holder, IgniteInternalFuture<?> fut) { + protected void saveFuture(final FutureHolder holder, IgniteInternalFuture<?> fut, final boolean retry) { assert holder != null; assert fut != null; assert holder.holdsLock(); @@ -4262,12 +4263,12 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V if (fut.isDone()) { holder.future(null); - asyncOpRelease(); + asyncOpRelease(retry); } else { fut.listen(new CI1<IgniteInternalFuture<?>>() { @Override public void apply(IgniteInternalFuture<?> f) { - asyncOpRelease(); + asyncOpRelease(retry); if (!holder.tryLock()) return; @@ -4289,9 +4290,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V * * @return Failed future if waiting was interrupted. */ - @Nullable protected <T> IgniteInternalFuture<T> asyncOpAcquire() { + @Nullable protected <T> IgniteInternalFuture<T> asyncOpAcquire(boolean retry) { try { - if (asyncOpsSem != null) + if (!retry && asyncOpsSem != null) asyncOpsSem.acquire(); return null; @@ -4307,8 +4308,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V /** * Releases asynchronous operations permit, if limited. */ - private void asyncOpRelease() { - if (asyncOpsSem != null) + private void asyncOpRelease(boolean retry) { + if (!retry && asyncOpsSem != null) asyncOpsSem.release(); } @@ -4775,7 +4776,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V /** * */ - public void execute() { + public void execute(boolean retry) { tx = ctx.tm().newTx( true, op.single(), @@ -4786,7 +4787,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V opCtx == null || !opCtx.skipStore(), 0); - IgniteInternalFuture<T> fut = asyncOp(tx, op, opCtx); + IgniteInternalFuture<T> fut = asyncOp(tx, op, opCtx, retry); fut.listen(new IgniteInClosure<IgniteInternalFuture<T>>() { @Override public void apply(IgniteInternalFuture<T> fut) { @@ -4816,7 +4817,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V try { topFut.get(); - execute(); + execute(/*retry*/true); } catch (IgniteCheckedException e) { onDone(e); http://git-wip-us.apache.org/repos/asf/ignite/blob/b2fb9be1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index 87a5536..2dacd12 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -817,7 +817,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { */ @SuppressWarnings("unchecked") private <T> IgniteInternalFuture<T> asyncOp(final CO<IgniteInternalFuture<T>> op) { - IgniteInternalFuture<T> fail = asyncOpAcquire(); + IgniteInternalFuture<T> fail = asyncOpAcquire(/*retry*/false); if (fail != null) return fail; @@ -841,14 +841,14 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } }); - saveFuture(holder, f); + saveFuture(holder, f, /*retry*/false); return f; } IgniteInternalFuture<T> f = op.apply(); - saveFuture(holder, f); + saveFuture(holder, f, /*retry*/false); return f; } http://git-wip-us.apache.org/repos/asf/ignite/blob/b2fb9be1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java index f922d09..2292cb2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java @@ -62,7 +62,6 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTran import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearUnlockRequest; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey; -import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalAdapter; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalEx; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.future.GridEmbeddedFuture; @@ -72,7 +71,6 @@ import org.apache.ignite.internal.util.typedef.C2; import org.apache.ignite.internal.util.typedef.CI2; import org.apache.ignite.internal.util.typedef.CX1; import org.apache.ignite.internal.util.typedef.F; -import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; @@ -241,7 +239,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte } }); } - }, opCtx); + }, opCtx, /*retry*/false); } AffinityTopologyVersion topVer = tx == null ? @@ -308,7 +306,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte recovery, needVer); } - }, opCtx); + }, opCtx, /*retry*/false); } AffinityTopologyVersion topVer = tx == null ? http://git-wip-us.apache.org/repos/asf/ignite/blob/b2fb9be1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java index 1468e8a..cc90be0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java @@ -152,7 +152,7 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V> recovery, needVer); } - }, opCtx); + }, opCtx, /*retry*/false); } subjId = ctx.subjectIdPerCall(subjId, opCtx); http://git-wip-us.apache.org/repos/asf/ignite/blob/b2fb9be1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java index b8c0e36..dfbc5af 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java @@ -1464,7 +1464,7 @@ public class GridLocalAtomicCache<K, V> extends GridLocalCache<K, V> { */ @SuppressWarnings("unchecked") private IgniteInternalFuture asyncOp(final Callable<?> op) { - IgniteInternalFuture fail = asyncOpAcquire(); + IgniteInternalFuture fail = asyncOpAcquire(/*retry*/false); if (fail != null) return fail; @@ -1484,14 +1484,14 @@ public class GridLocalAtomicCache<K, V> extends GridLocalCache<K, V> { } }); - saveFuture(holder, f); + saveFuture(holder, f, /*retry*/false); return f; } IgniteInternalFuture f = ctx.closures().callLocalSafe(op); - saveFuture(holder, f); + saveFuture(holder, f, /*retry*/false); return f; } http://git-wip-us.apache.org/repos/asf/ignite/blob/b2fb9be1/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheReplicatedSynchronousCommitTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheReplicatedSynchronousCommitTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheReplicatedSynchronousCommitTest.java index 3c241b8..10d75ce 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheReplicatedSynchronousCommitTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheReplicatedSynchronousCommitTest.java @@ -119,7 +119,7 @@ public class GridCacheReplicatedSynchronousCommitTest extends GridCommonAbstract for (TestCommunicationSpi commSpi : commSpis) cnt += commSpi.messagesCount(); - assert cnt == ADDITION_CACHE_NUMBER; + assertEquals(ADDITION_CACHE_NUMBER, cnt); } finally { stopAllGrids();
