Repository: ignite
Updated Branches:
  refs/heads/ignite-8783 87acce742 -> dd76d7518


IGNITE-8783

Signed-off-by: Anton Vinogradov <a...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/dd76d751
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/dd76d751
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/dd76d751

Branch: refs/heads/ignite-8783
Commit: dd76d751813c326a15f7d431acd9d86344b7e6f3
Parents: 87acce7
Author: Anton Vinogradov <a...@apache.org>
Authored: Wed Jul 18 00:45:00 2018 +0300
Committer: Anton Vinogradov <a...@apache.org>
Committed: Wed Jul 18 00:45:00 2018 +0300

----------------------------------------------------------------------
 .../preloader/latch/ExchangeLatchManager.java   | 209 ++++++++++---------
 ...ExchangeLatchManagerCoordinatorFailTest.java |  19 +-
 2 files changed, 130 insertions(+), 98 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/dd76d751/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/latch/ExchangeLatchManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/latch/ExchangeLatchManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/latch/ExchangeLatchManager.java
index 02649a1..8a57b90 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/latch/ExchangeLatchManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/latch/ExchangeLatchManager.java
@@ -16,12 +16,11 @@
  */
 package org.apache.ignite.internal.processors.cache.distributed.dht.preloader.latch;
 
-import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.Comparator;
-import java.util.List;
+import java.util.HashSet;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
@@ -46,7 +45,6 @@ import org.apache.ignite.internal.util.GridConcurrentHashSet;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
-import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.lang.IgniteProductVersion;
 import org.jetbrains.annotations.Nullable;
@@ -80,15 +78,15 @@ public class ExchangeLatchManager {
     private volatile ClusterNode crd;
 
     /** Pending acks collection. */
-    private final ConcurrentMap<T2<String, AffinityTopologyVersion>, Set<UUID>> pendingAcks = new ConcurrentHashMap<>();
+    private final ConcurrentMap<CompletableLatchUid, Set<UUID>> pendingAcks = new ConcurrentHashMap<>();
 
     /** Server latches collection. */
     @GridToStringInclude
-    private final ConcurrentMap<T2<String, AffinityTopologyVersion>, ServerLatch> serverLatches = new ConcurrentHashMap<>();
+    private final ConcurrentMap<CompletableLatchUid, ServerLatch> serverLatches = new ConcurrentHashMap<>();
 
     /** Client latches collection. */
     @GridToStringInclude
-    private final ConcurrentMap<T2<String, AffinityTopologyVersion>, ClientLatch> clientLatches = new ConcurrentHashMap<>();
+    private final ConcurrentMap<CompletableLatchUid, ClientLatch> clientLatches = new ConcurrentHashMap<>();
 
     /** Lock. */
     private final ReentrantLock lock = new ReentrantLock();
@@ -130,37 +128,30 @@ public class ExchangeLatchManager {
      * Creates server latch with given {@code id} and {@code topVer}.
      * Adds corresponding pending acks to it.
      *
-     * @param id Latch id.
-     * @param topVer Latch topology version.
+     * @param latchUid Latch uid.
      * @param participants Participant nodes.
      * @return Server latch instance.
      */
-    private Latch createServerLatch(String id, AffinityTopologyVersion topVer, Collection<ClusterNode> participants) {
-        final T2<String, AffinityTopologyVersion> latchId = new T2<>(id, topVer);
-
-        if (serverLatches.containsKey(latchId))
-            return serverLatches.get(latchId);
+    private Latch createServerLatch(CompletableLatchUid latchUid, Collection<ClusterNode> participants) {
+        assert !serverLatches.containsKey(latchUid);
 
-        ServerLatch latch = new ServerLatch(id, topVer, participants);
+        ServerLatch latch = new ServerLatch(latchUid, participants);
 
-        serverLatches.put(latchId, latch);
+        serverLatches.put(latchUid, latch);
 
         if (log.isDebugEnabled())
-            log.debug("Server latch is created [latch=" + latchId + ", 
participantsSize=" + participants.size() + "]");
+            log.debug("Server latch is created [latch=" + latchUid + ", 
participantsSize=" + participants.size() + "]");
 
-        if (pendingAcks.containsKey(latchId)) {
-            Set<UUID> acks = pendingAcks.get(latchId);
+        if (pendingAcks.containsKey(latchUid)) {
+            Set<UUID> acks = pendingAcks.get(latchUid);
 
             for (UUID node : acks)
                 if (latch.hasParticipant(node) && !latch.hasAck(node))
                     latch.ack(node);
 
-            pendingAcks.remove(latchId);
+            pendingAcks.remove(latchUid);
         }
 
-        if (latch.isCompleted())
-            serverLatches.remove(latchId);
-
         return latch;
     }
 
@@ -168,34 +159,23 @@ public class ExchangeLatchManager {
      * Creates client latch.
 * If there is final ack corresponds to given {@code id} and {@code topVer}, latch will be completed immediately.
      *
-     * @param id Latch id.
-     * @param topVer Latch topology version.
+     * @param latchUid Latch uid.
      * @param coordinator Coordinator node.
      * @param participants Participant nodes.
      * @return Client latch instance.
      */
-    private Latch createClientLatch(String id, AffinityTopologyVersion topVer, ClusterNode coordinator, Collection<ClusterNode> participants) {
-        final T2<String, AffinityTopologyVersion> latchId = new T2<>(id, topVer);
+    private Latch createClientLatch(CompletableLatchUid latchUid, ClusterNode coordinator, Collection<ClusterNode> participants) {
+        assert !serverLatches.containsKey(latchUid);
+        assert !clientLatches.containsKey(latchUid);
 
-        if (clientLatches.containsKey(latchId))
-            return clientLatches.get(latchId);
-
-        ClientLatch latch = new ClientLatch(id, topVer, coordinator, participants);
+        ClientLatch latch = new ClientLatch(latchUid, coordinator, participants);
 
         if (log.isDebugEnabled())
-            log.debug("Client latch is created [latch=" + latchId
-                    + ", crd=" + coordinator
-                    + ", participantsSize=" + participants.size() + "]");
-
-        Set<UUID> nodeIds = pendingAcks.get(latchId);
+            log.debug("Client latch is created [latch=" + latchUid
+                + ", crd=" + coordinator
+                + ", participantsSize=" + participants.size() + "]");
 
-        // There is final ack for created latch.
-        if (nodeIds != null && nodeIds.contains(coordinator)) {
-            latch.complete();
-            pendingAcks.remove(latchId);
-        }
-        else
-            clientLatches.put(latchId, latch);
+        clientLatches.put(latchUid, latch);
 
         return latch;
     }
@@ -214,20 +194,24 @@ public class ExchangeLatchManager {
         lock.lock();
 
         try {
-            ClusterNode coordinator = getLatchCoordinator(topVer);
+            final CompletableLatchUid latchUid = new CompletableLatchUid(id, topVer);
 
-            if (coordinator == null) {
-                ClientLatch latch = new ClientLatch(id, AffinityTopologyVersion.NONE, null, Collections.emptyList());
-                latch.complete();
+            CompletableLatch latch = clientLatches.containsKey(latchUid) ?
+                clientLatches.get(latchUid) : serverLatches.get(latchUid);
 
+            if (latch != null)
                 return latch;
-            }
+
+            ClusterNode coordinator = getLatchCoordinator(topVer);
+
+            if (coordinator == null)
+                return null;
 
             Collection<ClusterNode> participants = getLatchParticipants(topVer);
 
             return coordinator.isLocal()
-                ? createServerLatch(id, topVer, participants)
-                : createClientLatch(id, topVer, coordinator, participants);
+                ? createServerLatch(latchUid, participants)
+                : createClientLatch(latchUid, coordinator, participants);
         }
         finally {
             lock.unlock();
@@ -303,39 +287,36 @@ public class ExchangeLatchManager {
             if (coordinator == null)
                 return;
 
-            T2<String, AffinityTopologyVersion> latchId = new T2<>(message.latchId(), message.topVer());
+            CompletableLatchUid latchUid = new CompletableLatchUid(message.latchId(), message.topVer());
 
             if (message.isFinal()) {
                 if (log.isDebugEnabled())
-                    log.debug("Process final ack [latch=" + latchId + ", 
from=" + from + "]");
+                    log.debug("Process final ack [latch=" + latchUid + ", 
from=" + from + "]");
+
+                assert serverLatches.containsKey(latchUid) || clientLatches.containsKey(latchUid);
+
+                if (clientLatches.containsKey(latchUid)) {
+                    ClientLatch latch = clientLatches.remove(latchUid);
 
-                if (clientLatches.containsKey(latchId)) {
-                    ClientLatch latch = clientLatches.remove(latchId);
                     latch.complete();
                 }
-                else if (!coordinator.isLocal()) {
-                    pendingAcks.computeIfAbsent(latchId, (id) -> new GridConcurrentHashSet<>());
-                    pendingAcks.get(latchId).add(from);
-                }
-                else if (coordinator.isLocal())
-                    serverLatches.remove(latchId);
-            } else {
+
+                serverLatches.remove(latchUid);
+            }
+            else {
                 if (log.isDebugEnabled())
-                    log.debug("Process ack [latch=" + latchId + ", from=" + 
from + "]");
+                    log.debug("Process ack [latch=" + latchUid + ", from=" + 
from + "]");
 
-                if (serverLatches.containsKey(latchId)) {
-                    ServerLatch latch = serverLatches.get(latchId);
+                if (serverLatches.containsKey(latchUid)) {
+                    ServerLatch latch = serverLatches.get(latchUid);
 
-                    if (latch.hasParticipant(from) && !latch.hasAck(from)) {
+                    if (latch.hasParticipant(from) && !latch.hasAck(from))
                         latch.ack(from);
-
-                        if (latch.isCompleted())
-                            serverLatches.remove(latchId);
-                    }
                 }
                 else {
-                    pendingAcks.computeIfAbsent(latchId, (id) -> new GridConcurrentHashSet<>());
-                    pendingAcks.get(latchId).add(from);
+                    pendingAcks.computeIfAbsent(latchUid, (id) -> new GridConcurrentHashSet<>());
+
+                    pendingAcks.get(latchUid).add(from);
                 }
             }
         }
@@ -352,17 +333,18 @@ public class ExchangeLatchManager {
         if (log.isInfoEnabled())
             log.info("Become new coordinator " + crd.id());
 
-        List<T2<String, AffinityTopologyVersion>> latchesToRestore = new ArrayList<>();
+        Set<CompletableLatchUid> latchesToRestore = new HashSet<>();
+
         latchesToRestore.addAll(pendingAcks.keySet());
         latchesToRestore.addAll(clientLatches.keySet());
 
-        for (T2<String, AffinityTopologyVersion> latchId : latchesToRestore) {
-            String id = latchId.get1();
-            AffinityTopologyVersion topVer = latchId.get2();
+        for (CompletableLatchUid latchUid : latchesToRestore) {
+            String id = latchUid.id;
+            AffinityTopologyVersion topVer = latchUid.topVer;
             Collection<ClusterNode> participants = getLatchParticipants(topVer);
 
             if (!participants.isEmpty())
-                createServerLatch(id, topVer, participants);
+                createServerLatch(latchUid, participants);
         }
     }
 
@@ -392,12 +374,12 @@ public class ExchangeLatchManager {
                 return;
 
             // Clear pending acks.
-            for (Map.Entry<T2<String, AffinityTopologyVersion>, Set<UUID>> ackEntry : pendingAcks.entrySet())
+            for (Map.Entry<CompletableLatchUid, Set<UUID>> ackEntry : pendingAcks.entrySet())
                 if (ackEntry.getValue().contains(left.id()))
                     pendingAcks.get(ackEntry.getKey()).remove(left.id());
 
             // Change coordinator for client latches.
-            for (Map.Entry<T2<String, AffinityTopologyVersion>, ClientLatch> latchEntry : clientLatches.entrySet()) {
+            for (Map.Entry<CompletableLatchUid, ClientLatch> latchEntry : clientLatches.entrySet()) {
                 ClientLatch latch = latchEntry.getValue();
                 if (latch.hasCoordinator(left.id())) {
                     // Change coordinator for latch and re-send ack if necessary.
@@ -407,7 +389,7 @@ public class ExchangeLatchManager {
                         /* If new coordinator is not able to take control on the latch,
                            it means that all other latch participants are left from topology
                            and there is no reason to track such latch. */
-                        AffinityTopologyVersion topVer = latchEntry.getKey().get2();
+                        AffinityTopologyVersion topVer = latchEntry.getKey().topVer;
 
                         assert getLatchParticipants(topVer).isEmpty();
 
@@ -418,7 +400,7 @@ public class ExchangeLatchManager {
             }
 
             // Add acknowledgements from left node.
-            for (Map.Entry<T2<String, AffinityTopologyVersion>, ServerLatch> latchEntry : serverLatches.entrySet()) {
+            for (Map.Entry<CompletableLatchUid, ServerLatch> latchEntry : serverLatches.entrySet()) {
                 ServerLatch latch = latchEntry.getValue();
 
                 if (latch.hasParticipant(left.id()) && !latch.hasAck(left.id())) {
@@ -426,9 +408,6 @@ public class ExchangeLatchManager {
                         log.debug("Process node left [latch=" + 
latchEntry.getKey() + ", left=" + left.id() + "]");
 
                     latch.ack(left.id());
-
-                    if (latch.isCompleted())
-                        serverLatches.remove(latchEntry.getKey());
                 }
             }
 
@@ -461,12 +440,11 @@ public class ExchangeLatchManager {
         /**
          * Constructor.
          *
-         * @param id Latch id.
-         * @param topVer Latch topology version.
+         * @param latchUid Latch uid.
          * @param participants Participant nodes.
          */
-        ServerLatch(String id, AffinityTopologyVersion topVer, Collection<ClusterNode> participants) {
-            super(id, topVer, participants);
+        ServerLatch(CompletableLatchUid latchUid, Collection<ClusterNode> participants) {
+            super(latchUid, participants);
             this.permits = new AtomicInteger(participants.size());
 
             // Send final acks when latch is completed.
@@ -561,13 +539,12 @@ public class ExchangeLatchManager {
         /**
          * Constructor.
          *
-         * @param id Latch id.
-         * @param topVer Latch topology version.
+         * @param latchUid Latch uid.
          * @param coordinator Coordinator node.
          * @param participants Participant nodes.
          */
-        ClientLatch(String id, AffinityTopologyVersion topVer, ClusterNode coordinator, Collection<ClusterNode> participants) {
-            super(id, topVer, participants);
+        ClientLatch(CompletableLatchUid latchUid, ClusterNode coordinator, Collection<ClusterNode> participants) {
+            super(latchUid, participants);
 
             this.coordinator = coordinator;
         }
@@ -661,13 +638,12 @@ public class ExchangeLatchManager {
         /**
          * Constructor.
          *
-         * @param id Latch id.
-         * @param topVer Latch topology version.
+         * @param latchUid Latch uid.
          * @param participants Participant nodes.
          */
-        CompletableLatch(String id, AffinityTopologyVersion topVer, Collection<ClusterNode> participants) {
-            this.id = id;
-            this.topVer = topVer;
+        CompletableLatch(CompletableLatchUid latchUid, Collection<ClusterNode> participants) {
+            this.id = latchUid.id;
+            this.topVer = latchUid.topVer;
             this.participants = participants.stream().map(ClusterNode::id).collect(Collectors.toSet());
         }
 
@@ -727,6 +703,47 @@ public class ExchangeLatchManager {
         }
     }
 
+    /**
+     * Latch id + topology
+     */
+    private static class CompletableLatchUid {
+        /** Id. */
+        private String id;
+
+        /** Topology version. */
+        private AffinityTopologyVersion topVer;
+
+        /**
+         * @param id Id.
+         * @param topVer Topology version.
+         */
+        private CompletableLatchUid(String id, AffinityTopologyVersion topVer) {
+            this.id = id;
+            this.topVer = topVer;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (this == o)
+                return true;
+            if (o == null || getClass() != o.getClass())
+                return false;
+            CompletableLatchUid uid = (CompletableLatchUid)o;
+            return Objects.equals(id, uid.id) &&
+                Objects.equals(topVer, uid.topVer);
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            return Objects.hash(id, topVer);
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return "CompletableLatchUid{" + "id='" + id + '\'' + ", topVer=" + 
topVer + '}';
+        }
+    }
+
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(ExchangeLatchManager.class, this);

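Note for readers skimming the diff above: the structural change is that pendingAcks, serverLatches and clientLatches are now keyed by the new CompletableLatchUid value object (latch id plus topology version) instead of T2<String, AffinityTopologyVersion>, and completed latches are no longer removed eagerly inside createServerLatch/processAck. The snippet below is a minimal standalone sketch, not Ignite code: class and field names are illustrative and the topology version is simplified to a long. It only shows why such a composite key needs value-based equals()/hashCode() to behave correctly as a ConcurrentHashMap key.

    import java.util.Objects;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;

    final class LatchKeySketch {
        /** Composite key: latch id plus topology version (simplified stand-in). */
        static final class LatchUid {
            final String id;
            final long topVer;

            LatchUid(String id, long topVer) {
                this.id = id;
                this.topVer = topVer;
            }

            /** Value equality over both fields, so a freshly built key finds the same entry. */
            @Override public boolean equals(Object o) {
                if (this == o)
                    return true;
                if (!(o instanceof LatchUid))
                    return false;
                LatchUid other = (LatchUid)o;
                return topVer == other.topVer && Objects.equals(id, other.id);
            }

            @Override public int hashCode() {
                return Objects.hash(id, topVer);
            }
        }

        public static void main(String[] args) {
            ConcurrentMap<LatchUid, String> latches = new ConcurrentHashMap<>();

            latches.put(new LatchUid("exchange", 5), "server latch");

            // Lookup with an equal id and topology version hits the same entry.
            System.out.println(latches.get(new LatchUid("exchange", 5)));
        }
    }
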
http://git-wip-us.apache.org/repos/asf/ignite/blob/dd76d751/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteExchangeLatchManagerCoordinatorFailTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteExchangeLatchManagerCoordinatorFailTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteExchangeLatchManagerCoordinatorFailTest.java
index 52cd033..3bff341 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteExchangeLatchManagerCoordinatorFailTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteExchangeLatchManagerCoordinatorFailTest.java
@@ -16,16 +16,18 @@
  */
 package org.apache.ignite.internal.processors.cache.datastructures;
 
+import com.google.common.collect.Lists;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicBoolean;
-import com.google.common.collect.Lists;
 import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.latch.Latch;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.latch.ExchangeLatchManager;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.latch.Latch;
 import org.apache.ignite.internal.util.future.GridCompoundFuture;
+import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiClosure;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
@@ -240,5 +242,18 @@ public class IgniteExchangeLatchManagerCoordinatorFailTest extends GridCommonAbs
         finishAllLatches.get(5000);
 
         Assert.assertFalse("All nodes should complete latches without errors", 
hasErrors.get());
+
+        awaitPartitionMapExchange();
+
+        for (int node = 1; node < 5; node++) {
+            IgniteEx grid = grid(node);
+            ExchangeLatchManager latchMgr = grid.context().cache().context().exchange().latch();
+
+            Map srvLatches = U.field(latchMgr, "serverLatches");
+            Map cliLatches = U.field(latchMgr, "clientLatches");
+
+            assertTrue(srvLatches.keySet().toString(), srvLatches.isEmpty());
+            assertTrue(cliLatches.keySet().toString(), cliLatches.isEmpty());
+        }
     }
 }

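The added test block peeks at the manager's private serverLatches and clientLatches maps via Ignite's internal U.field helper and asserts that both are empty once the exchange finishes, i.e. that no latch leaks. The sketch below is a rough, generic illustration of what such a reflective field read amounts to; Manager and mapField are placeholder names, this is not the Ignite test code, and the real U.field also walks superclasses.

    import java.lang.reflect.Field;
    import java.util.HashMap;
    import java.util.Map;

    final class PrivateFieldReadSketch {
        /** Stand-in for a manager that keeps latches in private maps. */
        static final class Manager {
            private final Map<String, String> serverLatches = new HashMap<>();
            private final Map<String, String> clientLatches = new HashMap<>();
        }

        /** Roughly what U.field(obj, "name") does: read a private field reflectively. */
        static Map<?, ?> mapField(Object target, String name) throws Exception {
            Field f = target.getClass().getDeclaredField(name);
            f.setAccessible(true);
            return (Map<?, ?>)f.get(target);
        }

        public static void main(String[] args) throws Exception {
            Manager mgr = new Manager();

            // The leak check: both maps must be empty after all exchanges complete.
            System.out.println(mapField(mgr, "serverLatches").isEmpty()); // true
            System.out.println(mapField(mgr, "clientLatches").isEmpty()); // true
        }
    }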