Repository: incubator-gossip
Updated Branches:
  refs/heads/master ac8303893 -> 5ed3ed85c


GOSSIP-75 Vote based locking


Project: http://git-wip-us.apache.org/repos/asf/incubator-gossip/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gossip/commit/5ed3ed85
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gossip/tree/5ed3ed85
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gossip/diff/5ed3ed85

Branch: refs/heads/master
Commit: 5ed3ed85cf781f02b8e487ac396980c9909e4146
Parents: ac83038
Author: Mirage Abeysekara <mirage...@cse.mrt.ac.lk>
Authored: Fri Jul 14 22:19:16 2017 +0530
Committer: Mirage Abeysekara <mirage...@cse.mrt.ac.lk>
Committed: Wed Aug 23 23:31:39 2017 +0530

----------------------------------------------------------------------
 .../java/org/apache/gossip/GossipSettings.java  |  18 +-
 .../java/org/apache/gossip/crdt/CrdtModule.java |  31 ++
 .../org/apache/gossip/lock/LockManager.java     | 318 +++++++++++++++++++
 .../apache/gossip/lock/LockManagerSettings.java |  83 +++++
 .../lock/exceptions/VoteFailedException.java    |  43 +++
 .../apache/gossip/lock/vote/MajorityVote.java   | 169 ++++++++++
 .../gossip/lock/vote/RandomVoteSelector.java    |  35 ++
 .../java/org/apache/gossip/lock/vote/Vote.java  |  70 ++++
 .../apache/gossip/lock/vote/VoteCandidate.java  |  75 +++++
 .../apache/gossip/lock/vote/VoteSelector.java   |  33 ++
 .../apache/gossip/manager/GossipManager.java    |  30 +-
 .../gossip/lock/vote/MajorityVoteTest.java      | 101 ++++++
 .../org/apache/gossip/SharedDataLockTest.java   | 125 ++++++++
 13 files changed, 1128 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/5ed3ed85/gossip-base/src/main/java/org/apache/gossip/GossipSettings.java
----------------------------------------------------------------------
diff --git a/gossip-base/src/main/java/org/apache/gossip/GossipSettings.java 
b/gossip-base/src/main/java/org/apache/gossip/GossipSettings.java
index 32c00c9..4ea0ab6 100644
--- a/gossip-base/src/main/java/org/apache/gossip/GossipSettings.java
+++ b/gossip-base/src/main/java/org/apache/gossip/GossipSettings.java
@@ -17,6 +17,8 @@
  */
 package org.apache.gossip;
 
+import org.apache.gossip.lock.LockManagerSettings;
+
 import java.util.HashMap;
 import java.util.Map;
 
