Repository: ignite Updated Branches: refs/heads/ignite-4680-sb 27b230941 -> fa97cedb9
tmp Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/fa97cedb Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/fa97cedb Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/fa97cedb Branch: refs/heads/ignite-4680-sb Commit: fa97cedb960e11839a18d80a63287e0d9c4b47a7 Parents: 27b2309 Author: sboikov <sboi...@gridgain.com> Authored: Mon Mar 20 17:05:34 2017 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Mon Mar 20 17:05:34 2017 +0300 ---------------------------------------------------------------------- .../dht/atomic/GridDhtAtomicCache.java | 77 +++++++++++++++----- .../ignite/internal/util/StripedExecutor.java | 8 +- 2 files changed, 64 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/fa97cedb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index 516a8a2..feed87f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -34,6 +34,7 @@ import javax.cache.processor.EntryProcessorResult; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; +import org.apache.ignite.IgniteSystemProperties; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.NodeStoppingException; @@ -122,6 +123,8 @@ import static org.apache.ignite.internal.processors.dr.GridDrType.DR_PRIMARY; @SuppressWarnings("unchecked") @GridToStringExclude public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { + private static final boolean TEST_STRIPE_SUBMIT = IgniteSystemProperties.getBoolean("TEST_STRIPE_SUBMIT"); + /** */ private static final long serialVersionUID = 0L; @@ -420,6 +423,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } }); } + + log.info("Start cache, TEST_STRIPE_SUBMIT: " + TEST_STRIPE_SUBMIT); } /** @@ -1804,22 +1809,38 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { final AffinityAssignment affAssignment = ctx.affinity().assignment(req.topologyVersion()); - ((GridNearAtomicFullUpdateRequest)req).responseHelper(new NearAtomicResponseHelper(stripemap.size())); - - for (final Map.Entry<Integer, int[]> e : stripemap.entrySet()) { - if (stripeIdx == e.getKey()) - update(affAssignment, ver, fut, node, req, e.getValue(), completionCb); - else { - ctx.kernalContext().getStripedExecutorService().execute(e.getKey(), new Runnable() { - @Override public void run() { - try { - update(affAssignment, ver, fut, node, req, e.getValue(), completionCb); + if (TEST_STRIPE_SUBMIT) { + for (final Map.Entry<Integer, int[]> e : stripemap.entrySet()) { + if (stripeIdx == e.getKey()) + continue; + else { + ctx.kernalContext().getStripedExecutorService().execute(e.getKey(), new Runnable() { + @Override public void run() { } - catch (Exception e) { - e.printStackTrace(); + }); + } + } + + update(affAssignment, ver, fut, node, req, null, completionCb); + } + else { + req.responseHelper(new NearAtomicResponseHelper(stripemap.size())); + + for (final Map.Entry<Integer, int[]> e : stripemap.entrySet()) { + if (stripeIdx == e.getKey()) + update(affAssignment, ver, fut, node, req, e.getValue(), completionCb); + else { + ctx.kernalContext().getStripedExecutorService().execute(e.getKey(), new Runnable() { + @Override public void run() { + try { + update(affAssignment, ver, fut, node, req, e.getValue(), completionCb); + } + catch (Exception e) { + e.printStackTrace(); + } } - } - }); + }); + } } } } @@ -1834,7 +1855,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { top.readUnlock(); } } - catch (GridCacheEntryRemovedException e) { + catch (Exception e) { assert false : "Entry should not become obsolete while holding lock."; e.printStackTrace(); @@ -1957,9 +1978,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { unlockEntries(locked, null); - GridNearAtomicUpdateResponse res0 = req.responseHelper().addResponse(res); - - if (res0 != null) { + if (TEST_STRIPE_SUBMIT){ for (int i = 0; i < req.size(); i++) { fut.addWriteEntry(affinityAssignment, req.key(i), @@ -1977,6 +1996,28 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { completionCb.apply(req, res); } + else { + GridNearAtomicUpdateResponse res0 = req.responseHelper().addResponse(res); + + if (res0 != null) { + for (int i = 0; i < req.size(); i++) { + fut.addWriteEntry(affinityAssignment, + req.key(i), + req.value(i), + null, + 0, + 0, + null, + false, + null, + 1L); + } + + fut.onDone(); + + completionCb.apply(req, res); + } + } } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/fa97cedb/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java b/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java index d4e16e8..ed5f1dc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java @@ -76,7 +76,7 @@ public class StripedExecutor implements ExecutorService { try { for (int i = 0; i < cnt; i++) { - stripes[i] = new StripeMPSCQueue( + stripes[i] = new StripeConcurrentQueue( igniteInstanceName, poolName, i, @@ -428,7 +428,7 @@ public class StripedExecutor implements ExecutorService { private final IgniteLogger log; /** Stopping flag. */ - private volatile boolean stopping; + protected volatile boolean stopping; /** */ private volatile long completedCnt; @@ -627,8 +627,10 @@ public class StripedExecutor implements ExecutorService { //parkCntr++; LockSupport.park(); - if (Thread.interrupted()) + if (stopping) throw new InterruptedException(); +// if (Thread.interrupted()) +// throw new InterruptedException(); } } finally {