IGNITE-500 CacheLoadingConcurrentGridStartSelfTest fails: prevent 'localUpdate' execution while top read lock is held.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b83ec8e5 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b83ec8e5 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b83ec8e5 Branch: refs/heads/master Commit: b83ec8e57c7c48f2baa4780cf3b2e46df075f3df Parents: bc977d3 Author: sboikov <[email protected]> Authored: Fri Dec 9 14:32:42 2016 +0300 Committer: sboikov <[email protected]> Committed: Fri Dec 9 14:32:42 2016 +0300 ---------------------------------------------------------------------- .../datastreamer/DataStreamProcessor.java | 22 +++++++++++++------- 1 file changed, 15 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/b83ec8e5/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java index 32fda87..fee4dd6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java @@ -328,6 +328,8 @@ public class DataStreamProcessor<K, V> extends GridProcessorAdapter { if (!allowOverwrite) cctx.topology().readLock(); + GridDhtTopologyFuture topWaitFut = null; + try { GridDhtTopologyFuture fut = cctx.topologyVersionFuture(); @@ -352,19 +354,25 @@ public class DataStreamProcessor<K, V> extends GridProcessorAdapter { waitFut = allowOverwrite ? null : cctx.mvcc().addDataStreamerFuture(topVer); } - else { - fut.listen(new IgniteInClosure<IgniteInternalFuture<AffinityTopologyVersion>>() { - @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> e) { - localUpdate(nodeId, req, updater, topic); - } - }); - } + else + topWaitFut = fut; } finally { if (!allowOverwrite) cctx.topology().readUnlock(); } + if (topWaitFut != null) { + // Need call 'listen' after topology read lock is released. + topWaitFut.listen(new IgniteInClosure<IgniteInternalFuture<AffinityTopologyVersion>>() { + @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> e) { + localUpdate(nodeId, req, updater, topic); + } + }); + + return; + } + if (job != null) { try { job.call();
