Repository: ignite Updated Branches: refs/heads/ignite-1534 a76d89132 -> 0bd222d28
ignite-1534 debug Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/3624073b Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/3624073b Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/3624073b Branch: refs/heads/ignite-1534 Commit: 3624073b755dcb8d1cd19f087f11e7b888499508 Parents: 7c7994d Author: sboikov <[email protected]> Authored: Thu Oct 1 09:19:20 2015 +0300 Committer: sboikov <[email protected]> Committed: Thu Oct 1 09:19:20 2015 +0300 ---------------------------------------------------------------------- .../GridCachePartitionExchangeManager.java | 67 ++------------------ .../testsuites/IgniteCacheTestSuite4.java | 2 + 2 files changed, 8 insertions(+), 61 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/3624073b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index 8fa8be4..7c5c4fa 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -106,18 +106,12 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana /** Partition resend timeout after eviction. */ private final long partResendTimeout = getLong(IGNITE_PRELOAD_RESEND_TIMEOUT, DFLT_PRELOAD_RESEND_TIMEOUT); - /** Latch which completes after local exchange future is created. */ - private GridFutureAdapter<?> locExchFut; - /** */ private final ReadWriteLock busyLock = new ReentrantReadWriteLock(); /** Last partition refresh. */ private final AtomicLong lastRefresh = new AtomicLong(-1); - /** Pending futures. */ - private final Queue<GridDhtPartitionsExchangeFuture> pendingExchangeFuts = new ConcurrentLinkedQueue<>(); - /** */ @GridToStringInclude private ExchangeWorker exchWorker; @@ -232,31 +226,14 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana if (exchId != null) { GridDebug.debug("add pending exchange", exchId, evt); - // Start exchange process. - pendingExchangeFuts.add(exchFut); - - // Event callback - without this callback future will never complete. - exchFut.onEvent(exchId, e); - if (log.isDebugEnabled()) log.debug("Discovery event (will start exchange): " + exchId); - locExchFut.listen(new CI1<IgniteInternalFuture<?>>() { - @Override public void apply(IgniteInternalFuture<?> t) { - if (!enterBusy()) - return; + // Event callback - without this callback future will never complete. + exchFut.onEvent(exchId, e); - try { - // Unwind in the order of discovery events. - for (GridDhtPartitionsExchangeFuture f = pendingExchangeFuts.poll(); f != null; - f = pendingExchangeFuts.poll()) - addFuture(f); - } - finally { - leaveBusy(); - } - } - }); + // Start exchange process. + addFuture(exchFut); } else GridDebug.debug("skip exchange", evt); @@ -271,8 +248,6 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana @Override protected void start0() throws IgniteCheckedException { super.start0(); - locExchFut = new GridFutureAdapter<>(); - exchWorker = new ExchangeWorker(); cctx.gridEvents().addLocalEventListener(discoLsnr, EVT_NODE_JOINED, EVT_NODE_LEFT, EVT_NODE_FAILED, @@ -333,12 +308,9 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana if (reconnect) reconnectExchangeFut = new GridFutureAdapter<>(); - new IgniteThread(cctx.gridName(), "exchange-worker", exchWorker).start(); - - onDiscoveryEvent(cctx.localNodeId(), fut); + exchWorker.futQ.addFirst(fut); - // Allow discovery events to get processed. - locExchFut.onDone(); + new IgniteThread(cctx.gridName(), "exchange-worker", exchWorker).start(); if (reconnect) { fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() { @@ -419,12 +391,6 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana for (AffinityReadyFuture f : readyFuts.values()) f.onDone(stopErr); - for (GridDhtPartitionsExchangeFuture f : pendingExchangeFuts) - f.onDone(stopErr); - - if (locExchFut != null) - locExchFut.onDone(stopErr); - U.cancel(exchWorker); if (log.isDebugEnabled()) @@ -588,22 +554,6 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana } /** - * @param nodeId New node ID. - * @param fut Exchange future. - */ - void onDiscoveryEvent(UUID nodeId, GridDhtPartitionsExchangeFuture fut) { - if (!enterBusy()) - return; - - try { - addFuture(fut); - } - finally { - leaveBusy(); - } - } - - /** * @param evt Discovery event. * @return Affinity topology version. */ @@ -1036,11 +986,6 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana U.warn(log, "Last exchange future: " + lastInitializedFut); - U.warn(log, "Pending exchange futures:"); - - for (GridDhtPartitionsExchangeFuture fut : pendingExchangeFuts) - U.warn(log, ">>> " + fut); - ExchangeFutureSet exchFuts = this.exchFuts; if (exchFuts != null) { http://git-wip-us.apache.org/repos/asf/ignite/blob/3624073b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java index afdd65a..3551cd1 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java @@ -142,11 +142,13 @@ public class IgniteCacheTestSuite4 extends TestSuite { suite.addTestSuite(IgniteCacheCreatePutMultiNodeSelfTest2.class); suite.addTestSuite(IgniteCacheCreatePutMultiNodeSelfTest2.class); suite.addTestSuite(IgniteCacheCreatePutMultiNodeSelfTest2.class); + suite.addTestSuite(IgniteCacheCreatePutMultiNodeSelfTest2.class); suite.addTestSuite(CacheAffinityEarlyTest.class); suite.addTestSuite(CacheAffinityEarlyTest.class); suite.addTestSuite(CacheAffinityEarlyTest.class); suite.addTestSuite(CacheAffinityEarlyTest.class); + suite.addTestSuite(CacheAffinityEarlyTest.class); return suite; }