@@ -61,7 +63,10 @@ public class GossipSettings {
   private String pathToKeyStore = "./keys";
   
   private boolean signMessages = false;
-  
+
+  // Settings related to lock manager
+  private LockManagerSettings lockManagerSettings = LockManagerSettings
+          .getLockManagerDefaultSettings();
   
   /**
    * Construct GossipSettings with default settings.
@@ -242,4 +247,15 @@ public class GossipSettings {
     this.protocolManagerClass = protocolManagerClass;
   }
 
+  public LockManagerSettings getLockManagerSettings() {
+    return lockManagerSettings;
+  }
+
+  /**
+   * Set the lock settings used by the lock manager
+   * @param lockManagerSettings lock settings. This object cannot be null.
+   */
+  public void setLockManagerSettings(LockManagerSettings lockManagerSettings) {
+    this.lockManagerSettings = lockManagerSettings;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/5ed3ed85/gossip-base/src/main/java/org/apache/gossip/crdt/CrdtModule.java
----------------------------------------------------------------------
diff --git a/gossip-base/src/main/java/org/apache/gossip/crdt/CrdtModule.java 
b/gossip-base/src/main/java/org/apache/gossip/crdt/CrdtModule.java
index 396ec03..83d573d 100644
--- a/gossip-base/src/main/java/org/apache/gossip/crdt/CrdtModule.java
+++ b/gossip-base/src/main/java/org/apache/gossip/crdt/CrdtModule.java
@@ -24,6 +24,9 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo;
 import com.fasterxml.jackson.core.Version;
 import com.fasterxml.jackson.databind.module.SimpleModule;
 import org.apache.gossip.LocalMember;
+import org.apache.gossip.lock.vote.MajorityVote;
+import org.apache.gossip.lock.vote.Vote;
+import org.apache.gossip.lock.vote.VoteCandidate;
 import org.apache.gossip.replication.BlackListReplicable;
 import org.apache.gossip.replication.Replicable;
 import org.apache.gossip.replication.WhiteListReplicable;
@@ -107,6 +110,31 @@ abstract class BlackListReplicableMixin {
   @JsonProperty("blackListMembers") abstract List<LocalMember> 
getBlackListMembers();
 }
 
+abstract class VoteCandidateMixin {
+  @JsonCreator
+  VoteCandidateMixin(
+          @JsonProperty("candidateNodeId") String candidateNodeId,
+          @JsonProperty("votingKey") String votingKey,
+          @JsonProperty("votes") Map<String, Vote> votes
+  ) { }
+}
+
+abstract class VoteMixin {
+  @JsonCreator
+  VoteMixin(
+          @JsonProperty("votingNode") String votingNode,
+          @JsonProperty("voteValue") Boolean voteValue,
+          @JsonProperty("voteExchange") Boolean voteExchange,
+          @JsonProperty("liveMembers") List<String> liveMembers,
+          @JsonProperty("deadMembers") List<String> deadMembers
+  ) { }
+}
+
+abstract class MajorityVoteMixin<E>{
+  @JsonCreator
+  MajorityVoteMixin(@JsonProperty("voteCandidates") Map<String, VoteCandidate> 
voteCandidateMap){ }
+}
+
 //If anyone wants to take a stab at this. please have at it
 
//https://github.com/FasterXML/jackson-datatype-guava/blob/master/src/main/java/com/fasterxml/jackson/datatype/guava/ser/MultimapSerializer.java
 public class CrdtModule extends SimpleModule {
@@ -130,6 +158,9 @@ public class CrdtModule extends SimpleModule {
     context.setMixInAnnotations(Replicable.class, ReplicableMixin.class);
     context.setMixInAnnotations(WhiteListReplicable.class, 
WhiteListReplicableMixin.class);
     context.setMixInAnnotations(BlackListReplicable.class, 
BlackListReplicableMixin.class);
+    context.setMixInAnnotations(MajorityVote.class, MajorityVoteMixin.class);
+    context.setMixInAnnotations(VoteCandidate.class, VoteCandidateMixin.class);
+    context.setMixInAnnotations(Vote.class, VoteMixin.class);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/5ed3ed85/gossip-base/src/main/java/org/apache/gossip/lock/LockManager.java
----------------------------------------------------------------------
diff --git a/gossip-base/src/main/java/org/apache/gossip/lock/LockManager.java 
b/gossip-base/src/main/java/org/apache/gossip/lock/LockManager.java
new file mode 100644
index 0000000..9f4636a
--- /dev/null
+++ b/gossip-base/src/main/java/org/apache/gossip/lock/LockManager.java
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.lock;
+
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.Timer;
+import org.apache.gossip.Member;
+import org.apache.gossip.lock.exceptions.VoteFailedException;
+import org.apache.gossip.lock.vote.MajorityVote;
+import org.apache.gossip.lock.vote.Vote;
+import org.apache.gossip.lock.vote.VoteCandidate;
+import org.apache.gossip.manager.GossipManager;
+import org.apache.gossip.model.SharedDataMessage;
+import org.apache.log4j.Logger;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+public class LockManager {
+
+  public static final Logger LOGGER = Logger.getLogger(LockManager.class);
+
+  private final GossipManager gossipManager;
+  private final LockManagerSettings lockSettings;
+  private final ScheduledExecutorService voteService;
+  private final AtomicInteger numberOfNodes;
+  private final Set<String> lockKeys;
+  // For MetricRegistry
+  public static final String LOCK_KEY_SET_SIZE = "gossip.lock.key_set_size";
+  public static final String LOCK_TIME = "gossip.lock.time";
+  private final Timer lockTimeMetric;
+
+  public LockManager(GossipManager gossipManager, final LockManagerSettings 
lockManagerSettings,
+          MetricRegistry metrics) {
+    this.gossipManager = gossipManager;
+    this.lockSettings = lockManagerSettings;
+    this.numberOfNodes = new AtomicInteger(lockSettings.getNumberOfNodes());
+    this.lockKeys = new CopyOnWriteArraySet<>();
+    metrics.register(LOCK_KEY_SET_SIZE, (Gauge<Integer>) lockKeys::size);
+    lockTimeMetric = metrics.timer(LOCK_TIME);
+    // Register listener for lock keys
+    gossipManager.registerSharedDataSubscriber((key, oldValue, newValue) -> {
+      if (key.contains("lock/")) {
+        lockKeys.add(key);
+      }
+    });
+    voteService = Executors.newScheduledThreadPool(2);
+    voteService.scheduleAtFixedRate(this::updateVotes, 0, 
lockSettings.getVoteUpdateInterval(),
+            TimeUnit.MILLISECONDS);
+  }
+
+  public void acquireSharedDataLock(String key) throws VoteFailedException {
+    final Timer.Context context = lockTimeMetric.time();
+    gossipManager.merge(generateLockMessage(key));
+    int deadlockDetectCount = 0;
+    while (true) {
+      SharedDataMessage message = 
gossipManager.findSharedGossipData(generateLockKey(key));
+      if (message == null || !(message.getPayload() instanceof MajorityVote)) {
+        continue;
+      }
+      MajorityVote majorityVoteResult = (MajorityVote) message.getPayload();
+      final Map<String, VoteCandidate> voteCandidatesMap = 
majorityVoteResult.value();
+      final Map<String, Boolean> voteResultMap = new HashMap<>();
+      // Store the vote result for each vote candidate node
+      voteCandidatesMap.forEach((candidateId, voteCandidate) -> voteResultMap
+              .put(candidateId, isVoteSuccess(voteCandidate)));
+
+      long passedCandidates = voteResultMap.values().stream().filter(aBoolean 
-> aBoolean).count();
+      String myNodeId = gossipManager.getMyself().getId();
+      if (LOGGER.isDebugEnabled()) {
+        LOGGER.debug("NodeId=" + myNodeId + ", VoteMap=" + voteResultMap + ", 
WinnerCount="
+                + passedCandidates);
+      }
+      // Check for a possible deadlock when no candidate has won
+      if (passedCandidates == 0) {
+        if (isDeadLock(voteCandidatesMap)) {
+          deadlockDetectCount++;
+          // Testing for deadlock is not always correct, therefore test for 
consecutive deadlocks
+          if (deadlockDetectCount >= 
lockSettings.getDeadlockDetectionThreshold()) {
+            if (LOGGER.isDebugEnabled()) {
+              LOGGER.debug("Deadlock detected from node " + myNodeId + ". 
VoteCandidatesMap="
+                      + voteCandidatesMap);
+            }
+            preventDeadLock(voteCandidatesMap);
+          }
+        } else {
+          deadlockDetectCount = 0;
+        }
+      } else if (passedCandidates == 1 && voteResultMap.containsKey(myNodeId)) 
{
+        context.stop();
+        if (voteResultMap.get(myNodeId)) {
+          // There is one winner and that is my node, therefore break the 
while loop and continue
+          break;
+        } else {
+          throw new VoteFailedException("Node " + myNodeId + " failed to lock 
on key: " + key);
+        }
+      } else if (passedCandidates > 1) {
+        // Multiple winners are not possible
+        context.stop();
+        throw new IllegalStateException("Multiple nodes get voted.");
+      }
+
+      try {
+        Thread.sleep(lockSettings.getResultCalculationDelay());
+      } catch (InterruptedException e) {
+        throw new VoteFailedException("Node " + myNodeId + " failed to lock on 
key: " + key, e);
+      }
+    }
+  }
+
+  // Generate Crdt lock message for voting
+  private SharedDataMessage generateLockMessage(String key) {
+    VoteCandidate voteCandidate = new 
VoteCandidate(gossipManager.getMyself().getId(), key,
+            new ConcurrentHashMap<>());
+    voteCandidate.addVote(new Vote(gossipManager.getMyself().getId(), true, 
false,
+            
gossipManager.getLiveMembers().stream().map(Member::getId).collect(Collectors.toList()),
+            gossipManager.getDeadMembers().stream().map(Member::getId)
+                    .collect(Collectors.toList())));
+    Map<String, VoteCandidate> voteCandidateMap = new ConcurrentHashMap<>();
+    voteCandidateMap.put(voteCandidate.getCandidateNodeId(), voteCandidate);
+    MajorityVote majorityVote = new MajorityVote(voteCandidateMap);
+    SharedDataMessage lockMessage = new SharedDataMessage();
+    lockMessage.setKey(generateLockKey(key));
+    lockMessage.setPayload(majorityVote);
+    lockMessage.setExpireAt(Long.MAX_VALUE);
+    lockMessage.setTimestamp(System.currentTimeMillis());
+    return lockMessage;
+  }
+
+  // This method will run periodically to vote for the other nodes
+  private void updateVotes() {
+    for (String lockKey : lockKeys) {
+      SharedDataMessage message = gossipManager.findSharedGossipData(lockKey);
+      if (message == null || !(message.getPayload() instanceof MajorityVote)) {
+        continue;
+      }
+      MajorityVote majorityVote = (MajorityVote) message.getPayload();
+      Map<String, VoteCandidate> voteCandidateMap = majorityVote.value();
+      String myNodeId = gossipManager.getMyself().getId();
+      // No need to vote if my node has already voted for every candidate for the key
+      if (isVotedToAll(myNodeId, voteCandidateMap)) {
+        continue;
+      }
+      String myVoteCandidate = getVotedCandidateNodeId(myNodeId, 
voteCandidateMap);
+
+      if (myVoteCandidate == null) {
+        myVoteCandidate = 
lockSettings.getVoteSelector().getVoteCandidateId(voteCandidateMap.keySet());
+      }
+      for (VoteCandidate voteCandidate : voteCandidateMap.values()) {
+        if (voteCandidate.getCandidateNodeId().equals(myNodeId)) {
+          continue;
+        }
+        // Vote for selected candidate
+        boolean voteResult = 
voteCandidate.getCandidateNodeId().equals(myVoteCandidate);
+        voteCandidate.addVote(new Vote(gossipManager.getMyself().getId(), 
voteResult, false,
+                gossipManager.getLiveMembers().stream().map(Member::getId)
+                        .collect(Collectors.toList()),
+                gossipManager.getDeadMembers().stream().map(Member::getId)
+                        .collect(Collectors.toList())));
+      }
+    }
+  }
+
+  // Return true if every node has a vote from given node id.
+  private boolean isVotedToAll(String nodeId, final Map<String, VoteCandidate> 
voteCandidates) {
+    int voteCount = 0;
+    for (VoteCandidate voteCandidate : voteCandidates.values()) {
+      if (voteCandidate.getVotes().containsKey(nodeId)) {
+        voteCount++;
+      }
+    }
+    return voteCount == voteCandidates.size();
+  }
+
+  // Returns true if there is a deadlock for given vote candidates
+  private boolean isDeadLock(final Map<String, VoteCandidate> voteCandidates) {
+    boolean result = true;
+    int numberOfLiveNodes;
+    if (numberOfNodes.get() > 0) {
+      numberOfLiveNodes = numberOfNodes.get();
+    } else {
+      // numberOfNodes is not set by the user, therefore calculate it.
+      Set<String> liveNodes = voteCandidates.values().stream()
+              .map(voteCandidate -> 
voteCandidate.getVotes().values()).flatMap(Collection::stream)
+              
.map(Vote::getLiveMembers).flatMap(List::stream).collect(Collectors.toSet());
+      numberOfLiveNodes = liveNodes.size();
+    }
+    for (VoteCandidate voteCandidate : voteCandidates.values()) {
+      result = result && voteCandidate.getVotes().size() == numberOfLiveNodes;
+    }
+    return result;
+  }
+
+  // Prevent the deadlock by giving up the votes
+  private void preventDeadLock(Map<String, VoteCandidate> voteCandidates) {
+    String myNodeId = gossipManager.getMyself().getId();
+    VoteCandidate myResults = voteCandidates.get(myNodeId);
+    if (myResults == null) {
+      return;
+    }
+    // Set of nodes that are eligible to receive this node's votes
+    List<String> donateCandidateIds = voteCandidates.keySet().stream()
+            .filter(s -> s.compareTo(myNodeId) < 
0).collect(Collectors.toList());
+    if (donateCandidateIds.size() == 0) {
+      return;
+    }
+    // Select a random node to donate
+    Random randomizer = new Random();
+    String selectedCandidateId = donateCandidateIds
+            .get(randomizer.nextInt(donateCandidateIds.size()));
+    VoteCandidate selectedCandidate = voteCandidates.get(selectedCandidateId);
+
+    Set<Vote> myVotes = new HashSet<>(myResults.getVotes().values());
+    Set<Vote> selectedCandidateVotes = new 
HashSet<>(selectedCandidate.getVotes().values());
+    // Exchange the votes
+    for (Vote myVote : myVotes) {
+      for (Vote candidateVote : selectedCandidateVotes) {
+        if (myVote.getVoteValue() && 
myVote.getVotingNode().equals(candidateVote.getVotingNode())) {
+          myVote.setVoteExchange(true);
+          candidateVote.setVoteExchange(true);
+          selectedCandidate.getVotes().put(myVote.getVotingNode(), myVote);
+          myResults.getVotes().put(candidateVote.getVotingNode(), 
candidateVote);
+        }
+      }
+    }
+    if (LOGGER.isDebugEnabled()) {
+      LOGGER.debug("Node " + myNodeId + " give up votes to node " + 
selectedCandidateId);
+    }
+  }
+
+  private String getVotedCandidateNodeId(String nodeId,
+          final Map<String, VoteCandidate> voteCandidates) {
+    for (VoteCandidate voteCandidate : voteCandidates.values()) {
+      Vote vote = voteCandidate.getVotes().get(nodeId);
+      if (vote != null && vote.getVoteValue()) {
+        return voteCandidate.getCandidateNodeId();
+      }
+    }
+    return null;
+  }
+
+  // Return true if the given candidate has passed the vote
+  private boolean isVoteSuccess(VoteCandidate voteCandidate) {
+    Set<String> liveNodes = new HashSet<>();
+    int voteCount = 0;
+    for (Vote vote : voteCandidate.getVotes().values()) {
+      liveNodes.addAll(vote.getLiveMembers());
+      if (vote.getVoteValue()) {
+        voteCount++;
+      }
+    }
+    int numberOfLiveNodes;
+    if (numberOfNodes.get() > 0) {
+      numberOfLiveNodes = numberOfNodes.get();
+    } else {
+      numberOfLiveNodes = liveNodes.size();
+    }
+    return numberOfLiveNodes > 0 && voteCount >= (numberOfLiveNodes / 2 + 1);
+  }
+
+  private String generateLockKey(String key){
+    return "lock/" + key;
+  }
+
+  public void shutdown(){
+    voteService.shutdown();
+  }
+  /**
+   * Get the voted node id from this node for a given key
+   * @param key key of the data object
+   * @return Voted node id
+   */
+  public String getVotedCandidateNodeId(String key) {
+    SharedDataMessage message = 
gossipManager.findSharedGossipData(generateLockKey(key));
+    if (message == null || !(message.getPayload() instanceof MajorityVote)) {
+      return null;
+    }
+    MajorityVote majorityVote = (MajorityVote) message.getPayload();
+    return getVotedCandidateNodeId(gossipManager.getMyself().getId(), 
majorityVote.value());
+  }
+
+  /**
+   * Set the number of live nodes. If this value is not positive, live nodes will 
be calculated
+   * @param numberOfNodes live node count, or zero/negative to calculate.
+   */
+  public void setNumberOfNodes(int numberOfNodes) {
+    this.numberOfNodes.set(numberOfNodes);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/5ed3ed85/gossip-base/src/main/java/org/apache/gossip/lock/LockManagerSettings.java
----------------------------------------------------------------------
diff --git 
a/gossip-base/src/main/java/org/apache/gossip/lock/LockManagerSettings.java 
b/gossip-base/src/main/java/org/apache/gossip/lock/LockManagerSettings.java
new file mode 100644
index 0000000..4af47a2
--- /dev/null
+++ b/gossip-base/src/main/java/org/apache/gossip/lock/LockManagerSettings.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.lock;
+
+import org.apache.gossip.lock.vote.RandomVoteSelector;
+import org.apache.gossip.lock.vote.VoteSelector;
+
+/**
+ * Stores the lock manager related settings.
+ */
+public class LockManagerSettings {
+  // Time between vote updates in ms. Default is 1 second.
+  private final int voteUpdateInterval;
+  // Vote selection algorithm. Default is random voting
+  private final VoteSelector voteSelector;
+  // Number of nodes available for voting. Default is -1 (Auto calculate)
+  private final int numberOfNodes;
+  // Number of times to test for deadlock before preventing. Default is 3
+  private final int deadlockDetectionThreshold;
+  // Wait time between vote result calculation. Default is 1000
+  private final int resultCalculationDelay;
+
+  /**
+   * Construct LockManagerSettings with default settings.
+   */
+  public static LockManagerSettings getLockManagerDefaultSettings() {
+    return new LockManagerSettings(1000, new RandomVoteSelector(), -1, 3, 
1000);
+  }
+
+  /**
+   * Construct a custom LockManagerSettings
+   *
+   * @param voteUpdateInterval         Time between vote updates in 
milliseconds.
+   * @param voteSelector               Vote selection algorithm. Cannot be null
+   * @param numberOfNodes              Number of nodes available for voting. 
Set to negative value for auto calculate
+   * @param deadlockDetectionThreshold Number of times to test for deadlock 
before preventing
+   * @param resultCalculationDelay     Wait time between vote result 
calculation
+   */
+  public LockManagerSettings(int voteUpdateInterval, VoteSelector 
voteSelector, int numberOfNodes,
+          int deadlockDetectionThreshold, int resultCalculationDelay) {
+    this.voteUpdateInterval = voteUpdateInterval;
+    this.voteSelector = voteSelector;
+    this.numberOfNodes = numberOfNodes;
+    this.deadlockDetectionThreshold = deadlockDetectionThreshold;
+    this.resultCalculationDelay = resultCalculationDelay;
+
+  }
+
+  public int getVoteUpdateInterval() {
+    return voteUpdateInterval;
+  }
+
+  public VoteSelector getVoteSelector() {
+    return voteSelector;
+  }
+
+  public int getNumberOfNodes() {
+    return numberOfNodes;
+  }
+
+  public int getDeadlockDetectionThreshold() {
+    return deadlockDetectionThreshold;
+  }
+
+  public int getResultCalculationDelay() {
+    return resultCalculationDelay;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/5ed3ed85/gossip-base/src/main/java/org/apache/gossip/lock/exceptions/VoteFailedException.java
----------------------------------------------------------------------
diff --git 
a/gossip-base/src/main/java/org/apache/gossip/lock/exceptions/VoteFailedException.java
 
b/gossip-base/src/main/java/org/apache/gossip/lock/exceptions/VoteFailedException.java
new file mode 100644
index 0000000..bd0a606
--- /dev/null
+++ 
b/gossip-base/src/main/java/org/apache/gossip/lock/exceptions/VoteFailedException.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.lock.exceptions;
+
+/**
+ * This exception is thrown when lock-based voting fails.
+ */
+public class VoteFailedException extends Exception {
+  /**
+   * Constructs a new VoteFailedException with the specified detail message.
+   *
+   * @param message the detail message.
+   */
+  public VoteFailedException(String message) {
+    super(message);
+  }
+
+  /**
+   * Constructs a new VoteFailedException with the specified detail message and
+   * cause.
+   *
+   * @param message the detail message
+   * @param cause   the cause for this exception
+   */
+  public VoteFailedException(String message, Throwable cause) {
+    super(message, cause);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/5ed3ed85/gossip-base/src/main/java/org/apache/gossip/lock/vote/MajorityVote.java
----------------------------------------------------------------------
diff --git 
a/gossip-base/src/main/java/org/apache/gossip/lock/vote/MajorityVote.java 
b/gossip-base/src/main/java/org/apache/gossip/lock/vote/MajorityVote.java
new file mode 100644
index 0000000..a18f3c9
--- /dev/null
+++ b/gossip-base/src/main/java/org/apache/gossip/lock/vote/MajorityVote.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.lock.vote;
+
+import org.apache.gossip.crdt.Crdt;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * CRDT used to distribute votes for a given key.
+ */
+public class MajorityVote implements Crdt<Map<String, VoteCandidate>, 
MajorityVote> {
+
+  private final Map<String, VoteCandidate> voteCandidates = new 
ConcurrentHashMap<>();
+
+  public MajorityVote(Map<String, VoteCandidate> voteCandidateMap) {
+    voteCandidates.putAll(voteCandidateMap);
+  }
+
+  @Override
+  public MajorityVote merge(MajorityVote other) {
+    Map<String, VoteCandidate> mergedCandidates = new ConcurrentHashMap<>();
+    Set<String> firstKeySet = this.voteCandidates.keySet();
+    Set<String> secondKeySet = other.voteCandidates.keySet();
+    Set<String> sameCandidatesSet = getIntersection(firstKeySet, secondKeySet);
+    Set<String> differentCandidatesSet = 
getIntersectionCompliment(firstKeySet, secondKeySet);
+    // Merge different vote candidates by combining votes
+    for (String differentCandidateId : differentCandidatesSet) {
+      if (this.voteCandidates.containsKey(differentCandidateId)) {
+        mergedCandidates.put(differentCandidateId, 
this.voteCandidates.get(differentCandidateId));
+      } else if (other.voteCandidates.containsKey(differentCandidateId)) {
+        mergedCandidates.put(differentCandidateId, 
other.voteCandidates.get(differentCandidateId));
+      }
+    }
+    // Merge votes for the same candidate
+    for (String sameCandidateId : sameCandidatesSet) {
+      if (this.voteCandidates.containsKey(sameCandidateId) && 
other.voteCandidates
+              .containsKey(sameCandidateId)) {
+        mergedCandidates.put(sameCandidateId,
+                mergeCandidate(this.voteCandidates.get(sameCandidateId),
+                        other.voteCandidates.get(sameCandidateId)));
+      }
+    }
+
+    return new MajorityVote(mergedCandidates);
+  }
+
+  // Merge different votes for same candidate
+  private VoteCandidate mergeCandidate(VoteCandidate firstCandidate,
+          VoteCandidate secondCandidate) {
+    VoteCandidate mergeResult = new 
VoteCandidate(firstCandidate.getCandidateNodeId(),
+            firstCandidate.getVotingKey(), new ConcurrentHashMap<>());
+    Set<String> firstKeySet = firstCandidate.getVotes().keySet();
+    Set<String> secondKeySet = secondCandidate.getVotes().keySet();
+    Set<String> sameVoteNodeSet = getIntersection(firstKeySet, secondKeySet);
+    Set<String> differentVoteNodeSet = getIntersectionCompliment(firstKeySet, 
secondKeySet);
+    // Merge different voters by combining their votes
+    for (String differentCandidateId : differentVoteNodeSet) {
+      if (firstCandidate.getVotes().containsKey(differentCandidateId)) {
+        mergeResult.getVotes()
+                .put(differentCandidateId, 
firstCandidate.getVotes().get(differentCandidateId));
+      } else if (secondCandidate.getVotes().containsKey(differentCandidateId)) 
{
+        mergeResult.getVotes()
+                .put(differentCandidateId, 
secondCandidate.getVotes().get(differentCandidateId));
+      }
+    }
+    // Merge vote for same voter
+    for (String sameVoteNodeId : sameVoteNodeSet) {
+      if (firstCandidate.getVotes().containsKey(sameVoteNodeId) && 
secondCandidate.getVotes()
+              .containsKey(sameVoteNodeId)) {
+        mergeResult.getVotes().put(sameVoteNodeId,
+                mergeVote(firstCandidate.getVotes().get(sameVoteNodeId),
+                        secondCandidate.getVotes().get(sameVoteNodeId)));
+      }
+    }
+
+    return mergeResult;
+  }
+
+  // Merge two votes from same voter
+  private Vote mergeVote(Vote firstVote, Vote secondVote) {
+    if (firstVote.getVoteValue().booleanValue() != 
secondVote.getVoteValue().booleanValue()) {
+      if (firstVote.getVoteExchange()) {
+        return firstVote;
+      } else if (secondVote.getVoteExchange()) {
+        return secondVote;
+      } else {
+        return secondVote;
+      }
+    } else {
+      return secondVote;
+    }
+  }
+
+  private Set<String> getIntersection(Set<String> first, Set<String> second) {
+    Set<String> intersection = new HashSet<>(first);
+    intersection.retainAll(second);
+    return intersection;
+  }
+
+  private Set<String> getIntersectionCompliment(Set<String> first, Set<String> 
second) {
+    Set<String> union = new HashSet<>();
+    union.addAll(first);
+    union.addAll(second);
+    Set<String> intersectionCompliment = new HashSet<>(union);
+    intersectionCompliment.removeAll(getIntersection(first, second));
+    return intersectionCompliment;
+  }
+
+  @Override
+  public Map<String, VoteCandidate> value() {
+    Map<String, VoteCandidate> copy = new ConcurrentHashMap<>();
+    copy.putAll(voteCandidates);
+    return Collections.unmodifiableMap(copy);
+
+  }
+
+  @Override
+  public int hashCode() {
+    return voteCandidates.hashCode();
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (obj == null)
+      return false;
+    if (obj == this)
+      return true;
+    if (!(obj instanceof MajorityVote))
+      return false;
+    MajorityVote other = (MajorityVote) obj;
+    return Objects.equals(voteCandidates, other.voteCandidates);
+  }
+
+  @Override
+  public String toString() {
+    return voteCandidates.toString();
+  }
+
+  @Override
+  public MajorityVote optimize() {
+    return new MajorityVote(voteCandidates);
+  }
+
+  public Map<String, VoteCandidate> getVoteCandidates() {
+    return new ConcurrentHashMap<>(voteCandidates);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/5ed3ed85/gossip-base/src/main/java/org/apache/gossip/lock/vote/RandomVoteSelector.java
----------------------------------------------------------------------
diff --git 
a/gossip-base/src/main/java/org/apache/gossip/lock/vote/RandomVoteSelector.java 
b/gossip-base/src/main/java/org/apache/gossip/lock/vote/RandomVoteSelector.java
new file mode 100644
index 0000000..d07f190
--- /dev/null
+++ 
b/gossip-base/src/main/java/org/apache/gossip/lock/vote/RandomVoteSelector.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.lock.vote;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+
+/**
+ * VoteSelector implementation which randomly selects a voting node.
+ */
+public class RandomVoteSelector implements VoteSelector {
+
+  @Override
+  public String getVoteCandidateId(Set<String> voteCandidateIds) {
+    List<String> voteCandidatesIds = new ArrayList<>(voteCandidateIds);
+    return voteCandidatesIds.get(new Random().nextInt(voteCandidatesIds.size()));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/5ed3ed85/gossip-base/src/main/java/org/apache/gossip/lock/vote/Vote.java
----------------------------------------------------------------------
diff --git a/gossip-base/src/main/java/org/apache/gossip/lock/vote/Vote.java b/gossip-base/src/main/java/org/apache/gossip/lock/vote/Vote.java
new file mode 100644
index 0000000..e68401c
--- /dev/null
+++ b/gossip-base/src/main/java/org/apache/gossip/lock/vote/Vote.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.lock.vote;
+
+import java.util.List;
+
+/**
+ * Stores the details of a single voter's vote.
+ */
+public class Vote {
+  private final String votingNode;
+  private final Boolean voteValue; // TODO: 7/16/17  weight?
+  private Boolean voteExchange;
+  private final List<String> liveMembers;
+  private final List<String> deadMembers;
+
+  public Vote(String votingNode, Boolean voteValue, Boolean voteExchange, List<String> liveMembers,
+          List<String> deadMembers) {
+    this.votingNode = votingNode;
+    this.voteValue = voteValue;
+    this.voteExchange = voteExchange;
+    this.liveMembers = liveMembers;
+    this.deadMembers = deadMembers;
+  }
+
+  public String getVotingNode() {
+    return votingNode;
+  }
+
+  public Boolean getVoteValue() {
+    return voteValue;
+  }
+
+  public Boolean getVoteExchange() {
+    return voteExchange;
+  }
+
+  public void setVoteExchange(Boolean voteExchange) {
+    this.voteExchange = voteExchange;
+  }
+
+  public List<String> getLiveMembers() {
+    return liveMembers;
+  }
+
+  public List<String> getDeadMembers() {
+    return deadMembers;
+  }
+
+  @Override
+  public String toString() {
+    return "votingNode=" + votingNode + ", voteValue=" + voteValue + ", liveMembers=" + liveMembers
+            + ", deadMembers= " + deadMembers;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/5ed3ed85/gossip-base/src/main/java/org/apache/gossip/lock/vote/VoteCandidate.java
----------------------------------------------------------------------
diff --git a/gossip-base/src/main/java/org/apache/gossip/lock/vote/VoteCandidate.java b/gossip-base/src/main/java/org/apache/gossip/lock/vote/VoteCandidate.java
new file mode 100644
index 0000000..b81b0b9
--- /dev/null
+++ b/gossip-base/src/main/java/org/apache/gossip/lock/vote/VoteCandidate.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.lock.vote;
+
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Stores the vote candidate details and its votes.
+ */
+public class VoteCandidate {
+
+  private final String candidateNodeId;
+  private final String votingKey;
+  private final Map<String, Vote> votes;
+
+  public VoteCandidate(String candidateNodeId, String votingKey, Map<String, Vote> votes) {
+
+    this.candidateNodeId = candidateNodeId;
+    this.votingKey = votingKey;
+    this.votes = votes;
+  }
+
+  public String getCandidateNodeId() {
+    return candidateNodeId;
+  }
+
+  public String getVotingKey() {
+    return votingKey;
+  }
+
+  public Map<String, Vote> getVotes() {
+    return votes;
+  }
+
+  public void addVote(Vote vote) {
+    votes.put(vote.getVotingNode(), vote);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(candidateNodeId, votingKey);
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (!(obj instanceof VoteCandidate))
+      return false;
+    if (obj == this)
+      return true;
+    VoteCandidate other = (VoteCandidate) obj;
+    return this.candidateNodeId.equals(other.candidateNodeId) && this.votingKey
+            .equals(other.votingKey);
+  }
+
+  @Override
+  public String toString() {
+    return "candidateNodeId=" + candidateNodeId + ", votingKey=" + votingKey + ", votes= " + votes;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/5ed3ed85/gossip-base/src/main/java/org/apache/gossip/lock/vote/VoteSelector.java
----------------------------------------------------------------------
diff --git a/gossip-base/src/main/java/org/apache/gossip/lock/vote/VoteSelector.java b/gossip-base/src/main/java/org/apache/gossip/lock/vote/VoteSelector.java
new file mode 100644
index 0000000..91c22b3
--- /dev/null
+++ b/gossip-base/src/main/java/org/apache/gossip/lock/vote/VoteSelector.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.lock.vote;
+
+import java.util.Set;
+
+/**
+ * This interface defines the vote selection algorithm for vote-based locking.
+ */
+public interface VoteSelector {
+  /**
+   * This method is called by the lock manager of a node to decide which candidate should be chosen for voting.
+   *
+   * @param voteCandidateIds set of node ids of the vote candidates
+   * @return the node id selected to vote for, taken from the given vote candidate set.
+   */
+  String getVoteCandidateId(Set<String> voteCandidateIds);
+}

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/5ed3ed85/gossip-base/src/main/java/org/apache/gossip/manager/GossipManager.java
----------------------------------------------------------------------
diff --git a/gossip-base/src/main/java/org/apache/gossip/manager/GossipManager.java b/gossip-base/src/main/java/org/apache/gossip/manager/GossipManager.java
index db442c6..2e45843 100644
--- a/gossip-base/src/main/java/org/apache/gossip/manager/GossipManager.java
+++ b/gossip-base/src/main/java/org/apache/gossip/manager/GossipManager.java
@@ -28,6 +28,8 @@ import org.apache.gossip.event.GossipListener;
 import org.apache.gossip.event.GossipState;
 import org.apache.gossip.event.data.UpdateNodeDataEventHandler;
 import org.apache.gossip.event.data.UpdateSharedDataEventHandler;
+import org.apache.gossip.lock.LockManager;
+import org.apache.gossip.lock.exceptions.VoteFailedException;
 import org.apache.gossip.manager.handlers.MessageHandler;
 import org.apache.gossip.model.PerNodeDataMessage;
 import org.apache.gossip.model.SharedDataMessage;
@@ -43,7 +45,11 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Objects;
-import java.util.concurrent.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 
@@ -77,7 +83,8 @@ public abstract class GossipManager {
   private final GossipMemberStateRefresher memberStateRefresher;
   
   private final MessageHandler messageHandler;
-  
+  private final LockManager lockManager;
+
   public GossipManager(String cluster,
                        URI uri, String id, Map<String, String> properties, GossipSettings settings,
                        List<Member> gossipMembers, GossipListener listener, MetricRegistry registry,
@@ -89,6 +96,7 @@ public abstract class GossipManager {
     me = new LocalMember(cluster, uri, id, clock.nanoTime(), properties,
             settings.getWindowSize(), settings.getMinimumSamples(), settings.getDistribution());
     gossipCore = new GossipCore(this, registry);
+    this.lockManager = new LockManager(this, settings.getLockManagerSettings(), registry);
     dataReaper = new DataReaper(gossipCore, clock);
     members = new ConcurrentSkipListMap<>();
     for (Member startupMember : gossipMembers) {
@@ -221,6 +229,7 @@ public abstract class GossipManager {
    */
   public void shutdown() {
     gossipServiceRunning.set(false);
+    lockManager.shutdown();
     gossipCore.shutdown();
     transportManager.shutdown();
     dataReaper.close();
@@ -371,4 +380,21 @@ public abstract class GossipManager {
   public void registerGossipListener(GossipListener listener) {
     memberStateRefresher.register(listener);
   }
+
+  /**
+   * Get the lock manager associated with this GossipManager.
+   * @return lock manager object.
+   */
+  public LockManager getLockManager() {
+    return lockManager;
+  }
+
+  /**
+   * Try to acquire a lock on given shared data key.
+   * @param key key of the shared data object.
+   * @throws VoteFailedException if the locking fails.
+   */
+  public void acquireSharedDataLock(String key) throws VoteFailedException{
+    lockManager.acquireSharedDataLock(key);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/5ed3ed85/gossip-base/src/test/java/org/apache/gossip/lock/vote/MajorityVoteTest.java
----------------------------------------------------------------------
diff --git a/gossip-base/src/test/java/org/apache/gossip/lock/vote/MajorityVoteTest.java b/gossip-base/src/test/java/org/apache/gossip/lock/vote/MajorityVoteTest.java
new file mode 100644
index 0000000..c558444
--- /dev/null
+++ b/gossip-base/src/test/java/org/apache/gossip/lock/vote/MajorityVoteTest.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.lock.vote;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.platform.runner.JUnitPlatform;
+import org.junit.runner.RunWith;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+
+@RunWith(JUnitPlatform.class)
+public class MajorityVoteTest {
+
+  @Test
+  public void differentCandidateMergeTest() {
+    Map<String, VoteCandidate> voteCandidateMap1 = new HashMap<>();
+    VoteCandidate candidateA = new VoteCandidate("1", "key1", generateVotes(1, 2, true, true));
+    voteCandidateMap1.put("1", candidateA);
+    MajorityVote first = new MajorityVote(voteCandidateMap1);
+
+    Map<String, VoteCandidate> voteCandidateMap2 = new HashMap<>();
+    VoteCandidate candidateB = new VoteCandidate("3", "key1", generateVotes(3, 4, true, false));
+    voteCandidateMap2.put("3", candidateB);
+    MajorityVote second = new MajorityVote(voteCandidateMap2);
+
+    MajorityVote result = first.merge(second);
+
+    Assert.assertTrue(result.value().get("1").getVotes().get("2").getVoteValue());
+    Assert.assertTrue(!result.value().get("3").getVotes().get("4").getVoteValue());
+
+  }
+
+  @Test
+  public void sameCandidateMergeTest() {
+    Map<String, VoteCandidate> voteCandidateMap1 = new HashMap<>();
+    VoteCandidate candidateA = new VoteCandidate("1", "key1", generateVotes(1, 2, true, true));
+    voteCandidateMap1.put("1", candidateA);
+    MajorityVote first = new MajorityVote(voteCandidateMap1);
+
+    Map<String, VoteCandidate> voteCandidateMap2 = new HashMap<>();
+    VoteCandidate candidateB = new VoteCandidate("1", "key1", generateVotes(3, 4, true, false));
+    voteCandidateMap2.put("1", candidateB);
+    MajorityVote second = new MajorityVote(voteCandidateMap2);
+
+    MajorityVote result = first.merge(second);
+
+    Assert.assertTrue(result.value().get("1").getVotes().get("2").getVoteValue());
+    Assert.assertTrue(!result.value().get("1").getVotes().get("4").getVoteValue());
+
+  }
+
+  @Test
+  public void sameVoteMergeTest() {
+    Map<String, VoteCandidate> voteCandidateMap1 = new HashMap<>();
+    VoteCandidate candidateA = new VoteCandidate("1", "key1", generateVotes(1, 2, true, true));
+    voteCandidateMap1.put("1", candidateA);
+    MajorityVote first = new MajorityVote(voteCandidateMap1);
+
+    Map<String, VoteCandidate> voteCandidateMap2 = new HashMap<>();
+    VoteCandidate candidateB = new VoteCandidate("1", "key1",
+            generateVotes(2, 4, true, false, true));
+    voteCandidateMap2.put("1", candidateB);
+    MajorityVote second = new MajorityVote(voteCandidateMap2);
+
+    MajorityVote result = first.merge(second);
+
+    Assert.assertTrue(result.value().get("1").getVotes().get("2").getVoteValue());
+  }
+
+  public Map<String, Vote> generateVotes(int startingNodeId, int endNodeId, boolean... votes) {
+    Map<String, Vote> voteMap = new HashMap<>();
+    if ((endNodeId - startingNodeId + 1) > votes.length) {
+      return voteMap;
+    }
+    for (int i = startingNodeId; i <= endNodeId; i++) {
+      String nodeId = i + "";
+      voteMap.put(nodeId, new Vote(nodeId, votes[i - startingNodeId], false, new ArrayList<>(),
+              new ArrayList<>()));
+    }
+    return voteMap;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/5ed3ed85/gossip-itest/src/test/java/org/apache/gossip/SharedDataLockTest.java
----------------------------------------------------------------------
diff --git a/gossip-itest/src/test/java/org/apache/gossip/SharedDataLockTest.java b/gossip-itest/src/test/java/org/apache/gossip/SharedDataLockTest.java
new file mode 100644
index 0000000..01dbac0
--- /dev/null
+++ b/gossip-itest/src/test/java/org/apache/gossip/SharedDataLockTest.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip;
+
+import org.apache.gossip.lock.exceptions.VoteFailedException;
+import org.apache.gossip.manager.GossipManager;
+import org.apache.gossip.manager.GossipManagerBuilder;
+import org.apache.gossip.model.SharedDataMessage;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.platform.runner.JUnitPlatform;
+import org.junit.runner.RunWith;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
+
+@RunWith(JUnitPlatform.class)
+public class SharedDataLockTest extends AbstractIntegrationBase {
+
+  @Test
+  public void sharedDataLockRandomVoteTest()
+          throws InterruptedException, UnknownHostException, URISyntaxException {
+    GossipSettings settings = new GossipSettings();
+    settings.setPersistRingState(false);
+    settings.setPersistDataState(false);
+    String cluster = UUID.randomUUID().toString();
+    int seedNodes = 1;
+    List<Member> startupMembers = new ArrayList<>();
+    for (int i = 1; i < seedNodes + 1; ++i) {
+      URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i));
+      startupMembers.add(new RemoteMember(cluster, uri, i + ""));
+    }
+    final List<GossipManager> clients = new ArrayList<>();
+    final int clusterMembers = 10;
+    for (int i = 1; i < clusterMembers + 1; ++i) {
+      URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i));
+      GossipManager gossipService = GossipManagerBuilder.newBuilder().cluster(cluster).uri(uri)
+              .id(i + "").gossipMembers(startupMembers).gossipSettings(settings).build();
+      clients.add(gossipService);
+      gossipService.getLockManager().setNumberOfNodes(clusterMembers);
+      gossipService.init();
+      register(gossipService);
+    }
+
+    // Adding new data to Node 1
+    clients.get(0).gossipSharedData(sharedNodeData("category", "distributed"));
+
+    final AtomicInteger lockSuccessCount = new AtomicInteger(0);
+    final AtomicInteger lockFailedCount = new AtomicInteger(0);
+
+    // Node 1 tries to lock on key category
+    Thread Node1LockingThread = new Thread(() -> {
+      try {
+        clients.get(0).acquireSharedDataLock("category");
+        lockSuccessCount.incrementAndGet();
+      } catch (VoteFailedException ignore) {
+        lockFailedCount.incrementAndGet();
+      }
+    });
+
+    // Node 3 tries to lock on key category
+    Thread Node3LockingThread = new Thread(() -> {
+      try {
+        clients.get(2).acquireSharedDataLock("category");
+        lockSuccessCount.incrementAndGet();
+      } catch (VoteFailedException ignore) {
+        lockFailedCount.incrementAndGet();
+      }
+    });
+
+    // Node 6 tries to lock on key category
+    Thread Node5LockingThread = new Thread(() -> {
+      try {
+        clients.get(5).acquireSharedDataLock("category");
+        lockSuccessCount.incrementAndGet();
+      } catch (VoteFailedException ignore) {
+        lockFailedCount.incrementAndGet();
+      }
+    });
+
+    Node1LockingThread.start();
+    Node3LockingThread.start();
+    Node5LockingThread.start();
+
+    Node1LockingThread.join();
+    Node3LockingThread.join();
+    Node5LockingThread.join();
+
+    // Only one node should acquire the lock
+    Assert.assertEquals(1, lockSuccessCount.get());
+    // Other nodes should fail
+    Assert.assertEquals(2, lockFailedCount.get());
+
+  }
+
+  private SharedDataMessage sharedNodeData(String key, String value) {
+    SharedDataMessage g = new SharedDataMessage();
+    g.setExpireAt(Long.MAX_VALUE);
+    g.setKey(key);
+    g.setPayload(value);
+    g.setTimestamp(System.currentTimeMillis());
+    return g;
+  }
+
+}

Reply via email to