IGNITE-2466 - Disable back-pressure for sender data nodes to avoid deadlock.
(cherry picked from commit ba6227b)

Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/bb3ff120
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/bb3ff120
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/bb3ff120

Branch: refs/heads/ignite-4932
Commit: bb3ff120e6995431d10439243d8b163712de0e0e
Parents: 220db88
Author: dkarachentsev <[email protected]>
Authored: Mon Apr 10 11:40:17 2017 +0300
Committer: dkarachentsev <[email protected]>
Committed: Mon Apr 10 11:41:29 2017 +0300

----------------------------------------------------------------------
 .../distributed/dht/atomic/GridDhtAtomicCache.java |  5 ++++-
 .../CacheAtomicPrimarySyncBackPressureTest.java    | 16 ++++++++++++++++
 2 files changed, 20 insertions(+), 1 deletion(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/bb3ff120/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 047be87..4159359 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -1909,7 +1909,10 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             req.cleanup(!node.isLocal());
 
         if (dhtFut != null) {
-            if (req.writeSynchronizationMode() == PRIMARY_SYNC && !dhtFut.isDone()) {
+            if (req.writeSynchronizationMode() == PRIMARY_SYNC
+                // To avoid deadlock disable back-pressure for sender data node.
+                && !ctx.discovery().cacheAffinityNode(ctx.discovery().node(nodeId), ctx.name())
+                && !dhtFut.isDone()) {
                 final IgniteRunnable tracker = GridNioBackPressureControl.threadTracker();
 
                 if (tracker != null && tracker instanceof GridNioMessageTracker) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/bb3ff120/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheAtomicPrimarySyncBackPressureTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheAtomicPrimarySyncBackPressureTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheAtomicPrimarySyncBackPressureTest.java
index 49e3e5c..30436f5 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheAtomicPrimarySyncBackPressureTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheAtomicPrimarySyncBackPressureTest.java
@@ -30,6 +30,7 @@ import org.apache.ignite.internal.IgniteKernal;
 import org.apache.ignite.internal.managers.communication.GridIoMessage;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicDeferredUpdateResponse;
 import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.plugin.extensions.communication.Message;
 import org.apache.ignite.spi.IgniteSpiException;
 import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
@@ -53,6 +54,8 @@ public class CacheAtomicPrimarySyncBackPressureTest extends GridCommonAbstractTe
         ccfg.setMemoryMode(CacheMemoryMode.ONHEAP_TIERED);
         ccfg.setAtomicityMode(CacheAtomicityMode.ATOMIC);
 
+        ccfg.setNodeFilter(new NodeFilter());
+
         TestCommunicationSpi spi = new TestCommunicationSpi();
 
         spi.setMessageQueueLimit(100);
@@ -148,4 +151,17 @@ public class CacheAtomicPrimarySyncBackPressureTest extends GridCommonAbstractTe
             throw new IgniteSpiException(e);
         }
     }
+
+    /**
+     * Filters out server node producer.
+     */
+    private static class NodeFilter implements IgnitePredicate<ClusterNode> {
+        /** Serial version uid. */
+        private static final long serialVersionUID = 0L;
+
+        /** {@inheritDoc} */
+        @Override public boolean apply(ClusterNode node) {
+            return !("server3".equals(node.attribute("org.apache.ignite.ignite.name")));
+        }
+    }
 }
