ignite-4844 Removed internal async ops queue for atomic cache
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/130b1fde Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/130b1fde Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/130b1fde Branch: refs/heads/ignite-4535 Commit: 130b1fde3e1f9cf1f9685a8144d71d03c7514533 Parents: f923bc9 Author: Konstantin Dudkov <kdud...@ya.ru> Authored: Wed Apr 19 18:34:08 2017 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Wed Apr 19 18:34:08 2017 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheAdapter.java | 5 ++- .../dht/atomic/GridDhtAtomicCache.java | 38 ++++---------------- .../local/atomic/GridLocalAtomicCache.java | 35 ++++-------------- 3 files changed, 17 insertions(+), 61 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/130b1fde/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index b38e481..a3d4c81 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -4288,6 +4288,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V /** * Tries to acquire asynchronous operations permit, if limited. * + * @param retry Retry flag. * @return Failed future if waiting was interrupted. */ @Nullable protected <T> IgniteInternalFuture<T> asyncOpAcquire(boolean retry) { @@ -4307,8 +4308,10 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V /** * Releases asynchronous operations permit, if limited. + * + * @param retry Retry flag. */ - private void asyncOpRelease(boolean retry) { + protected final void asyncOpRelease(boolean retry) { if (!retry && asyncOpsSem != null) asyncOpsSem.release(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/130b1fde/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index 2dacd12..5bbfe14 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -84,7 +84,6 @@ import org.apache.ignite.internal.processors.cache.version.GridCacheVersionEx; import org.apache.ignite.internal.processors.timeout.GridTimeoutObject; import org.apache.ignite.internal.util.GridLongList; import org.apache.ignite.internal.util.GridUnsafe; -import org.apache.ignite.internal.util.future.GridEmbeddedFuture; import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.nio.GridNioBackPressureControl; import org.apache.ignite.internal.util.nio.GridNioMessageTracker; @@ -102,7 +101,6 @@ import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiTuple; import org.apache.ignite.lang.IgniteClosure; import org.apache.ignite.lang.IgniteInClosure; -import org.apache.ignite.lang.IgniteOutClosure; import org.apache.ignite.lang.IgniteRunnable; import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.plugin.security.SecurityPermission; @@ -822,39 +820,15 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { if (fail != null) return fail; - FutureHolder holder = lastFut.get(); + IgniteInternalFuture<T> f = op.apply(); - holder.lock(); - - try { - IgniteInternalFuture fut = holder.future(); - - if (fut != null && !fut.isDone()) { - IgniteInternalFuture<T> f = new GridEmbeddedFuture(fut, - new IgniteOutClosure<IgniteInternalFuture>() { - @Override public IgniteInternalFuture<T> apply() { - if (ctx.kernalContext().isStopping()) - return new GridFinishedFuture<>( - new IgniteCheckedException("Operation has been cancelled (node is stopping).")); - - return op.apply(); - } - }); - - saveFuture(holder, f, /*retry*/false); - - return f; + f.listen(new CI1<IgniteInternalFuture<?>>() { + @Override public void apply(IgniteInternalFuture<?> f) { + asyncOpRelease(/*retry*/false); } + }); - IgniteInternalFuture<T> f = op.apply(); - - saveFuture(holder, f, /*retry*/false); - - return f; - } - finally { - holder.unlock(); - } + return f; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/130b1fde/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java index dfbc5af..e1d4484 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java @@ -61,11 +61,10 @@ import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.processors.resource.GridResourceIoc; import org.apache.ignite.internal.util.F0; import org.apache.ignite.internal.util.GridUnsafe; -import org.apache.ignite.internal.util.future.GridEmbeddedFuture; import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.lang.GridTuple3; import org.apache.ignite.internal.util.typedef.C1; -import org.apache.ignite.internal.util.typedef.C2; +import org.apache.ignite.internal.util.typedef.CI1; import org.apache.ignite.internal.util.typedef.CX1; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.T2; @@ -1469,35 +1468,15 @@ public class GridLocalAtomicCache<K, V> extends GridLocalCache<K, V> { if (fail != null) return fail; - FutureHolder holder = lastFut.get(); + IgniteInternalFuture f = ctx.closures().callLocalSafe(op); - holder.lock(); - - try { - IgniteInternalFuture fut = holder.future(); - - if (fut != null && !fut.isDone()) { - IgniteInternalFuture f = new GridEmbeddedFuture(fut, - new C2<Object, Exception, IgniteInternalFuture>() { - @Override public IgniteInternalFuture apply(Object t, Exception e) { - return ctx.closures().callLocalSafe(op); - } - }); - - saveFuture(holder, f, /*retry*/false); - - return f; + f.listen(new CI1<IgniteInternalFuture<?>>() { + @Override public void apply(IgniteInternalFuture<?> f) { + asyncOpRelease(false); } + }); - IgniteInternalFuture f = ctx.closures().callLocalSafe(op); - - saveFuture(holder, f, /*retry*/false); - - return f; - } - finally { - holder.unlock(); - } + return f; } /** {@inheritDoc} */