ignite-5578 wip

Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c10238b3
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c10238b3
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c10238b3

Branch: refs/heads/ignite-5578
Commit: c10238b346072c78eebbec650f179a6fdb3be1df
Parents: 0cb1a92
Author: sboikov <[email protected]>
Authored: Mon Jul 10 18:06:12 2017 +0300
Committer: sboikov <[email protected]>
Committed: Mon Jul 10 18:06:12 2017 +0300

----------------------------------------------------------------------
 .../internal/managers/discovery/DiscoCache.java | 24 ++++++
 .../processors/cache/ExchangeEvents.java        | 80 +++++++++++++++++++
 .../GridCachePartitionExchangeManager.java      | 59 ++++++++++++--
 .../GridDhtPartitionsExchangeFuture.java        | 84 +++++++++++++++++++-
 .../CacheExchangeCoalescingTest.java            | 14 ++++
 5 files changed, 252 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/c10238b3/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java
index 4c1077b..67d1afc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java
@@ -32,6 +32,7 @@ import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.P1;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteProductVersion;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -81,6 +82,9 @@ public class DiscoCache {
     /** Alive nodes. */
     private final Set<UUID> alives = new GridConcurrentHashSet<>();
 
+    /** Lowest version among the nodes scanned in the constructor; {@code null} if no nodes were scanned. */
+    private final IgniteProductVersion minNodeVer;
+
     /**
      * @param state Current cluster state.
      * @param loc Local node.
@@ -123,6 +127,26 @@ public class DiscoCache {
         this.cacheGrpAffNodes = cacheGrpAffNodes;
         this.nodeMap = nodeMap;
         this.alives.addAll(alives);
+
+        IgniteProductVersion minVer = null;
+
+        for (int i = 0; i < allNodes.size(); i++) {
+            ClusterNode node = allNodes.get(i);
+
+            if (minVer == null)
+                minVer = node.version();
+            else if (node.version().compareTo(minVer) < 0)
+                minVer = node.version();
+        }
+
+        minNodeVer = minVer;
+    }
+
+    /**
+     * Returns the value computed once in the constructor by comparing the versions of
+     * all nodes in {@code allNodes}.
+     *
+     * @return Minimum node version, or {@code null} when {@code allNodes} was empty.
+     */
+    public IgniteProductVersion minimumNodeVersion() {
+        return minNodeVer;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/c10238b3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeEvents.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeEvents.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeEvents.java
new file mode 100644
index 0000000..6928d85
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeEvents.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.events.DiscoveryEvent;
+import org.apache.ignite.internal.managers.discovery.DiscoCache;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+
+import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
+import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
+import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
+
+/**
+ * Accumulates information about the discovery events of exchange futures that were
+ * merged into a single exchange.
+ * <p>
+ * {@link #init(GridDhtPartitionsExchangeFuture)} may be called several times on the same
+ * instance, once per merged future. Topology version and discovery cache reflect the most
+ * recent call, while the server join/left flags are sticky: once set they stay set.
+ */
+public class ExchangeEvents {
+    /** Topology version of the last future passed to {@link #init}. */
+    private AffinityTopologyVersion topVer;
+
+    /** Discovery cache of the last future passed to {@link #init}. */
+    private DiscoCache discoCache;
+
+    /** Set if at least one registered event is a join of a non-client node. */
+    private boolean srvJoin;
+
+    /** Set if at least one registered event is a leave or failure of a non-client node. */
+    private boolean srvLeft;
+
+    /**
+     * Registers the discovery event of the given exchange future.
+     * <p>
+     * The flags are OR-ed with their previous values so that a later client event does not
+     * erase an earlier server join/left when several futures are merged.
+     *
+     * @param fut Future.
+     */
+    void init(GridDhtPartitionsExchangeFuture fut) {
+        topVer = fut.topologyVersion();
+        discoCache = fut.discoCache();
+
+        DiscoveryEvent evt = fut.discoveryEvent();
+
+        int type = evt.type();
+        boolean srv = !CU.clientNode(evt.eventNode());
+
+        if (type == EVT_NODE_JOINED)
+            srvJoin |= srv;
+        else {
+            assert type == EVT_NODE_LEFT || type == EVT_NODE_FAILED : evt;
+
+            srvLeft |= srv;
+        }
+    }
+
+    /**
+     * @return Discovery cache of the last registered future, {@code null} before the first {@link #init} call.
+     */
+    DiscoCache discoveryCache() {
+        return discoCache;
+    }
+
+    /**
+     * @return Topology version of the last registered future, {@code null} before the first {@link #init} call.
+     */
+    AffinityTopologyVersion topologyVersion() {
+        return topVer;
+    }
+
+    /**
+     * @return {@code True} if any registered event is a join of a non-client node.
+     */
+    boolean serverJoin() {
+        return srvJoin;
+    }
+
+    /**
+     * @return {@code True} if any registered event is a leave or failure of a non-client node.
+     */
+    boolean serverLeft() {
+        return srvLeft;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/c10238b3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index ed166ec..cac9c56 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -131,6 +131,9 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
     private static final int EXCHANGE_HISTORY_SIZE =
         
IgniteSystemProperties.getInteger(IgniteSystemProperties.IGNITE_EXCHANGE_HISTORY_SIZE,
 1_000);
 
+    /**
+     * Version threshold used by the exchange coalescing checks: a node whose version is not
+     * lower than this value is treated as supporting coalescing.
+     * NOTE(review): "2.0.0" looks like a placeholder while the feature is in progress - confirm
+     * the actual release before merging. TODO IGNITE-5578.
+     */
+    public static final IgniteProductVersion EXCHANGE_COALESCING_SINCE = 
IgniteProductVersion.fromString("2.0.0");
+
     /** Atomic reference for pending timeout object. */
     private AtomicReference<ResendTimeoutObject> pendingResend = new 
AtomicReference<>();
 
@@ -381,7 +384,7 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
 
             exchId = exchangeId(n.id(), affinityTopologyVersion(evt), evt);
 
-            exchFut = exchangeFuture(exchId, evt, cache,null, null);
+            exchFut = exchangeFuture(exchId, evt, cache, null, null);
         }
         else {
             DiscoveryCustomMessage customMsg = 
((DiscoveryCustomEvent)evt).customMessage();
@@ -1762,6 +1765,54 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
             ((IgniteDiagnosticAware)fut).addDiagnosticRequest(ctx);
     }
 
+    /**
+     * @param node Node to check.
+     * @return {@code True} if the node version, compared without its timestamp, is not lower
+     *      than {@link #EXCHANGE_COALESCING_SINCE}.
+     */
+    private boolean supportsCoalescing(ClusterNode node) {
+        IgniteProductVersion nodeVer = node.version();
+
+        return nodeVer.compareToIgnoreTimestamp(EXCHANGE_COALESCING_SINCE) >= 0;
+    }
+
+    /**
+     * Iterates over the exchange worker queue and merges consecutive pending node-join
+     * exchanges into the exchange that is currently being processed.
+     * <p>
+     * Iteration stops at the first queued task that is not an exchange future, is not an
+     * {@code EVT_NODE_JOINED} exchange, or whose joining node does not support coalescing.
+     *
+     * @param curFut Exchange future currently being processed.
+     * @return Events of the merged exchanges, or {@code null} if nothing was merged.
+     */
+    public ExchangeEvents checkExchangeCoalescing(GridDhtPartitionsExchangeFuture curFut) {
+        ExchangeEvents evts = null;
+
+        // NOTE(review): fixed delay, presumably WIP scaffolding that lets concurrent joins reach
+        // the queue before it is scanned - confirm it is removed or made configurable before merge.
+        try {
+            U.sleep(1000);
+        }
+        catch (Exception e) {
+            // Report through the manager logger instead of dumping the stack trace to stderr.
+            U.error(log, "Failed to wait before checking exchange coalescing.", e);
+        }
+
+        for (CachePartitionExchangeWorkerTask task : exchWorker.futQ) {
+            if (!(task instanceof GridDhtPartitionsExchangeFuture))
+                break;
+
+            GridDhtPartitionsExchangeFuture fut = (GridDhtPartitionsExchangeFuture)task;
+
+            DiscoveryEvent evt = fut.discoveryEvent();
+
+            // TODO IGNITE-5578: only node joins are merged; EVT_NODE_LEFT / EVT_NODE_FAILED stop the scan.
+            if (evt.type() != EVT_NODE_JOINED || !supportsCoalescing(evt.eventNode()))
+                break;
+
+            fut.mergeWithFuture(curFut);
+
+            if (evts == null)
+                evts = new ExchangeEvents();
+
+            evts.init(fut);
+        }
+
+        return evts;
+    }
+
     /**
      * Exchange future thread. All exchanges happen only by one thread and next
      * exchange will not start until previous one completes.
@@ -1781,12 +1832,6 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
             super(cctx.igniteInstanceName(), "partition-exchanger", 
GridCachePartitionExchangeManager.this.log);
         }
 
-        void checkExchangeCoalescing() {
-            for (CachePartitionExchangeWorkerTask task : futQ) {
-
-            }
-        }
-
         /**
          * @param exchId Exchange ID.
          */

http://git-wip-us.apache.org/repos/asf/ignite/blob/c10238b3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 6d98601..8c00c81 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -23,6 +23,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -60,6 +61,7 @@ import 
org.apache.ignite.internal.processors.cache.CachePartitionExchangeWorkerT
 import org.apache.ignite.internal.processors.cache.DynamicCacheChangeBatch;
 import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
 import org.apache.ignite.internal.processors.cache.ExchangeActions;
+import org.apache.ignite.internal.processors.cache.ExchangeEvents;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
@@ -100,6 +102,7 @@ import static 
org.apache.ignite.events.EventType.EVT_NODE_JOINED;
 import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
 import static 
org.apache.ignite.internal.events.DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT;
 import static 
org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL;
+import static 
org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager.*;
 
 /**
  * Future for exchanging partition maps.
@@ -423,6 +426,8 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
         assert discoEvt != null : this;
         assert exchId.nodeId().equals(discoEvt.eventNode().id()) : this;
 
+        boolean allowCoalescing = 
discoCache.minimumNodeVersion().compareTo(EXCHANGE_COALESCING_SINCE) >= 0;
+
         try {
             discoCache.updateAlives(cctx.discovery());
 
@@ -485,8 +490,7 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
                         List<T2<DynamicCacheDescriptor, 
NearCacheConfiguration>> caches =
                             cctx.cache().cachesToStartOnLocalJoin();
 
-                        if (cctx.database().persistenceEnabled() &&
-                            !cctx.kernalContext().clientNode()) {
+                        if (cctx.database().persistenceEnabled() && 
!cctx.kernalContext().clientNode()) {
                             List<DynamicCacheDescriptor> startDescs = new 
ArrayList<>();
 
                             if (caches != null) {
@@ -874,6 +878,11 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
 
         cctx.database().beforeExchange(this);
 
+        ExchangeEvents mergedEvts = null;
+
+        if (crd.isLocal())
+            mergedEvts = cctx.exchange().checkExchangeCoalescing(this);
+
         if (crd.isLocal()) {
             if (remaining.isEmpty())
                 onAllReceived();
@@ -1354,6 +1363,64 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
         }
     }
 
+    /** Exchange future this one was merged into by {@code mergeWithFuture}; {@code null} until then. */
+    private GridDhtPartitionsExchangeFuture mergedWith;
+
+    /** Sender/message pairs buffered on message receipt while {@code mergedWith} is {@code null}; created lazily. */
+    private List<T2<ClusterNode, GridDhtPartitionsSingleMessage>> pendingMsgs;
+
+    /**
+     * Marks this future as merged into {@code fut} and hands over messages buffered so far.
+     * <p>
+     * Under the monitors of {@code this} and then {@code fut} (in that order) the method sets
+     * {@code mergedWith} and, if {@code pendingMsgs} is not {@code null}, takes that list. When this
+     * future's discovery event is a join of a non-client node, the first buffered message whose
+     * sender equals the joined node is removed from the list; if the list becomes empty nothing
+     * is handed over. After the locks are released, a listener is registered on {@code fut} which
+     * passes every remaining message to {@code fut.processMessage}.
+     * <p>
+     * NOTE(review): {@code joinedSrvMsg} is assigned but never read here, so the joined server's
+     * message is dropped - confirm this is intended for the WIP state.
+     * NOTE(review): the field {@code pendingMsgs} is not reset and the local list aliases it, so
+     * {@code it.remove()} also mutates the field's list - confirm.
+     *
+     * @param fut Current exchange to merge with.
+     */
+    @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
+    public void mergeWithFuture(final GridDhtPartitionsExchangeFuture fut) {
+        List<T2<ClusterNode, GridDhtPartitionsSingleMessage>> pendingMsgs = 
null;
+
+        synchronized (this) {
+            synchronized (fut) {
+                assert !isDone();
+                assert !initFut.isDone();
+                assert mergedWith == null;
+
+                mergedWith = fut;
+
+                if (this.pendingMsgs != null) {
+                    pendingMsgs = this.pendingMsgs;
+
+                    T2<ClusterNode, GridDhtPartitionsSingleMessage> 
joinedSrvMsg = null;
+
+                    if (discoEvt.type() == EVT_NODE_JOINED && 
!CU.clientNode(discoEvt.eventNode())) {
+                        for (Iterator<T2<ClusterNode, 
GridDhtPartitionsSingleMessage>> it = pendingMsgs.iterator(); it.hasNext();) {
+                            T2<ClusterNode, GridDhtPartitionsSingleMessage> 
msg = it.next();
+
+                            if (msg.get1().equals(discoEvt.eventNode())) {
+                                joinedSrvMsg = msg;
+
+                                it.remove();
+
+                                break;
+                            }
+                        }
+
+                        if (pendingMsgs.isEmpty())
+                            pendingMsgs = null;
+                    }
+                }
+            }
+        }
+
+        if (pendingMsgs != null) {
+            final List<T2<ClusterNode, GridDhtPartitionsSingleMessage>> 
pendingMsgs0 = pendingMsgs;
+
+            fut.listen(new 
CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
+                @Override public void 
apply(IgniteInternalFuture<AffinityTopologyVersion> fut0) {
+                    for (T2<ClusterNode, GridDhtPartitionsSingleMessage> msg : 
pendingMsgs0)
+                        fut.processMessage(msg.get1(), msg.get2());
+                }
+            });
+        }
+    }
+
     /**
      * @param node Sender node.
      * @param msg Single partition info.
@@ -1375,6 +1442,19 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
                 sendAllPartitions(node.id(), 
cctx.gridConfig().getNetworkSendRetryCount());
         }
         else {
+            synchronized (this) {
+                if (mergedWith != null) {
+                    mergedWith.onReceive(node, msg);
+
+                    return;
+                }
+
+                if (pendingMsgs == null)
+                    pendingMsgs = new ArrayList<>();
+
+                pendingMsgs.add(new T2<>(node, msg));
+            }
+
             initFut.listen(new CI1<IgniteInternalFuture<Boolean>>() {
                 @Override public void apply(IgniteInternalFuture<Boolean> f) {
                     try {

http://git-wip-us.apache.org/repos/asf/ignite/blob/c10238b3/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeCoalescingTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeCoalescingTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeCoalescingTest.java
index 0e95ecf..5b915f6 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeCoalescingTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeCoalescingTest.java
@@ -17,10 +17,14 @@
 
 package org.apache.ignite.internal.processors.cache.distributed;
 
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 
 /**
@@ -50,6 +54,16 @@ public class CacheExchangeCoalescingTest extends 
GridCommonAbstractTest {
     public void testConcurrentJoin1() throws Exception {
         startGrid(0);
 
+        final AtomicInteger idx = new AtomicInteger(1); // Next grid index; 0 is already started above.
 
+        IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new 
Callable<Void>() {
+            @Override public Void call() throws Exception {
+                startGrid(idx.getAndIncrement());
+
+                return null;
+            }
+        }, 2, "start-node");
+
+        fut.get(); // NOTE(review): nothing is asserted after the starts complete - WIP test, add checks.
     }
 }

Reply via email to