Repository: ignite Updated Branches: refs/heads/ignite-1.4.2 6e3d1b11b -> 4f95be256
Minor fixes + some debug statements in scope of slow rebalancing investigation. Squashed commit of the following: commit ee4e65088114fd1e2dfa229f0853551c1ecb5e1d Author: Alexey Goncharuk <[email protected]> Date: Thu Oct 1 14:45:36 2015 +0300 debug commit 4d61a0f0c95b16f701993e5a3131ebdcf1beb67c Author: Alexey Goncharuk <[email protected]> Date: Thu Oct 1 13:07:40 2015 +0300 debug commit 663a2d993b9750b34eb8eef89cebf5cea4db8c6d Author: Yakov Zhdanov <[email protected]> Date: Wed Sep 30 19:21:58 2015 +0300 debugging rebalance commit 79bc2eb077cb3a8175c25864f2cebdc074372057 Author: Denis Magda <[email protected]> Date: Wed Sep 30 12:33:54 2015 +0300 slow rebalancing commit a950de96624dcef64b6e8935c4f82ef7629035b3 Author: Denis Magda <[email protected]> Date: Wed Sep 30 12:30:47 2015 +0300 slow rebalancing Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/4f95be25 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/4f95be25 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/4f95be25 Branch: refs/heads/ignite-1.4.2 Commit: 4f95be256fc49d3d2d58076adb3e2d213822e24c Parents: 6e3d1b1 Author: Yakov Zhdanov <[email protected]> Authored: Mon Oct 5 11:26:10 2015 +0300 Committer: Yakov Zhdanov <[email protected]> Committed: Mon Oct 5 11:26:10 2015 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheIoManager.java | 20 ++- .../GridDhtPartitionDemandMessage.java | 3 +- .../GridDhtPartitionSupplyMessage.java | 3 +- .../GridDhtPartitionsExchangeFuture.java | 16 ++- .../preloader/GridDhtPartitionsFullMessage.java | 13 +- .../GridDhtPartitionsSingleMessage.java | 11 +- .../communication/tcp/TcpCommunicationSpi.java | 22 ++- .../dht/GridCacheDhtPreloadPerformanceTest.java | 133 +++++++++++++++++++ 8 files changed, 193 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/4f95be25/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java index 421ec82..4000bbf 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java @@ -28,6 +28,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; +import org.apache.ignite.IgniteLogger; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; @@ -923,9 +924,12 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { ", old=" + clsHandlers.get(key) + ", new=" + c + ']'; } - if (log != null && log.isDebugEnabled()) - log.debug("Registered cache communication handler [cacheId=" + cacheId + ", type=" + type + - ", msgIdx=" + msgIdx + ", handler=" + c + ']'); + IgniteLogger log0 = log; + + if (log0 != null && log0.isTraceEnabled()) + log0.trace( + "Registered cache communication handler [cacheId=" + cacheId + ", type=" + type + + ", msgIdx=" + msgIdx + ", handler=" + c + ']'); } /** @@ -978,15 +982,17 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { */ @SuppressWarnings({"unchecked"}) public void addOrderedHandler(Object topic, IgniteBiInClosure<UUID, ? extends GridCacheMessage> c) { + IgniteLogger log0 = log; + if (orderedHandlers.putIfAbsent(topic, c) == null) { cctx.gridIO().addMessageListener(topic, new OrderedMessageListener( (IgniteBiInClosure<UUID, GridCacheMessage>)c)); - if (log != null && log.isDebugEnabled()) - log.debug("Registered ordered cache communication handler [topic=" + topic + ", handler=" + c + ']'); + if (log0 != null && log0.isTraceEnabled()) + log0.trace("Registered ordered cache communication handler [topic=" + topic + ", handler=" + c + ']'); } - else if (log != null) - U.warn(log, "Failed to register ordered cache communication handler because it is already " + + else if (log0 != null) + U.warn(log0, "Failed to register ordered cache communication handler because it is already " + "registered for this topic [topic=" + topic + ", handler=" + c + ']'); } http://git-wip-us.apache.org/repos/asf/ignite/blob/4f95be25/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java index b588372..863ec8f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java @@ -45,7 +45,6 @@ public class GridDhtPartitionDemandMessage extends GridCacheMessage { private long updateSeq; /** Partition. */ - @GridToStringInclude @GridDirectCollection(int.class) private Collection<Integer> parts; @@ -330,4 +329,4 @@ public class GridDhtPartitionDemandMessage extends GridCacheMessage { return S.toString(GridDhtPartitionDemandMessage.class, this, "partCnt", parts.size(), "super", super.toString()); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/4f95be25/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java index 3ccc5ae..cf10a13 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java @@ -404,7 +404,6 @@ public class GridDhtPartitionSupplyMessage extends GridCacheMessage implements G @Override public String toString() { return S.toString(GridDhtPartitionSupplyMessage.class, this, "size", size(), - "parts", infos.keySet(), "super", super.toString()); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/4f95be25/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index a1b03c1..eaa5584 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -475,6 +475,9 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT busyLock.readLock().unlock(); } + // TODO remove + long inited; + /** * Starts activity. * @@ -488,6 +491,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT if (isDone()) return; + inited = U.currentTimeMillis(); + try { // Wait for event to occur to make sure that discovery // will return corresponding nodes. @@ -800,7 +805,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT for (Map.Entry<IgniteTxKey, Collection<GridCacheMvccCandidate>> e : locks.entrySet()) U.warn(log, "Awaited locked entry [key=" + e.getKey() + ", mvcc=" + e.getValue() + ']'); - + dumpedObjects++; } } @@ -1059,7 +1064,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT if (super.onDone(res, err) && !dummy && !forcePreload) { if (log.isDebugEnabled()) - log.debug("Completed partition exchange [localNode=" + cctx.localNodeId() + ", exchange= " + this + ']'); + log.debug("Completed partition exchange [localNode=" + cctx.localNodeId() + ", exchange= " + this + + "duration=" + duration() + ", durationFromInit=" + (U.currentTimeMillis() - inited) + ']'); initFut.onDone(err == null); @@ -1190,6 +1196,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT if (match) { boolean allReceived; + long start = U.currentTimeMillis(); + synchronized (rcvdIds) { if (rcvdIds.add(nodeId)) updatePartitionSingleMap(msg); @@ -1197,6 +1205,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT allReceived = allReceived(); } + long end = U.currentTimeMillis(); + // If got all replies, and initialization finished, and reply has not been sent yet. if (allReceived && ready.get() && replied.compareAndSet(false, true)) { spreadPartitions(); @@ -1206,7 +1216,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT else if (log.isDebugEnabled()) log.debug("Exchange future full map is not sent [allReceived=" + allReceived() + ", ready=" + ready + ", replied=" + replied.get() + ", init=" + init.get() + - ", fut=" + this + ']'); + ", fut=" + GridDhtPartitionsExchangeFuture.this + ", updateDur=" + (end - start) + ']'); } } }); http://git-wip-us.apache.org/repos/asf/ignite/blob/4f95be25/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java index b91a2de..5429538 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java @@ -43,7 +43,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa /** */ @GridToStringInclude @GridDirectTransient - private Map<Integer, GridDhtPartitionFullMap> parts = new HashMap<>(); + private Map<Integer, GridDhtPartitionFullMap> parts; /** */ private byte[] partsBytes; @@ -68,7 +68,6 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa @NotNull AffinityTopologyVersion topVer) { super(id, lastVer); - assert parts != null; assert id == null || topVer.equals(id.topologyVersion()); this.topVer = topVer; @@ -86,6 +85,9 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa * @param fullMap Full partitions map. */ public void addFullPartitionsMap(int cacheId, GridDhtPartitionFullMap fullMap) { + if (parts == null) + parts = new HashMap<>(); + if (!parts.containsKey(cacheId)) parts.put(cacheId, fullMap); } @@ -95,7 +97,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException { super.prepareMarshal(ctx); - if (parts != null) + if (parts != null && partsBytes == null) partsBytes = ctx.marshaller().marshal(parts); } @@ -117,7 +119,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException { super.finishUnmarshal(ctx, ldr); - if (partsBytes != null) + if (partsBytes != null && parts == null) parts = ctx.marshaller().unmarshal(partsBytes, ldr); } @@ -200,4 +202,5 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa return S.toString(GridDhtPartitionsFullMessage.class, this, "partCnt", parts != null ? parts.size() : 0, "super", super.toString()); } -} \ No newline at end of file +} + http://git-wip-us.apache.org/repos/asf/ignite/blob/4f95be25/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java index 9b6dcf7..83fbb1a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java @@ -41,7 +41,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes /** Local partitions. */ @GridToStringInclude @GridDirectTransient - private Map<Integer, GridDhtPartitionMap> parts = new HashMap<>(); + private Map<Integer, GridDhtPartitionMap> parts; /** Serialized partitions. */ private byte[] partsBytes; @@ -83,6 +83,9 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes * @param locMap Local partition map. */ public void addLocalPartitionMap(int cacheId, GridDhtPartitionMap locMap) { + if (parts == null) + parts = new HashMap<>(); + parts.put(cacheId, locMap); } @@ -98,7 +101,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException { super.prepareMarshal(ctx); - if (parts != null) + if (partsBytes == null && parts != null) partsBytes = ctx.marshaller().marshal(parts); } @@ -106,7 +109,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException { super.finishUnmarshal(ctx, ldr); - if (partsBytes != null) + if (partsBytes != null && parts == null) parts = ctx.marshaller().unmarshal(partsBytes, ldr); } @@ -188,4 +191,4 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes @Override public String toString() { return S.toString(GridDhtPartitionsSingleMessage.class, this, super.toString()); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/4f95be25/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java index c93d5af..5ea2c02 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java @@ -2050,11 +2050,21 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter if (locNode == null) throw new IgniteCheckedException("Failed to create NIO client (local node is stopping)"); + if (log.isDebugEnabled()) + log.debug("Creating NIO client to node: " + node); + // If remote node has shared memory server enabled and has the same set of MACs // then we are likely to run on the same host and shared memory communication could be tried. if (shmemPort != null && U.sameMacs(locNode, node)) { try { - return createShmemClient(node, shmemPort); + GridCommunicationClient client = createShmemClient( + node, + shmemPort); + + if (log.isDebugEnabled()) + log.debug("Shmem client created: " + client); + + return client; } catch (IgniteCheckedException e) { if (e.hasCause(IpcOutOfSystemResourcesException.class)) @@ -2071,7 +2081,12 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter connectGate.enter(); try { - return createTcpClient(node); + GridCommunicationClient client = createTcpClient(node); + + if (log.isDebugEnabled()) + log.debug("TCP client created: " + client); + + return client; } finally { connectGate.leave(); @@ -2453,9 +2468,6 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter throw errs; } - if (log.isDebugEnabled()) - log.debug("Created client: " + client); - return client; } http://git-wip-us.apache.org/repos/asf/ignite/blob/4f95be25/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadPerformanceTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadPerformanceTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadPerformanceTest.java new file mode 100644 index 0000000..8b68829 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadPerformanceTest.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.dht; + +import org.apache.ignite.Ignite; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cache.CacheRebalanceMode; +import org.apache.ignite.cache.CacheWriteSynchronizationMode; +import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +import java.util.concurrent.Callable; + +/** + * Test cases for partitioned cache {@link GridDhtPreloader preloader}. + */ +public class GridCacheDhtPreloadPerformanceTest extends GridCommonAbstractTest { + /** */ + private static final int THREAD_CNT = 30; + + /** IP finder. */ + private TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration c = super.getConfiguration(gridName); + + CacheConfiguration cc = defaultCacheConfiguration(); + + cc.setCacheMode( + CacheMode.PARTITIONED); + cc.setWriteSynchronizationMode( + CacheWriteSynchronizationMode.FULL_SYNC); + cc.setRebalanceMode( + CacheRebalanceMode.SYNC); + cc.setAffinity(new RendezvousAffinityFunction(false, 1300)); + cc.setBackups(2); + + CacheConfiguration cc1 = defaultCacheConfiguration(); + + cc1.setName("cc1"); + cc1.setCacheMode( + CacheMode.PARTITIONED); + cc1.setWriteSynchronizationMode( + CacheWriteSynchronizationMode.FULL_SYNC); + cc1.setRebalanceMode( + CacheRebalanceMode.SYNC); + cc1.setAffinity( + new RendezvousAffinityFunction( + false, + 1300)); + cc1.setBackups(2); + + TcpDiscoverySpi disco = new TcpDiscoverySpi(); + + disco.setIpFinder(ipFinder); + + c.setDiscoverySpi(disco); + + c.setIgfsThreadPoolSize(1); + c.setSystemThreadPoolSize(2); + c.setPublicThreadPoolSize(2); + c.setManagementThreadPoolSize(1); + c.setUtilityCachePoolSize(2); + c.setPeerClassLoadingThreadPoolSize(1); + + c.setCacheConfiguration(cc, cc1); + + TcpCommunicationSpi comm = new TcpCommunicationSpi(); + + comm.setSharedMemoryPort(-1); + + c.setCommunicationSpi(comm); + + return c; + } + + /** + * @throws Exception If failed. + */ + public void testConcurrentStartPerformance() throws Exception { +// +// for (int i = 0; i < 10; i++) { +// try { +// startGrid(1); +// startGrid(2); +// startGrid(3); +// } +// finally { +// G.stopAll(true); +// } +// } + + multithreaded( + new Callable<Object>() { + @Override public Object call() throws Exception { + long start = U.currentTimeMillis(); + + Ignite grid = startGrid(Thread.currentThread().getName()); + + System.out.println( + ">>> Time to start: " + (U.currentTimeMillis() - start) + + ", topSize=" + grid.cluster().nodes().size()); + + return null; + } + }, + THREAD_CNT); + } +}
