1093
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f0f7c32c Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f0f7c32c Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f0f7c32c Branch: refs/heads/ignite-1093-2 Commit: f0f7c32caa1a9566c77d0a66bb533a4a07a2338a Parents: 9abfc60 Author: Anton Vinogradov <[email protected]> Authored: Tue Sep 29 18:20:01 2015 +0300 Committer: Anton Vinogradov <[email protected]> Committed: Tue Sep 29 18:20:01 2015 +0300 ---------------------------------------------------------------------- .../dht/preloader/GridDhtPartitionDemander.java | 2 +- .../dht/preloader/GridDhtPartitionSupplier.java | 196 +++++++++++++------ .../GridCacheRebalancingSyncSelfTest.java | 30 ++- .../spi/discovery/tcp/TcpDiscoverySelfTest.java | 17 -- .../spi/discovery/tcp/TestTcpDiscoverySpi.java | 46 +++++ .../testframework/junits/GridAbstractTest.java | 3 +- 6 files changed, 194 insertions(+), 100 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/f0f7c32c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java index 5d4db40..d1d475c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java @@ -818,7 +818,7 @@ public class GridDhtPartitionDemander { @Override public void apply(IgniteInternalFuture<Long> future) { SyncFuture.this.cancel(); } - }); + }); // todo: is it necessary? } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/f0f7c32c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java index 81e2fa4..b5bb25d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java @@ -24,7 +24,6 @@ import java.util.UUID; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.cluster.ClusterNode; -import org.apache.ignite.events.CacheRebalancingEvent; import org.apache.ignite.events.DiscoveryEvent; import org.apache.ignite.events.Event; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; @@ -41,15 +40,12 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalP import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology; import org.apache.ignite.internal.util.lang.GridCloseableIterator; import org.apache.ignite.internal.util.typedef.CI2; -import org.apache.ignite.internal.util.typedef.T4; +import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgnitePredicate; import org.jsr166.ConcurrentHashMap8; -import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_STOPPED; import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; -import static org.apache.ignite.events.EventType.EVT_NODE_JOINED; -import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.OWNING; /** @@ -71,8 +67,8 @@ class GridDhtPartitionSupplier { /** Preload predicate. */ private IgnitePredicate<GridCacheEntryInfo> preloadPred; - /** Supply context map. T4: nodeId, idx, topologyVersion, updateSequence. */ - private final ConcurrentHashMap8<T4<UUID, Integer, AffinityTopologyVersion, Long>, SupplyContext> scMap = + /** Supply context map. T2: nodeId, idx. */ + private final ConcurrentHashMap8<T2<UUID, Integer>, SupplyContext> scMap = new ConcurrentHashMap8<>(); /** Rebalancing listener. */ @@ -100,26 +96,18 @@ class GridDhtPartitionSupplier { lsnr = new GridLocalEventListener() { @Override public void onEvent(Event evt) { if (evt instanceof DiscoveryEvent) { - for (Map.Entry<T4<UUID, Integer, AffinityTopologyVersion, Long>, SupplyContext> entry : scMap.entrySet()) { - T4<UUID, Integer, AffinityTopologyVersion, Long> t = entry.getKey(); + for (Map.Entry<T2<UUID, Integer>, SupplyContext> entry : scMap.entrySet()) { + T2<UUID, Integer> t = entry.getKey(); - SupplyContext sc = entry.getValue(); + if (t.get1().equals(((DiscoveryEvent)evt).eventNode().id())) { + SupplyContext sctx = entry.getValue(); - if (t.get3() != null && !t.get3().equals(cctx.affinity().affinityTopologyVersion()) && sc != null) - clearContext(scMap, t, sc, log); - } - } - else if (evt instanceof CacheRebalancingEvent) { - CacheRebalancingEvent e = (CacheRebalancingEvent)evt; + clearContext(sctx, log); - if (cctx.name().equals(e.cacheName())) { - UUID id = e.discoveryNode().id(); + U.log(log, "Supply context removed for failed node [node=" + t.get1() + "]"); - for (Map.Entry<T4<UUID, Integer, AffinityTopologyVersion, Long>, SupplyContext> entry : scMap.entrySet()) { - if (id.equals(entry.getKey().get1())) - clearContext(scMap, entry.getKey(), entry.getValue(), log); + scMap.remove(t, sctx); } - } } else { @@ -128,7 +116,7 @@ class GridDhtPartitionSupplier { } }; - cctx.events().addListener(lsnr, EVT_NODE_FAILED, EVT_CACHE_REBALANCE_STOPPED); + cctx.events().addListener(lsnr, EVT_NODE_FAILED); startOldListeners(); } @@ -145,32 +133,38 @@ class GridDhtPartitionSupplier { /** * Clear context. * - * @param map Context map. - * @param t id. * @param sc Supply context. * @param log Logger. * @return true in case context was removed. */ - private static boolean clearContext( - final ConcurrentHashMap8<T4<UUID, Integer, AffinityTopologyVersion, Long>, SupplyContext> map, - final T4<UUID, Integer, AffinityTopologyVersion, Long> t, + private static void clearContext( final SupplyContext sc, final IgniteLogger log) { - final Iterator it = sc.entryIt; + if (sc != null) { + final Iterator it = sc.entryIt; - if (it != null && it instanceof GridCloseableIterator && !((GridCloseableIterator)it).isClosed()) { - try { - synchronized (it) { - if (!((GridCloseableIterator)it).isClosed()) - ((GridCloseableIterator)it).close(); + if (it != null && it instanceof GridCloseableIterator && !((GridCloseableIterator)it).isClosed()) { + try { + synchronized (it) { + if (!((GridCloseableIterator)it).isClosed()) + ((GridCloseableIterator)it).close(); + } + } + catch (IgniteCheckedException e) { + log.error("Iterator close failed.", e); } } - catch (IgniteCheckedException e) { - log.error("Iterator close failed.", e); + + final GridDhtLocalPartition loc = sc.loc; + + if (loc != null && loc.reservations() > 0) { + synchronized (loc) { + if (loc.reservations() > 0) + loc.release(); + } + } } - - return map.remove(t, sc); } /** @@ -199,10 +193,16 @@ class GridDhtPartitionSupplier { ClusterNode node = cctx.discovery().node(id); - T4<UUID, Integer, AffinityTopologyVersion, Long> scId = new T4<>(id, idx, d.topologyVersion(), d.updateSequence()); + T2<UUID, Integer> scId = new T2<>(id, idx); try { - SupplyContext sctx = scMap.get(scId); + SupplyContext sctx = scMap.remove(scId); + + if (sctx != null && (!d.topologyVersion().equals(sctx.topVer) || d.updateSequence() != sctx.updateSeq)) { + clearContext(sctx, log); + + sctx = null; + } if (sctx == null && d.partitions() == null) return; @@ -230,18 +230,27 @@ class GridDhtPartitionSupplier { newReq = false; - GridDhtLocalPartition loc = top.localPartition(part, d.topologyVersion(), false); + GridDhtLocalPartition loc; - if (loc == null || loc.state() != OWNING || !loc.reserve()) { - // Reply with partition of "-1" to let sender know that - // this node is no longer an owner. - s.missed(part); + if (sctx != null && sctx.loc != null) { + loc = sctx.loc; - if (log.isDebugEnabled()) - log.debug("Requested partition is not owned by local node [part=" + part + - ", demander=" + id + ']'); + assert loc.reservations() > 0; + } + else { + loc = top.localPartition(part, d.topologyVersion(), false); - continue; + if (loc == null || loc.state() != OWNING || !loc.reserve()) { + // Reply with partition of "-1" to let sender know that + // this node is no longer an owner. + s.missed(part); + + if (log.isDebugEnabled()) + log.debug("Requested partition is not owned by local node [part=" + part + + ", demander=" + id + ']'); + + continue; + } } GridCacheEntryInfoCollectSwapListener swapLsnr = null; @@ -279,9 +288,18 @@ class GridDhtPartitionSupplier { if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) { if (++bCnt >= maxBatchesCnt) { - saveSupplyContext(scId, phase, partIt, part, entIt, swapLsnr); + saveSupplyContext(scId, + phase, + partIt, + part, + entIt, + swapLsnr, + loc, + d.topologyVersion(), + d.updateSequence()); swapLsnr = null; + loc = null; reply(node, d, s); @@ -323,7 +341,10 @@ class GridDhtPartitionSupplier { partIt, null, swapLsnr, - part); + part, + loc, + d.topologyVersion(), + d.updateSequence()); } } @@ -354,9 +375,18 @@ class GridDhtPartitionSupplier { if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) { if (++bCnt >= maxBatchesCnt) { - saveSupplyContext(scId, phase, partIt, part, iter, swapLsnr); + saveSupplyContext(scId, + phase, + partIt, + part, + iter, + swapLsnr, + loc, + d.topologyVersion(), + d.updateSequence()); swapLsnr = null; + loc = null; reply(node, d, s); @@ -437,7 +467,10 @@ class GridDhtPartitionSupplier { partIt, null, null, - part); + part, + loc, + d.topologyVersion(), + d.updateSequence()); } } @@ -465,9 +498,17 @@ class GridDhtPartitionSupplier { if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) { if (++bCnt >= maxBatchesCnt) { - saveSupplyContext(scId, phase, partIt, part, lsnrIt, swapLsnr); - - swapLsnr = null; + saveSupplyContext(scId, + phase, + partIt, + part, + lsnrIt, + swapLsnr, + loc, + d.topologyVersion(), + d.updateSequence()); + + loc = null; reply(node, d, s); @@ -500,7 +541,8 @@ class GridDhtPartitionSupplier { sctx = null; } finally { - loc.release(); + if (loc != null) + loc.release(); if (swapLsnr != null) { cctx.swap().removeOffHeapListener(part, swapLsnr); @@ -561,12 +603,24 @@ class GridDhtPartitionSupplier { * @param swapLsnr Swap listener. */ private void saveSupplyContext( - T4 t, + T2<UUID, Integer> t, int phase, Iterator<Integer> partIt, int part, - Iterator<?> entryIt, GridCacheEntryInfoCollectSwapListener swapLsnr) { - scMap.put(t, new SupplyContext(phase, partIt, entryIt, swapLsnr, part)); + Iterator<?> entryIt, GridCacheEntryInfoCollectSwapListener swapLsnr, + GridDhtLocalPartition loc, + AffinityTopologyVersion topVer, + long updateSeq) { + SupplyContext old = scMap.putIfAbsent(t, new SupplyContext(phase, + partIt, + entryIt, + swapLsnr, + part, + loc, + topVer, + updateSeq)); + + assert old == null; } /** @@ -588,6 +642,15 @@ class GridDhtPartitionSupplier { /** Partition. */ private final int part; + /** Local partition. */ + GridDhtLocalPartition loc; + + /** Topology version. */ + AffinityTopologyVersion topVer; + + /** Update seq. */ + long updateSeq; + /** * @param phase Phase. * @param partIt Partition iterator. @@ -595,13 +658,22 @@ class GridDhtPartitionSupplier { * @param swapLsnr Swap listener. * @param part Partition. */ - public SupplyContext(int phase, Iterator<Integer> partIt, Iterator<?> entryIt, - GridCacheEntryInfoCollectSwapListener swapLsnr, int part) { + public SupplyContext(int phase, + Iterator<Integer> partIt, + Iterator<?> entryIt, + GridCacheEntryInfoCollectSwapListener swapLsnr, + int part, + GridDhtLocalPartition loc, + AffinityTopologyVersion topVer, + long updateSeq) { this.phase = phase; this.partIt = partIt; this.entryIt = entryIt; this.swapLsnr = swapLsnr; this.part = part; + this.loc = loc; + this.topVer = topVer; + this.updateSeq = updateSeq; } } http://git-wip-us.apache.org/repos/asf/ignite/blob/f0f7c32c/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java index be8e24b..bb40f31 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java @@ -32,6 +32,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheAdapter; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemander; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.TestTcpDiscoverySpi; import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode; import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; @@ -59,18 +60,10 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest { protected static String CACHE_NAME_DHT_REPLICATED_2 = "cacheR2"; /** */ - private volatile boolean concurrentStartFinished = false; + private volatile boolean concurrentStartFinished; /** */ - private volatile boolean concurrentStartFinished2 = false; - - private volatile FailableTcpDiscoverySpi spi; - - public static class FailableTcpDiscoverySpi extends TcpDiscoverySpi { - public void fail() { - simulateNodeFailure(); - } - } + private volatile boolean concurrentStartFinished2; /** {@inheritDoc} */ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { @@ -78,11 +71,6 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest { iCfg.setRebalanceThreadPoolSize(4); - iCfg.setDiscoverySpi(new FailableTcpDiscoverySpi()); - - if (getTestGridName(20).equals(gridName)) - spi = (FailableTcpDiscoverySpi)iCfg.getDiscoverySpi(); - ((TcpDiscoverySpi)iCfg.getDiscoverySpi()).setIpFinder(ipFinder); ((TcpDiscoverySpi)iCfg.getDiscoverySpi()).setForceServerMode(true); @@ -96,7 +84,8 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest { cachePCfg.setRebalanceMode(CacheRebalanceMode.SYNC); cachePCfg.setBackups(1); cachePCfg.setRebalanceBatchSize(1); - cachePCfg.setRebalanceBatchesCount(1); + //cachePCfg.setRebalanceBatchesCount(1); + cachePCfg.setRebalanceBatchesCount(Integer.MAX_VALUE); CacheConfiguration<Integer, Integer> cachePCfg2 = new CacheConfiguration<>(); @@ -285,6 +274,9 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest { long start = System.currentTimeMillis(); + concurrentStartFinished = false; + concurrentStartFinished2 = false; + Thread t1 = new Thread() { @Override public void run() { try { @@ -409,11 +401,11 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest { waitForRebalancing(1, 2); - startGrid(20); + startGrid(2); - waitForRebalancing(20, 3); + waitForRebalancing(2, 3); - spi.fail(); + ((TestTcpDiscoverySpi)grid(2).configuration().getDiscoverySpi()).simulateNodeFailure(); waitForRebalancing(0, 4); waitForRebalancing(1, 4); http://git-wip-us.apache.org/repos/asf/ignite/blob/f0f7c32c/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java index 0280e9c..51d8a2d 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java @@ -511,23 +511,6 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest { } /** - * - */ - private static class TestTcpDiscoverySpi extends TcpDiscoverySpi { - /** */ - private boolean ignorePingResponse; - - /** {@inheritDoc} */ - protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, long timeout) throws IOException, - IgniteCheckedException { - if (msg instanceof TcpDiscoveryPingResponse && ignorePingResponse) - return; - else - super.writeToSocket(sock, msg, timeout); - } - } - - /** * @throws Exception If any error occurs. */ public void testNodeAdded() throws Exception { http://git-wip-us.apache.org/repos/asf/ignite/blob/f0f7c32c/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TestTcpDiscoverySpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TestTcpDiscoverySpi.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TestTcpDiscoverySpi.java new file mode 100644 index 0000000..dbc54bc --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TestTcpDiscoverySpi.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.discovery.tcp; + +import java.io.IOException; +import java.net.Socket; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage; +import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryPingResponse; + +/** + * + */ +public class TestTcpDiscoverySpi extends TcpDiscoverySpi { + /** */ + public boolean ignorePingResponse; + + /** {@inheritDoc} */ + protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, long timeout) throws IOException, + IgniteCheckedException { + if (msg instanceof TcpDiscoveryPingResponse && ignorePingResponse) + return; + else + super.writeToSocket(sock, msg, timeout); + } + + /** {@inheritDoc} */ + @Override public void simulateNodeFailure() { + super.simulateNodeFailure(); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/f0f7c32c/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java index f54fe06..546549b 100644 --- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java @@ -76,6 +76,7 @@ import org.apache.ignite.marshaller.jdk.JdkMarshaller; import org.apache.ignite.spi.checkpoint.sharedfs.SharedFsCheckpointSpi; import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.TestTcpDiscoverySpi; import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; import org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; @@ -1227,7 +1228,7 @@ public abstract class GridAbstractTest extends TestCase { cfg.setCommunicationSpi(commSpi); - TcpDiscoverySpi discoSpi = new TcpDiscoverySpi(); + TcpDiscoverySpi discoSpi = new TestTcpDiscoverySpi(); if (isDebug()) { discoSpi.setMaxMissedHeartbeats(Integer.MAX_VALUE);
