Repository: ignite
Updated Branches:
  refs/heads/ignite-4680-sb 27b230941 -> fa97cedb9


tmp


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/fa97cedb
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/fa97cedb
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/fa97cedb

Branch: refs/heads/ignite-4680-sb
Commit: fa97cedb960e11839a18d80a63287e0d9c4b47a7
Parents: 27b2309
Author: sboikov <sboi...@gridgain.com>
Authored: Mon Mar 20 17:05:34 2017 +0300
Committer: sboikov <sboi...@gridgain.com>
Committed: Mon Mar 20 17:05:34 2017 +0300

----------------------------------------------------------------------
 .../dht/atomic/GridDhtAtomicCache.java          | 77 +++++++++++++++-----
 .../ignite/internal/util/StripedExecutor.java   |  8 +-
 2 files changed, 64 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/fa97cedb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 516a8a2..feed87f 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -34,6 +34,7 @@ import javax.cache.processor.EntryProcessorResult;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.NodeStoppingException;
@@ -122,6 +123,8 @@ import static 
org.apache.ignite.internal.processors.dr.GridDrType.DR_PRIMARY;
 @SuppressWarnings("unchecked")
 @GridToStringExclude
 public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
+    private static final boolean TEST_STRIPE_SUBMIT = 
IgniteSystemProperties.getBoolean("TEST_STRIPE_SUBMIT");
+
     /** */
     private static final long serialVersionUID = 0L;
 
@@ -420,6 +423,8 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
                     }
                 });
         }
+
+        log.info("Start cache, TEST_STRIPE_SUBMIT: " + TEST_STRIPE_SUBMIT);
     }
 
     /**
@@ -1804,22 +1809,38 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
 
                         final AffinityAssignment affAssignment = 
ctx.affinity().assignment(req.topologyVersion());
 
-                        
((GridNearAtomicFullUpdateRequest)req).responseHelper(new 
NearAtomicResponseHelper(stripemap.size()));
-
-                        for (final Map.Entry<Integer, int[]> e : 
stripemap.entrySet()) {
-                            if (stripeIdx == e.getKey())
-                                update(affAssignment, ver, fut, node, req, 
e.getValue(), completionCb);
-                            else {
-                                
ctx.kernalContext().getStripedExecutorService().execute(e.getKey(), new 
Runnable() {
-                                    @Override public void run() {
-                                        try {
-                                            update(affAssignment, ver, fut, 
node, req, e.getValue(), completionCb);
+                        if (TEST_STRIPE_SUBMIT) {
+                            for (final Map.Entry<Integer, int[]> e : 
stripemap.entrySet()) {
+                                if (stripeIdx == e.getKey())
+                                    continue;
+                                else {
+                                    
ctx.kernalContext().getStripedExecutorService().execute(e.getKey(), new 
Runnable() {
+                                        @Override public void run() {
                                         }
-                                        catch (Exception e) {
-                                            e.printStackTrace();
+                                    });
+                                }
+                            }
+
+                            update(affAssignment, ver, fut, node, req, null, 
completionCb);
+                        }
+                        else {
+                            req.responseHelper(new 
NearAtomicResponseHelper(stripemap.size()));
+
+                            for (final Map.Entry<Integer, int[]> e : 
stripemap.entrySet()) {
+                                if (stripeIdx == e.getKey())
+                                    update(affAssignment, ver, fut, node, req, 
e.getValue(), completionCb);
+                                else {
+                                    
ctx.kernalContext().getStripedExecutorService().execute(e.getKey(), new 
Runnable() {
+                                        @Override public void run() {
+                                            try {
+                                                update(affAssignment, ver, 
fut, node, req, e.getValue(), completionCb);
+                                            }
+                                            catch (Exception e) {
+                                                e.printStackTrace();
+                                            }
                                         }
-                                    }
-                                });
+                                    });
+                                }
                             }
                         }
                     }
@@ -1834,7 +1855,7 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
                     top.readUnlock();
                 }
             }
-            catch (GridCacheEntryRemovedException e) {
+            catch (Exception e) {
                 assert false : "Entry should not become obsolete while holding 
lock.";
 
                 e.printStackTrace();
@@ -1957,9 +1978,7 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
 
         unlockEntries(locked, null);
 
-        GridNearAtomicUpdateResponse res0 = 
req.responseHelper().addResponse(res);
-
-        if (res0 != null) {
+        if (TEST_STRIPE_SUBMIT){
             for (int i = 0; i < req.size(); i++) {
                 fut.addWriteEntry(affinityAssignment,
                     req.key(i),
@@ -1977,6 +1996,28 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
 
             completionCb.apply(req, res);
         }
+        else {
+            GridNearAtomicUpdateResponse res0 = 
req.responseHelper().addResponse(res);
+
+            if (res0 != null) {
+                for (int i = 0; i < req.size(); i++) {
+                    fut.addWriteEntry(affinityAssignment,
+                        req.key(i),
+                        req.value(i),
+                        null,
+                        0,
+                        0,
+                        null,
+                        false,
+                        null,
+                        1L);
+                }
+
+                fut.onDone();
+
+                completionCb.apply(req, res);
+            }
+        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/fa97cedb/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java
index d4e16e8..ed5f1dc 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java
@@ -76,7 +76,7 @@ public class StripedExecutor implements ExecutorService {
 
         try {
             for (int i = 0; i < cnt; i++) {
-                stripes[i] = new StripeMPSCQueue(
+                stripes[i] = new StripeConcurrentQueue(
                     igniteInstanceName,
                     poolName,
                     i,
@@ -428,7 +428,7 @@ public class StripedExecutor implements ExecutorService {
         private final IgniteLogger log;
 
         /** Stopping flag. */
-        private volatile boolean stopping;
+        protected volatile boolean stopping;
 
         /** */
         private volatile long completedCnt;
@@ -627,8 +627,10 @@ public class StripedExecutor implements ExecutorService {
                     //parkCntr++;
                     LockSupport.park();
 
-                    if (Thread.interrupted())
+                    if (stopping)
                         throw new InterruptedException();
+//                    if (Thread.interrupted())
+//                        throw new InterruptedException();
                 }
             }
             finally {

Reply via email to