Repository: ignite Updated Branches: refs/heads/ignite-slow-rebal 79bc2eb07 -> 663a2d993
debugging rebalance Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/663a2d99 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/663a2d99 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/663a2d99 Branch: refs/heads/ignite-slow-rebal Commit: 663a2d993b9750b34eb8eef89cebf5cea4db8c6d Parents: 79bc2eb Author: Yakov Zhdanov <[email protected]> Authored: Wed Sep 30 19:21:58 2015 +0300 Committer: Yakov Zhdanov <[email protected]> Committed: Wed Sep 30 19:21:58 2015 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheIoManager.java | 19 +- .../GridDhtPartitionDemandMessage.java | 3 +- .../GridDhtPartitionSupplyMessage.java | 3 +- .../GridDhtPartitionsExchangeFuture.java | 10 +- .../preloader/GridDhtPartitionsFullMessage.java | 11 +- .../communication/tcp/TcpCommunicationSpi.java | 22 ++- .../dht/GridCacheDhtPreloadPerformanceTest.java | 176 +++++++++++++++++++ 7 files changed, 222 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/663a2d99/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java index 421ec82..300a0e8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java @@ -28,6 +28,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; +import org.apache.ignite.IgniteLogger; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; @@ -923,9 +924,12 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { ", old=" + clsHandlers.get(key) + ", new=" + c + ']'; } - if (log != null && log.isDebugEnabled()) - log.debug("Registered cache communication handler [cacheId=" + cacheId + ", type=" + type + - ", msgIdx=" + msgIdx + ", handler=" + c + ']'); + IgniteLogger log0 = log; + + if (log0 != null && log0.isTraceEnabled()) + log0.trace( + "Registered cache communication handler [cacheId=" + cacheId + ", type=" + type + + ", msgIdx=" + msgIdx + ", handler=" + c + ']'); } /** @@ -978,15 +982,16 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { */ @SuppressWarnings({"unchecked"}) public void addOrderedHandler(Object topic, IgniteBiInClosure<UUID, ? extends GridCacheMessage> c) { + IgniteLogger log0 = log; if (orderedHandlers.putIfAbsent(topic, c) == null) { cctx.gridIO().addMessageListener(topic, new OrderedMessageListener( (IgniteBiInClosure<UUID, GridCacheMessage>)c)); - if (log != null && log.isDebugEnabled()) - log.debug("Registered ordered cache communication handler [topic=" + topic + ", handler=" + c + ']'); + if (log0 != null && log0.isTraceEnabled()) + log0.trace("Registered ordered cache communication handler [topic=" + topic + ", handler=" + c + ']'); } - else if (log != null) - U.warn(log, "Failed to register ordered cache communication handler because it is already " + + else if (log0 != null) + U.warn(log0, "Failed to register ordered cache communication handler because it is already " + "registered for this topic [topic=" + topic + ", handler=" + c + ']'); } http://git-wip-us.apache.org/repos/asf/ignite/blob/663a2d99/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java index b588372..863ec8f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java @@ -45,7 +45,6 @@ public class GridDhtPartitionDemandMessage extends GridCacheMessage { private long updateSeq; /** Partition. */ - @GridToStringInclude @GridDirectCollection(int.class) private Collection<Integer> parts; @@ -330,4 +329,4 @@ public class GridDhtPartitionDemandMessage extends GridCacheMessage { return S.toString(GridDhtPartitionDemandMessage.class, this, "partCnt", parts.size(), "super", super.toString()); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/663a2d99/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java index 3ccc5ae..cf10a13 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java @@ -404,7 +404,6 @@ public class GridDhtPartitionSupplyMessage extends GridCacheMessage implements G @Override public String toString() { return S.toString(GridDhtPartitionSupplyMessage.class, this, "size", size(), - "parts", infos.keySet(), "super", super.toString()); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/663a2d99/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index a1b03c1..3cf0eb0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -475,6 +475,9 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT busyLock.readLock().unlock(); } + // TODO remove + long inited; + /** * Starts activity. * @@ -488,6 +491,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT if (isDone()) return; + inited = U.currentTimeMillis(); + try { // Wait for event to occur to make sure that discovery // will return corresponding nodes. @@ -800,7 +805,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT for (Map.Entry<IgniteTxKey, Collection<GridCacheMvccCandidate>> e : locks.entrySet()) U.warn(log, "Awaited locked entry [key=" + e.getKey() + ", mvcc=" + e.getValue() + ']'); - + dumpedObjects++; } } @@ -1059,7 +1064,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT if (super.onDone(res, err) && !dummy && !forcePreload) { if (log.isDebugEnabled()) - log.debug("Completed partition exchange [localNode=" + cctx.localNodeId() + ", exchange= " + this + ']'); + log.debug("Completed partition exchange [localNode=" + cctx.localNodeId() + ", exchange= " + this + + "duration=" + duration() + ", durationFromInit=" + (U.currentTimeMillis() - inited) + ']'); initFut.onDone(err == null); http://git-wip-us.apache.org/repos/asf/ignite/blob/663a2d99/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java index 50e2e41..5429538 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java @@ -43,7 +43,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa /** */ @GridToStringInclude @GridDirectTransient - private Map<Integer, GridDhtPartitionFullMap> parts = new HashMap<>(); + private Map<Integer, GridDhtPartitionFullMap> parts; /** */ private byte[] partsBytes; @@ -68,7 +68,6 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa @NotNull AffinityTopologyVersion topVer) { super(id, lastVer); - assert parts != null; assert id == null || topVer.equals(id.topologyVersion()); this.topVer = topVer; @@ -86,6 +85,9 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa * @param fullMap Full partitions map. */ public void addFullPartitionsMap(int cacheId, GridDhtPartitionFullMap fullMap) { + if (parts == null) + parts = new HashMap<>(); + if (!parts.containsKey(cacheId)) parts.put(cacheId, fullMap); } @@ -95,7 +97,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException { super.prepareMarshal(ctx); - if (partsBytes == null && parts != null) + if (parts != null && partsBytes == null) partsBytes = ctx.marshaller().marshal(parts); } @@ -117,7 +119,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException { super.finishUnmarshal(ctx, ldr); - if (partsBytes != null) + if (partsBytes != null && parts == null) parts = ctx.marshaller().unmarshal(partsBytes, ldr); } @@ -201,3 +203,4 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa "super", super.toString()); } } + http://git-wip-us.apache.org/repos/asf/ignite/blob/663a2d99/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java index c93d5af..5ea2c02 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java @@ -2050,11 +2050,21 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter if (locNode == null) throw new IgniteCheckedException("Failed to create NIO client (local node is stopping)"); + if (log.isDebugEnabled()) + log.debug("Creating NIO client to node: " + node); + // If remote node has shared memory server enabled and has the same set of MACs // then we are likely to run on the same host and shared memory communication could be tried. if (shmemPort != null && U.sameMacs(locNode, node)) { try { - return createShmemClient(node, shmemPort); + GridCommunicationClient client = createShmemClient( + node, + shmemPort); + + if (log.isDebugEnabled()) + log.debug("Shmem client created: " + client); + + return client; } catch (IgniteCheckedException e) { if (e.hasCause(IpcOutOfSystemResourcesException.class)) @@ -2071,7 +2081,12 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter connectGate.enter(); try { - return createTcpClient(node); + GridCommunicationClient client = createTcpClient(node); + + if (log.isDebugEnabled()) + log.debug("TCP client created: " + client); + + return client; } finally { connectGate.leave(); @@ -2453,9 +2468,6 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter throw errs; } - if (log.isDebugEnabled()) - log.debug("Created client: " + client); - return client; } http://git-wip-us.apache.org/repos/asf/ignite/blob/663a2d99/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadPerformanceTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadPerformanceTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadPerformanceTest.java new file mode 100644 index 0000000..c00f316 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadPerformanceTest.java @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.dht; + +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteException; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cache.CacheRebalanceMode; +import org.apache.ignite.cache.CacheWriteSynchronizationMode; +import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.managers.communication.GridIoMessage; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader; +import org.apache.ignite.internal.util.typedef.G; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteInClosure; +import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.spi.IgniteSpiException; +import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +import java.util.Collection; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentLinkedQueue; + +/** + * Test cases for partitioned cache {@link GridDhtPreloader preloader}. + */ +public class GridCacheDhtPreloadPerformanceTest extends GridCommonAbstractTest { + /** */ + private static final int THREAD_CNT = 2; + + /** IP finder. */ + private TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration c = super.getConfiguration(gridName); + + CacheConfiguration cc = defaultCacheConfiguration(); + + cc.setCacheMode( + CacheMode.PARTITIONED); + cc.setWriteSynchronizationMode( + CacheWriteSynchronizationMode.FULL_SYNC); + cc.setRebalanceMode( + CacheRebalanceMode.SYNC); + cc.setAffinity(new RendezvousAffinityFunction(false, 1300)); + cc.setBackups(2); + + CacheConfiguration cc1 = defaultCacheConfiguration(); + + cc1.setName("cc1"); + cc1.setCacheMode( + CacheMode.PARTITIONED); + cc1.setWriteSynchronizationMode( + CacheWriteSynchronizationMode.FULL_SYNC); + cc1.setRebalanceMode( + CacheRebalanceMode.SYNC); + cc1.setAffinity( + new RendezvousAffinityFunction( + false, + 1300)); + cc1.setBackups(2); + + TcpDiscoverySpi disco = new TcpDiscoverySpi(); + + disco.setIpFinder(ipFinder); + + c.setDiscoverySpi(disco); + + c.setIgfsThreadPoolSize(1); + c.setSystemThreadPoolSize(2); + c.setPublicThreadPoolSize(2); + c.setManagementThreadPoolSize(1); + c.setUtilityCachePoolSize(2); + c.setPeerClassLoadingThreadPoolSize(1); + + c.setCacheConfiguration(cc, cc1); + + TcpCommunicationSpi comm = new TcpCommunicationSpi(); + + comm.setSharedMemoryPort(-1); + + c.setCommunicationSpi(comm); + + return c; + } + + /** + * @throws Exception If failed. + */ + public void testConcurrentStartPerformance() throws Exception { +// for (int i = 0; i < 10; i++) { +// try { +// startGrid(1); +// startGrid(2); +// startGrid(3); +// } +// finally { +// G.stopAll(true); +// } +// } + + multithreaded( + new Callable<Object>() { + @Override public Object call() throws Exception { + long start = U.currentTimeMillis(); + + Ignite grid = startGrid(Thread.currentThread().getName()); + + System.out.println(">>> Time to start: " + (U.currentTimeMillis() - start) + ", topSize=" + grid.cluster().nodes().size()); + + return null; + } + }, + THREAD_CNT); + } + + /** + * Communication SPI that will count single partition update messages. + */ + private static class TestCommunicationSpi extends TcpCommunicationSpi { + /** Recorded messages. */ + private Collection<GridDhtPartitionsSingleMessage> sentMsgs = new ConcurrentLinkedQueue<>(); + + /** {@inheritDoc} */ + @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackClosure) + throws IgniteSpiException { + recordMessage((GridIoMessage)msg); + + super.sendMessage(node, msg, ackClosure); + } + + /** + * @return Collection of sent messages. + */ + public Collection<GridDhtPartitionsSingleMessage> sentMessages() { + return sentMsgs; + } + + /** + * Adds message to a list if message is of correct type. + * + * @param msg Message. + */ + private void recordMessage(GridIoMessage msg) { + if (msg.message() instanceof GridDhtPartitionsSingleMessage) { + GridDhtPartitionsSingleMessage partSingleMsg = (GridDhtPartitionsSingleMessage)msg.message(); + + sentMsgs.add(partSingleMsg); + } + } + } +}
