ignite-5578 wip
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c10238b3 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c10238b3 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c10238b3 Branch: refs/heads/ignite-5578 Commit: c10238b346072c78eebbec650f179a6fdb3be1df Parents: 0cb1a92 Author: sboikov <[email protected]> Authored: Mon Jul 10 18:06:12 2017 +0300 Committer: sboikov <[email protected]> Committed: Mon Jul 10 18:06:12 2017 +0300 ---------------------------------------------------------------------- .../internal/managers/discovery/DiscoCache.java | 24 ++++++ .../processors/cache/ExchangeEvents.java | 80 +++++++++++++++++++ .../GridCachePartitionExchangeManager.java | 59 ++++++++++++-- .../GridDhtPartitionsExchangeFuture.java | 84 +++++++++++++++++++- .../CacheExchangeCoalescingTest.java | 14 ++++ 5 files changed, 252 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/c10238b3/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java index 4c1077b..67d1afc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java @@ -32,6 +32,7 @@ import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.P1; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.lang.IgniteProductVersion; import org.jetbrains.annotations.Nullable; /** @@ -81,6 +82,9 @@ public class DiscoCache { /** Alive nodes. */ private final Set<UUID> alives = new GridConcurrentHashSet<>(); + /** */ + private final IgniteProductVersion minNodeVer; + /** * @param state Current cluster state. * @param loc Local node. @@ -123,6 +127,26 @@ public class DiscoCache { this.cacheGrpAffNodes = cacheGrpAffNodes; this.nodeMap = nodeMap; this.alives.addAll(alives); + + IgniteProductVersion minVer = null; + + for (int i = 0; i < allNodes.size(); i++) { + ClusterNode node = allNodes.get(i); + + if (minVer == null) + minVer = node.version(); + else if (node.version().compareTo(minVer) < 0) + minVer = node.version(); + } + + minNodeVer = minVer; + } + + /** + * @return Minimum node version. + */ + public IgniteProductVersion minimumNodeVersion() { + return minNodeVer; } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/c10238b3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeEvents.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeEvents.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeEvents.java new file mode 100644 index 0000000..6928d85 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeEvents.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.events.DiscoveryEvent; +import org.apache.ignite.internal.managers.discovery.DiscoCache; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture; +import org.apache.ignite.internal.util.typedef.internal.CU; + +import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; +import static org.apache.ignite.events.EventType.EVT_NODE_JOINED; +import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; + +/** + * + */ +public class ExchangeEvents { + /** */ + private AffinityTopologyVersion topVer; + + /** */ + private DiscoCache discoCache; + + /** */ + private boolean srvJoin; + + /** */ + private boolean srvLeft; + + /** + * @param fut Future. + */ + void init(GridDhtPartitionsExchangeFuture fut) { + topVer = fut.topologyVersion(); + discoCache = fut.discoCache(); + + ClusterNode node = fut.discoveryEvent().eventNode(); + + if (fut.discoveryEvent().type()== EVT_NODE_JOINED) + srvJoin = !CU.clientNode(node); + else { + assert fut.discoveryEvent().type() == EVT_NODE_LEFT || fut.discoveryEvent().type() == EVT_NODE_FAILED; + + srvLeft = !CU.clientNode(node); + } + } + + DiscoCache discoveryCache() { + return discoCache; + } + + AffinityTopologyVersion topologyVersion() { + return topVer; + } + + boolean serverJoin() { + return srvJoin; + } + + boolean serverLeft() { + return srvLeft; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/c10238b3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index ed166ec..cac9c56 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -131,6 +131,9 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana private static final int EXCHANGE_HISTORY_SIZE = IgniteSystemProperties.getInteger(IgniteSystemProperties.IGNITE_EXCHANGE_HISTORY_SIZE, 1_000); + /** TODO IGNITE-5578. */ + public static final IgniteProductVersion EXCHANGE_COALESCING_SINCE = IgniteProductVersion.fromString("2.0.0"); + /** Atomic reference for pending timeout object. */ private AtomicReference<ResendTimeoutObject> pendingResend = new AtomicReference<>(); @@ -381,7 +384,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana exchId = exchangeId(n.id(), affinityTopologyVersion(evt), evt); - exchFut = exchangeFuture(exchId, evt, cache,null, null); + exchFut = exchangeFuture(exchId, evt, cache, null, null); } else { DiscoveryCustomMessage customMsg = ((DiscoveryCustomEvent)evt).customMessage(); @@ -1762,6 +1765,54 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana ((IgniteDiagnosticAware)fut).addDiagnosticRequest(ctx); } + private boolean supportsCoalescing(ClusterNode node) { + return node.version().compareToIgnoreTimestamp(EXCHANGE_COALESCING_SINCE) >= 0; + } + + public ExchangeEvents checkExchangeCoalescing(GridDhtPartitionsExchangeFuture curFut) { + ExchangeEvents evts = null; + + try { + U.sleep(1000); + } + catch (Exception e) { + e.printStackTrace(); + } + + for (CachePartitionExchangeWorkerTask task : exchWorker.futQ) { + if (task instanceof GridDhtPartitionsExchangeFuture) { + GridDhtPartitionsExchangeFuture fut = (GridDhtPartitionsExchangeFuture)task; + + int evtType = fut.discoveryEvent().type(); + + if (evtType == EVT_NODE_JOINED) { + DiscoveryEvent evt = fut.discoveryEvent(); + + ClusterNode node = evt.eventNode(); + + if (!supportsCoalescing(node)) + break; + + fut.mergeWithFuture(curFut); + + if (evts == null) + evts = new ExchangeEvents(); + + evts.init(fut); + } + else + break; +// else if (evtType == EVT_NODE_LEFT || evtType == EVT_NODE_FAILED) { +// +// } + } + else + break; + } + + return evts; + } + /** * Exchange future thread. All exchanges happen only by one thread and next * exchange will not start until previous one completes. @@ -1781,12 +1832,6 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana super(cctx.igniteInstanceName(), "partition-exchanger", GridCachePartitionExchangeManager.this.log); } - void checkExchangeCoalescing() { - for (CachePartitionExchangeWorkerTask task : futQ) { - - } - } - /** * @param exchId Exchange ID. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/c10238b3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index 6d98601..8c00c81 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -23,6 +23,7 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; @@ -60,6 +61,7 @@ import org.apache.ignite.internal.processors.cache.CachePartitionExchangeWorkerT import org.apache.ignite.internal.processors.cache.DynamicCacheChangeBatch; import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor; import org.apache.ignite.internal.processors.cache.ExchangeActions; +import org.apache.ignite.internal.processors.cache.ExchangeEvents; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; @@ -100,6 +102,7 @@ import static org.apache.ignite.events.EventType.EVT_NODE_JOINED; import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; import static org.apache.ignite.internal.events.DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT; import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL; +import static org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager.*; /** * Future for exchanging partition maps. @@ -423,6 +426,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte assert discoEvt != null : this; assert exchId.nodeId().equals(discoEvt.eventNode().id()) : this; + boolean allowCoalescing = discoCache.minimumNodeVersion().compareTo(EXCHANGE_COALESCING_SINCE) >= 0; + try { discoCache.updateAlives(cctx.discovery()); @@ -485,8 +490,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte List<T2<DynamicCacheDescriptor, NearCacheConfiguration>> caches = cctx.cache().cachesToStartOnLocalJoin(); - if (cctx.database().persistenceEnabled() && - !cctx.kernalContext().clientNode()) { + if (cctx.database().persistenceEnabled() && !cctx.kernalContext().clientNode()) { List<DynamicCacheDescriptor> startDescs = new ArrayList<>(); if (caches != null) { @@ -874,6 +878,11 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte cctx.database().beforeExchange(this); + ExchangeEvents mergedEvts = null; + + if (crd.isLocal()) + mergedEvts = cctx.exchange().checkExchangeCoalescing(this); + if (crd.isLocal()) { if (remaining.isEmpty()) onAllReceived(); @@ -1354,6 +1363,64 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte } } + /** */ + private GridDhtPartitionsExchangeFuture mergedWith; + + /** */ + private List<T2<ClusterNode, GridDhtPartitionsSingleMessage>> pendingMsgs; + + /** + * @param fut Current exchange to merge with. + */ + @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter") + public void mergeWithFuture(final GridDhtPartitionsExchangeFuture fut) { + List<T2<ClusterNode, GridDhtPartitionsSingleMessage>> pendingMsgs = null; + + synchronized (this) { + synchronized (fut) { + assert !isDone(); + assert !initFut.isDone(); + assert mergedWith == null; + + mergedWith = fut; + + if (this.pendingMsgs != null) { + pendingMsgs = this.pendingMsgs; + + T2<ClusterNode, GridDhtPartitionsSingleMessage> joinedSrvMsg = null; + + if (discoEvt.type() == EVT_NODE_JOINED && !CU.clientNode(discoEvt.eventNode())) { + for (Iterator<T2<ClusterNode, GridDhtPartitionsSingleMessage>> it = pendingMsgs.iterator(); it.hasNext();) { + T2<ClusterNode, GridDhtPartitionsSingleMessage> msg = it.next(); + + if (msg.get1().equals(discoEvt.eventNode())) { + joinedSrvMsg = msg; + + it.remove(); + + break; + } + } + + if (pendingMsgs.isEmpty()) + pendingMsgs = null; + } + } + } + } + + if (pendingMsgs != null) { + final List<T2<ClusterNode, GridDhtPartitionsSingleMessage>> pendingMsgs0 = pendingMsgs; + + fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() { + @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> fut0) { + for (T2<ClusterNode, GridDhtPartitionsSingleMessage> msg : pendingMsgs0) + fut.processMessage(msg.get1(), msg.get2()); + } + }); + } + } + /** * @param node Sender node. * @param msg Single partition info. @@ -1375,6 +1442,19 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte sendAllPartitions(node.id(), cctx.gridConfig().getNetworkSendRetryCount()); } else { + synchronized (this) { + if (mergedWith != null) { + mergedWith.onReceive(node, msg); + + return; + } + + if (pendingMsgs == null) + pendingMsgs = new ArrayList<>(); + + pendingMsgs.add(new T2<>(node, msg)); + } + initFut.listen(new CI1<IgniteInternalFuture<Boolean>>() { @Override public void apply(IgniteInternalFuture<Boolean> f) { try { http://git-wip-us.apache.org/repos/asf/ignite/blob/c10238b3/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeCoalescingTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeCoalescingTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeCoalescingTest.java index 0e95ecf..5b915f6 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeCoalescingTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeCoalescingTest.java @@ -17,10 +17,14 @@ package org.apache.ignite.internal.processors.cache.distributed; +import java.util.concurrent.Callable; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; /** @@ -50,6 +54,16 @@ public class CacheExchangeCoalescingTest extends GridCommonAbstractTest { public void testConcurrentJoin1() throws Exception { startGrid(0); + final AtomicInteger idx = new AtomicInteger(1); + IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + startGrid(idx.getAndIncrement()); + + return null; + } + }, 2, "start-node"); + + fut.get(); } }
