ignite-4844 Removed internal async ops queue for atomic cache

Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/130b1fde
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/130b1fde
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/130b1fde

Branch: refs/heads/ignite-4535
Commit: 130b1fde3e1f9cf1f9685a8144d71d03c7514533
Parents: f923bc9
Author: Konstantin Dudkov <kdud...@ya.ru>
Authored: Wed Apr 19 18:34:08 2017 +0300
Committer: sboikov <sboi...@gridgain.com>
Committed: Wed Apr 19 18:34:08 2017 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheAdapter.java      |  5 ++-
 .../dht/atomic/GridDhtAtomicCache.java          | 38 ++++----------------
 .../local/atomic/GridLocalAtomicCache.java      | 35 ++++--------------
 3 files changed, 17 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/130b1fde/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index b38e481..a3d4c81 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -4288,6 +4288,7 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
     /**
      * Tries to acquire asynchronous operations permit, if limited.
      *
+     * @param retry Retry flag.
      * @return Failed future if waiting was interrupted.
      */
     @Nullable protected <T> IgniteInternalFuture<T> asyncOpAcquire(boolean 
retry) {
@@ -4307,8 +4308,10 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
 
     /**
      * Releases asynchronous operations permit, if limited.
+     *
+     * @param retry Retry flag.
      */
-    private void asyncOpRelease(boolean retry) {
+    protected final void asyncOpRelease(boolean retry) {
         if (!retry && asyncOpsSem != null)
             asyncOpsSem.release();
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/130b1fde/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 2dacd12..5bbfe14 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -84,7 +84,6 @@ import 
org.apache.ignite.internal.processors.cache.version.GridCacheVersionEx;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
 import org.apache.ignite.internal.util.GridLongList;
 import org.apache.ignite.internal.util.GridUnsafe;
-import org.apache.ignite.internal.util.future.GridEmbeddedFuture;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.nio.GridNioBackPressureControl;
 import org.apache.ignite.internal.util.nio.GridNioMessageTracker;
@@ -102,7 +101,6 @@ import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.lang.IgniteClosure;
 import org.apache.ignite.lang.IgniteInClosure;
-import org.apache.ignite.lang.IgniteOutClosure;
 import org.apache.ignite.lang.IgniteRunnable;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.plugin.security.SecurityPermission;
@@ -822,39 +820,15 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
         if (fail != null)
             return fail;
 
-        FutureHolder holder = lastFut.get();
+        IgniteInternalFuture<T> f = op.apply();
 
-        holder.lock();
-
-        try {
-            IgniteInternalFuture fut = holder.future();
-
-            if (fut != null && !fut.isDone()) {
-                IgniteInternalFuture<T> f = new GridEmbeddedFuture(fut,
-                    new IgniteOutClosure<IgniteInternalFuture>() {
-                        @Override public IgniteInternalFuture<T> apply() {
-                            if (ctx.kernalContext().isStopping())
-                                return new GridFinishedFuture<>(
-                                    new IgniteCheckedException("Operation has 
been cancelled (node is stopping)."));
-
-                            return op.apply();
-                        }
-                    });
-
-                saveFuture(holder, f, /*retry*/false);
-
-                return f;
+        f.listen(new CI1<IgniteInternalFuture<?>>() {
+            @Override public void apply(IgniteInternalFuture<?> f) {
+                asyncOpRelease(/*retry*/false);
             }
+        });
 
-            IgniteInternalFuture<T> f = op.apply();
-
-            saveFuture(holder, f, /*retry*/false);
-
-            return f;
-        }
-        finally {
-            holder.unlock();
-        }
+        return f;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/130b1fde/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
index dfbc5af..e1d4484 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
@@ -61,11 +61,10 @@ import 
org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.processors.resource.GridResourceIoc;
 import org.apache.ignite.internal.util.F0;
 import org.apache.ignite.internal.util.GridUnsafe;
-import org.apache.ignite.internal.util.future.GridEmbeddedFuture;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.lang.GridTuple3;
 import org.apache.ignite.internal.util.typedef.C1;
-import org.apache.ignite.internal.util.typedef.C2;
+import org.apache.ignite.internal.util.typedef.CI1;
 import org.apache.ignite.internal.util.typedef.CX1;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.T2;
@@ -1469,35 +1468,15 @@ public class GridLocalAtomicCache<K, V> extends 
GridLocalCache<K, V> {
         if (fail != null)
             return fail;
 
-        FutureHolder holder = lastFut.get();
+        IgniteInternalFuture f = ctx.closures().callLocalSafe(op);
 
-        holder.lock();
-
-        try {
-            IgniteInternalFuture fut = holder.future();
-
-            if (fut != null && !fut.isDone()) {
-                IgniteInternalFuture f = new GridEmbeddedFuture(fut,
-                    new C2<Object, Exception, IgniteInternalFuture>() {
-                        @Override public IgniteInternalFuture apply(Object t, 
Exception e) {
-                            return ctx.closures().callLocalSafe(op);
-                        }
-                    });
-
-                saveFuture(holder, f, /*retry*/false);
-
-                return f;
+        f.listen(new CI1<IgniteInternalFuture<?>>() {
+            @Override public void apply(IgniteInternalFuture<?> f) {
+                asyncOpRelease(false);
             }
+        });
 
-            IgniteInternalFuture f = ctx.closures().callLocalSafe(op);
-
-            saveFuture(holder, f, /*retry*/false);
-
-            return f;
-        }
-        finally {
-            holder.unlock();
-        }
+        return f;
     }
 
     /** {@inheritDoc} */

Reply via email to