Repository: incubator-gossip Updated Branches: refs/heads/master ac8303893 -> 5ed3ed85c
GOSSIP-75 Vote based locking Project: http://git-wip-us.apache.org/repos/asf/incubator-gossip/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gossip/commit/5ed3ed85 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gossip/tree/5ed3ed85 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gossip/diff/5ed3ed85 Branch: refs/heads/master Commit: 5ed3ed85cf781f02b8e487ac396980c9909e4146 Parents: ac83038 Author: Mirage Abeysekara <mirage...@cse.mrt.ac.lk> Authored: Fri Jul 14 22:19:16 2017 +0530 Committer: Mirage Abeysekara <mirage...@cse.mrt.ac.lk> Committed: Wed Aug 23 23:31:39 2017 +0530 ---------------------------------------------------------------------- .../java/org/apache/gossip/GossipSettings.java | 18 +- .../java/org/apache/gossip/crdt/CrdtModule.java | 31 ++ .../org/apache/gossip/lock/LockManager.java | 318 +++++++++++++++++++ .../apache/gossip/lock/LockManagerSettings.java | 83 +++++ .../lock/exceptions/VoteFailedException.java | 43 +++ .../apache/gossip/lock/vote/MajorityVote.java | 169 ++++++++++ .../gossip/lock/vote/RandomVoteSelector.java | 35 ++ .../java/org/apache/gossip/lock/vote/Vote.java | 70 ++++ .../apache/gossip/lock/vote/VoteCandidate.java | 75 +++++ .../apache/gossip/lock/vote/VoteSelector.java | 33 ++ .../apache/gossip/manager/GossipManager.java | 30 +- .../gossip/lock/vote/MajorityVoteTest.java | 101 ++++++ .../org/apache/gossip/SharedDataLockTest.java | 125 ++++++++ 13 files changed, 1128 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/5ed3ed85/gossip-base/src/main/java/org/apache/gossip/GossipSettings.java ---------------------------------------------------------------------- diff --git a/gossip-base/src/main/java/org/apache/gossip/GossipSettings.java b/gossip-base/src/main/java/org/apache/gossip/GossipSettings.java index 32c00c9..4ea0ab6 100644 --- 
a/gossip-base/src/main/java/org/apache/gossip/GossipSettings.java +++ b/gossip-base/src/main/java/org/apache/gossip/GossipSettings.java @@ -17,6 +17,8 @@ */ package org.apache.gossip; +import org.apache.gossip.lock.LockManagerSettings; + import java.util.HashMap; import java.util.Map; @@ -61,7 +63,10 @@ public class GossipSettings { private String pathToKeyStore = "./keys"; private boolean signMessages = false; - + + // Settings related to lock manager + private LockManagerSettings lockManagerSettings = LockManagerSettings + .getLockManagerDefaultSettings(); /** * Construct GossipSettings with default settings. @@ -242,4 +247,15 @@ public class GossipSettings { this.protocolManagerClass = protocolManagerClass; } + public LockManagerSettings getLockManagerSettings() { + return lockManagerSettings; + } + + /** + * Set the lock settings use by the lock manager + * @param lockManagerSettings lock settings. This object cannot be null. + */ + public void setLockManagerSettings(LockManagerSettings lockManagerSettings) { + this.lockManagerSettings = lockManagerSettings; + } } http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/5ed3ed85/gossip-base/src/main/java/org/apache/gossip/crdt/CrdtModule.java ---------------------------------------------------------------------- diff --git a/gossip-base/src/main/java/org/apache/gossip/crdt/CrdtModule.java b/gossip-base/src/main/java/org/apache/gossip/crdt/CrdtModule.java index 396ec03..83d573d 100644 --- a/gossip-base/src/main/java/org/apache/gossip/crdt/CrdtModule.java +++ b/gossip-base/src/main/java/org/apache/gossip/crdt/CrdtModule.java @@ -24,6 +24,9 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo; import com.fasterxml.jackson.core.Version; import com.fasterxml.jackson.databind.module.SimpleModule; import org.apache.gossip.LocalMember; +import org.apache.gossip.lock.vote.MajorityVote; +import org.apache.gossip.lock.vote.Vote; +import org.apache.gossip.lock.vote.VoteCandidate; import 
org.apache.gossip.replication.BlackListReplicable; import org.apache.gossip.replication.Replicable; import org.apache.gossip.replication.WhiteListReplicable; @@ -107,6 +110,31 @@ abstract class BlackListReplicableMixin { @JsonProperty("blackListMembers") abstract List<LocalMember> getBlackListMembers(); } +abstract class VoteCandidateMixin { + @JsonCreator + VoteCandidateMixin( + @JsonProperty("candidateNodeId") String candidateNodeId, + @JsonProperty("votingKey") String votingKey, + @JsonProperty("votes") Map<String, Vote> votes + ) { } +} + +abstract class VoteMixin { + @JsonCreator + VoteMixin( + @JsonProperty("votingNode") String votingNode, + @JsonProperty("voteValue") Boolean voteValue, + @JsonProperty("voteExchange") Boolean voteExchange, + @JsonProperty("liveMembers") List<String> liveMembers, + @JsonProperty("deadMembers") List<String> deadMembers + ) { } +} + +abstract class MajorityVoteMixin<E>{ + @JsonCreator + MajorityVoteMixin(@JsonProperty("voteCandidates") Map<String, VoteCandidate> voteCandidateMap){ } +} + //If anyone wants to take a stab at this. 
please have at it //https://github.com/FasterXML/jackson-datatype-guava/blob/master/src/main/java/com/fasterxml/jackson/datatype/guava/ser/MultimapSerializer.java public class CrdtModule extends SimpleModule { @@ -130,6 +158,9 @@ public class CrdtModule extends SimpleModule { context.setMixInAnnotations(Replicable.class, ReplicableMixin.class); context.setMixInAnnotations(WhiteListReplicable.class, WhiteListReplicableMixin.class); context.setMixInAnnotations(BlackListReplicable.class, BlackListReplicableMixin.class); + context.setMixInAnnotations(MajorityVote.class, MajorityVoteMixin.class); + context.setMixInAnnotations(VoteCandidate.class, VoteCandidateMixin.class); + context.setMixInAnnotations(Vote.class, VoteMixin.class); } } http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/5ed3ed85/gossip-base/src/main/java/org/apache/gossip/lock/LockManager.java ---------------------------------------------------------------------- diff --git a/gossip-base/src/main/java/org/apache/gossip/lock/LockManager.java b/gossip-base/src/main/java/org/apache/gossip/lock/LockManager.java new file mode 100644 index 0000000..9f4636a --- /dev/null +++ b/gossip-base/src/main/java/org/apache/gossip/lock/LockManager.java @@ -0,0 +1,318 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gossip.lock; + +import com.codahale.metrics.Gauge; +import com.codahale.metrics.MetricRegistry; +import com.codahale.metrics.Timer; +import org.apache.gossip.Member; +import org.apache.gossip.lock.exceptions.VoteFailedException; +import org.apache.gossip.lock.vote.MajorityVote; +import org.apache.gossip.lock.vote.Vote; +import org.apache.gossip.lock.vote.VoteCandidate; +import org.apache.gossip.manager.GossipManager; +import org.apache.gossip.model.SharedDataMessage; +import org.apache.log4j.Logger; + +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; + +public class LockManager { + + public static final Logger LOGGER = Logger.getLogger(LockManager.class); + + private final GossipManager gossipManager; + private final LockManagerSettings lockSettings; + private final ScheduledExecutorService voteService; + private final AtomicInteger numberOfNodes; + private final Set<String> lockKeys; + // For MetricRegistry + public static final String LOCK_KEY_SET_SIZE = "gossip.lock.key_set_size"; + public static final String LOCK_TIME = "gossip.lock.time"; + private final Timer lockTimeMetric; + + public LockManager(GossipManager gossipManager, final LockManagerSettings lockManagerSettings, + MetricRegistry metrics) { + this.gossipManager = gossipManager; + this.lockSettings = lockManagerSettings; + this.numberOfNodes = new AtomicInteger(lockSettings.getNumberOfNodes()); + this.lockKeys = new 
CopyOnWriteArraySet<>(); + metrics.register(LOCK_KEY_SET_SIZE, (Gauge<Integer>) lockKeys::size); + lockTimeMetric = metrics.timer(LOCK_TIME); + // Register listener for lock keys + gossipManager.registerSharedDataSubscriber((key, oldValue, newValue) -> { + if (key.contains("lock/")) { + lockKeys.add(key); + } + }); + voteService = Executors.newScheduledThreadPool(2); + voteService.scheduleAtFixedRate(this::updateVotes, 0, lockSettings.getVoteUpdateInterval(), + TimeUnit.MILLISECONDS); + } + + public void acquireSharedDataLock(String key) throws VoteFailedException { + final Timer.Context context = lockTimeMetric.time(); + gossipManager.merge(generateLockMessage(key)); + int deadlockDetectCount = 0; + while (true) { + SharedDataMessage message = gossipManager.findSharedGossipData(generateLockKey(key)); + if (message == null || !(message.getPayload() instanceof MajorityVote)) { + continue; + } + MajorityVote majorityVoteResult = (MajorityVote) message.getPayload(); + final Map<String, VoteCandidate> voteCandidatesMap = majorityVoteResult.value(); + final Map<String, Boolean> voteResultMap = new HashMap<>(); + // Store the vote result for each vote candidate nodes + voteCandidatesMap.forEach((candidateId, voteCandidate) -> voteResultMap + .put(candidateId, isVoteSuccess(voteCandidate))); + + long passedCandidates = voteResultMap.values().stream().filter(aBoolean -> aBoolean).count(); + String myNodeId = gossipManager.getMyself().getId(); + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("NodeId=" + myNodeId + ", VoteMap=" + voteResultMap + ", WinnerCount=" + + passedCandidates); + } + // Check for possible dead lock when no candidates were won + if (passedCandidates == 0) { + if (isDeadLock(voteCandidatesMap)) { + deadlockDetectCount++; + // Testing for deadlock is not always correct, therefore test for continues deadlocks + if (deadlockDetectCount >= lockSettings.getDeadlockDetectionThreshold()) { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Deadlock detected from 
node " + myNodeId + ". VoteCandidatesMap=" + + voteCandidatesMap); + } + preventDeadLock(voteCandidatesMap); + } + } else { + deadlockDetectCount = 0; + } + } else if (passedCandidates == 1 && voteResultMap.containsKey(myNodeId)) { + context.stop(); + if (voteResultMap.get(myNodeId)) { + // There is one winner and that is my node, therefore break the while loop and continue + break; + } else { + throw new VoteFailedException("Node " + myNodeId + " failed to lock on key: " + key); + } + } else if (passedCandidates > 1) { + // Multiple winners are not possible + context.stop(); + throw new IllegalStateException("Multiple nodes get voted."); + } + + try { + Thread.sleep(lockSettings.getResultCalculationDelay()); + } catch (InterruptedException e) { + throw new VoteFailedException("Node " + myNodeId + " failed to lock on key: " + key, e); + } + } + } + + // Generate Crdt lock message for voting + private SharedDataMessage generateLockMessage(String key) { + VoteCandidate voteCandidate = new VoteCandidate(gossipManager.getMyself().getId(), key, + new ConcurrentHashMap<>()); + voteCandidate.addVote(new Vote(gossipManager.getMyself().getId(), true, false, + gossipManager.getLiveMembers().stream().map(Member::getId).collect(Collectors.toList()), + gossipManager.getDeadMembers().stream().map(Member::getId) + .collect(Collectors.toList()))); + Map<String, VoteCandidate> voteCandidateMap = new ConcurrentHashMap<>(); + voteCandidateMap.put(voteCandidate.getCandidateNodeId(), voteCandidate); + MajorityVote majorityVote = new MajorityVote(voteCandidateMap); + SharedDataMessage lockMessage = new SharedDataMessage(); + lockMessage.setKey(generateLockKey(key)); + lockMessage.setPayload(majorityVote); + lockMessage.setExpireAt(Long.MAX_VALUE); + lockMessage.setTimestamp(System.currentTimeMillis()); + return lockMessage; + } + + // This method will run periodically to vote the other nodes + private void updateVotes() { + for (String lockKey : lockKeys) { + SharedDataMessage message = 
gossipManager.findSharedGossipData(lockKey); + if (message == null || !(message.getPayload() instanceof MajorityVote)) { + continue; + } + MajorityVote majorityVote = (MajorityVote) message.getPayload(); + Map<String, VoteCandidate> voteCandidateMap = majorityVote.value(); + String myNodeId = gossipManager.getMyself().getId(); + // No need to vote if my node is already voted to every node for the key + if (isVotedToAll(myNodeId, voteCandidateMap)) { + continue; + } + String myVoteCandidate = getVotedCandidateNodeId(myNodeId, voteCandidateMap); + + if (myVoteCandidate == null) { + myVoteCandidate = lockSettings.getVoteSelector().getVoteCandidateId(voteCandidateMap.keySet()); + } + for (VoteCandidate voteCandidate : voteCandidateMap.values()) { + if (voteCandidate.getCandidateNodeId().equals(myNodeId)) { + continue; + } + // Vote for selected candidate + boolean voteResult = voteCandidate.getCandidateNodeId().equals(myVoteCandidate); + voteCandidate.addVote(new Vote(gossipManager.getMyself().getId(), voteResult, false, + gossipManager.getLiveMembers().stream().map(Member::getId) + .collect(Collectors.toList()), + gossipManager.getDeadMembers().stream().map(Member::getId) + .collect(Collectors.toList()))); + } + } + } + + // Return true if every node has a vote from given node id. + private boolean isVotedToAll(String nodeId, final Map<String, VoteCandidate> voteCandidates) { + int voteCount = 0; + for (VoteCandidate voteCandidate : voteCandidates.values()) { + if (voteCandidate.getVotes().containsKey(nodeId)) { + voteCount++; + } + } + return voteCount == voteCandidates.size(); + } + + // Returns true if there is a deadlock for given vote candidates + private boolean isDeadLock(final Map<String, VoteCandidate> voteCandidates) { + boolean result = true; + int numberOfLiveNodes; + if (numberOfNodes.get() > 0) { + numberOfLiveNodes = numberOfNodes.get(); + } else { + // numberOfNodes is not set by the user, therefore calculate it. 
+ Set<String> liveNodes = voteCandidates.values().stream() + .map(voteCandidate -> voteCandidate.getVotes().values()).flatMap(Collection::stream) + .map(Vote::getLiveMembers).flatMap(List::stream).collect(Collectors.toSet()); + numberOfLiveNodes = liveNodes.size(); + } + for (VoteCandidate voteCandidate : voteCandidates.values()) { + result = result && voteCandidate.getVotes().size() == numberOfLiveNodes; + } + return result; + } + + // Prevent the deadlock by giving up the votes + private void preventDeadLock(Map<String, VoteCandidate> voteCandidates) { + String myNodeId = gossipManager.getMyself().getId(); + VoteCandidate myResults = voteCandidates.get(myNodeId); + if (myResults == null) { + return; + } + // Set of nodes that is going to receive this nodes votes + List<String> donateCandidateIds = voteCandidates.keySet().stream() + .filter(s -> s.compareTo(myNodeId) < 0).collect(Collectors.toList()); + if (donateCandidateIds.size() == 0) { + return; + } + // Select a random node to donate + Random randomizer = new Random(); + String selectedCandidateId = donateCandidateIds + .get(randomizer.nextInt(donateCandidateIds.size())); + VoteCandidate selectedCandidate = voteCandidates.get(selectedCandidateId); + + Set<Vote> myVotes = new HashSet<>(myResults.getVotes().values()); + Set<Vote> selectedCandidateVotes = new HashSet<>(selectedCandidate.getVotes().values()); + // Exchange the votes + for (Vote myVote : myVotes) { + for (Vote candidateVote : selectedCandidateVotes) { + if (myVote.getVoteValue() && myVote.getVotingNode().equals(candidateVote.getVotingNode())) { + myVote.setVoteExchange(true); + candidateVote.setVoteExchange(true); + selectedCandidate.getVotes().put(myVote.getVotingNode(), myVote); + myResults.getVotes().put(candidateVote.getVotingNode(), candidateVote); + } + } + } + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Node " + myNodeId + " give up votes to node " + selectedCandidateId); + } + } + + private String getVotedCandidateNodeId(String nodeId, 
+ final Map<String, VoteCandidate> voteCandidates) { + for (VoteCandidate voteCandidate : voteCandidates.values()) { + Vote vote = voteCandidate.getVotes().get(nodeId); + if (vote != null && vote.getVoteValue()) { + return voteCandidate.getCandidateNodeId(); + } + } + return null; + } + + // Return true if the given candidate has passed the vote + private boolean isVoteSuccess(VoteCandidate voteCandidate) { + Set<String> liveNodes = new HashSet<>(); + int voteCount = 0; + for (Vote vote : voteCandidate.getVotes().values()) { + liveNodes.addAll(vote.getLiveMembers()); + if (vote.getVoteValue()) { + voteCount++; + } + } + int numberOfLiveNodes; + if (numberOfNodes.get() > 0) { + numberOfLiveNodes = numberOfNodes.get(); + } else { + numberOfLiveNodes = liveNodes.size(); + } + return numberOfLiveNodes > 0 && voteCount >= (numberOfLiveNodes / 2 + 1); + } + + private String generateLockKey(String key){ + return "lock/" + key; + } + + public void shutdown(){ + voteService.shutdown(); + } + /** + * Get the voted node id from this node for a given key + * @param key key of the data object + * @return Voted node id + */ + public String getVotedCandidateNodeId(String key) { + SharedDataMessage message = gossipManager.findSharedGossipData(generateLockKey(key)); + if (message == null || !(message.getPayload() instanceof MajorityVote)) { + return null; + } + MajorityVote majorityVote = (MajorityVote) message.getPayload(); + return getVotedCandidateNodeId(gossipManager.getMyself().getId(), majorityVote.value()); + } + + /** + * Set the number of live nodes. If this value is negative, live nodes will be calculated + * @param numberOfNodes live node count or negative to calculate. 
+ */ + public void setNumberOfNodes(int numberOfNodes) { + this.numberOfNodes.set(numberOfNodes); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/5ed3ed85/gossip-base/src/main/java/org/apache/gossip/lock/LockManagerSettings.java ---------------------------------------------------------------------- diff --git a/gossip-base/src/main/java/org/apache/gossip/lock/LockManagerSettings.java b/gossip-base/src/main/java/org/apache/gossip/lock/LockManagerSettings.java new file mode 100644 index 0000000..4af47a2 --- /dev/null +++ b/gossip-base/src/main/java/org/apache/gossip/lock/LockManagerSettings.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gossip.lock; + +import org.apache.gossip.lock.vote.RandomVoteSelector; +import org.apache.gossip.lock.vote.VoteSelector; + +/** + * Stores the lock manager related settings. + */ +public class LockManagerSettings { + // Time between vote updates in ms. Default is 1 second. + private final int voteUpdateInterval; + // Vote selection algorithm. Default is random voting + private final VoteSelector voteSelector; + // Number of nodes available for voting. 
Default is -1 (Auto calculate) + private final int numberOfNodes; + // Number of times to test for deadlock before preventing. Default is 3 + private final int deadlockDetectionThreshold; + // Wait time between vote result calculation. Default is 1000 + private final int resultCalculationDelay; + + /** + * Construct LockManagerSettings with default settings. + */ + public static LockManagerSettings getLockManagerDefaultSettings() { + return new LockManagerSettings(1000, new RandomVoteSelector(), -1, 3, 1000); + } + + /** + * Construct a custom LockManagerSettings + * + * @param voteUpdateInterval Time between vote updates in milliseconds. + * @param voteSelector Vote selection algorithm. Cannot be null + * @param numberOfNodes Number of nodes available for voting. Set to negative value for auto calculate + * @param deadlockDetectionThreshold Number of times to test for deadlock before preventing + * @param resultCalculationDelay Wait time between vote result calculation + */ + public LockManagerSettings(int voteUpdateInterval, VoteSelector voteSelector, int numberOfNodes, + int deadlockDetectionThreshold, int resultCalculationDelay) { + this.voteUpdateInterval = voteUpdateInterval; + this.voteSelector = voteSelector; + this.numberOfNodes = numberOfNodes; + this.deadlockDetectionThreshold = deadlockDetectionThreshold; + this.resultCalculationDelay = resultCalculationDelay; + + } + + public int getVoteUpdateInterval() { + return voteUpdateInterval; + } + + public VoteSelector getVoteSelector() { + return voteSelector; + } + + public int getNumberOfNodes() { + return numberOfNodes; + } + + public int getDeadlockDetectionThreshold() { + return deadlockDetectionThreshold; + } + + public int getResultCalculationDelay() { + return resultCalculationDelay; + } +} http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/5ed3ed85/gossip-base/src/main/java/org/apache/gossip/lock/exceptions/VoteFailedException.java 
---------------------------------------------------------------------- diff --git a/gossip-base/src/main/java/org/apache/gossip/lock/exceptions/VoteFailedException.java b/gossip-base/src/main/java/org/apache/gossip/lock/exceptions/VoteFailedException.java new file mode 100644 index 0000000..bd0a606 --- /dev/null +++ b/gossip-base/src/main/java/org/apache/gossip/lock/exceptions/VoteFailedException.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gossip.lock.exceptions; + +/** + * This exception is thrown when lock-based voting fails. + */ +public class VoteFailedException extends Exception { + /** + * Constructs a new VoteFailedException with the specified detail message. + * + * @param message the detail message. + */ + public VoteFailedException(String message) { + super(message); + } + + /** + * Constructs a new VoteFailedException with the specified detail message and + * cause. 
+ * + * @param message the detail message + * @param cause the cause for this exception + */ + public VoteFailedException(String message, Throwable cause) { + super(message, cause); + } +} http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/5ed3ed85/gossip-base/src/main/java/org/apache/gossip/lock/vote/MajorityVote.java ---------------------------------------------------------------------- diff --git a/gossip-base/src/main/java/org/apache/gossip/lock/vote/MajorityVote.java b/gossip-base/src/main/java/org/apache/gossip/lock/vote/MajorityVote.java new file mode 100644 index 0000000..a18f3c9 --- /dev/null +++ b/gossip-base/src/main/java/org/apache/gossip/lock/vote/MajorityVote.java @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gossip.lock.vote; + +import org.apache.gossip.crdt.Crdt; + +import java.util.Collections; +import java.util.HashSet; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +/** + * CRDT used to distribute votes for a given key. 
+ */ +public class MajorityVote implements Crdt<Map<String, VoteCandidate>, MajorityVote> { + + private final Map<String, VoteCandidate> voteCandidates = new ConcurrentHashMap<>(); + + public MajorityVote(Map<String, VoteCandidate> voteCandidateMap) { + voteCandidates.putAll(voteCandidateMap); + } + + @Override + public MajorityVote merge(MajorityVote other) { + Map<String, VoteCandidate> mergedCandidates = new ConcurrentHashMap<>(); + Set<String> firstKeySet = this.voteCandidates.keySet(); + Set<String> secondKeySet = other.voteCandidates.keySet(); + Set<String> sameCandidatesSet = getIntersection(firstKeySet, secondKeySet); + Set<String> differentCandidatesSet = getIntersectionCompliment(firstKeySet, secondKeySet); + // Merge different vote candidates by combining votes + for (String differentCandidateId : differentCandidatesSet) { + if (this.voteCandidates.containsKey(differentCandidateId)) { + mergedCandidates.put(differentCandidateId, this.voteCandidates.get(differentCandidateId)); + } else if (other.voteCandidates.containsKey(differentCandidateId)) { + mergedCandidates.put(differentCandidateId, other.voteCandidates.get(differentCandidateId)); + } + } + // Merge votes for the same candidate + for (String sameCandidateId : sameCandidatesSet) { + if (this.voteCandidates.containsKey(sameCandidateId) && other.voteCandidates + .containsKey(sameCandidateId)) { + mergedCandidates.put(sameCandidateId, + mergeCandidate(this.voteCandidates.get(sameCandidateId), + other.voteCandidates.get(sameCandidateId))); + } + } + + return new MajorityVote(mergedCandidates); + } + + // Merge different votes for same candidate + private VoteCandidate mergeCandidate(VoteCandidate firstCandidate, + VoteCandidate secondCandidate) { + VoteCandidate mergeResult = new VoteCandidate(firstCandidate.getCandidateNodeId(), + firstCandidate.getVotingKey(), new ConcurrentHashMap<>()); + Set<String> firstKeySet = firstCandidate.getVotes().keySet(); + Set<String> secondKeySet = 
secondCandidate.getVotes().keySet(); + Set<String> sameVoteNodeSet = getIntersection(firstKeySet, secondKeySet); + Set<String> differentVoteNodeSet = getIntersectionCompliment(firstKeySet, secondKeySet); + // Merge different voters by combining their votes + for (String differentCandidateId : differentVoteNodeSet) { + if (firstCandidate.getVotes().containsKey(differentCandidateId)) { + mergeResult.getVotes() + .put(differentCandidateId, firstCandidate.getVotes().get(differentCandidateId)); + } else if (secondCandidate.getVotes().containsKey(differentCandidateId)) { + mergeResult.getVotes() + .put(differentCandidateId, secondCandidate.getVotes().get(differentCandidateId)); + } + } + // Merge vote for same voter + for (String sameVoteNodeId : sameVoteNodeSet) { + if (firstCandidate.getVotes().containsKey(sameVoteNodeId) && secondCandidate.getVotes() + .containsKey(sameVoteNodeId)) { + mergeResult.getVotes().put(sameVoteNodeId, + mergeVote(firstCandidate.getVotes().get(sameVoteNodeId), + secondCandidate.getVotes().get(sameVoteNodeId))); + } + } + + return mergeResult; + } + + // Merge two votes from same voter + private Vote mergeVote(Vote firstVote, Vote secondVote) { + if (firstVote.getVoteValue().booleanValue() != secondVote.getVoteValue().booleanValue()) { + if (firstVote.getVoteExchange()) { + return firstVote; + } else if (secondVote.getVoteExchange()) { + return secondVote; + } else { + return secondVote; + } + } else { + return secondVote; + } + } + + private Set<String> getIntersection(Set<String> first, Set<String> second) { + Set<String> intersection = new HashSet<>(first); + intersection.retainAll(second); + return intersection; + } + + private Set<String> getIntersectionCompliment(Set<String> first, Set<String> second) { + Set<String> union = new HashSet<>(); + union.addAll(first); + union.addAll(second); + Set<String> intersectionCompliment = new HashSet<>(union); + intersectionCompliment.removeAll(getIntersection(first, second)); + return 
intersectionCompliment; + } + + @Override + public Map<String, VoteCandidate> value() { + Map<String, VoteCandidate> copy = new ConcurrentHashMap<>(); + copy.putAll(voteCandidates); + return Collections.unmodifiableMap(copy); + + } + + @Override + public int hashCode() { + return voteCandidates.hashCode(); + } + + @Override + public boolean equals(Object obj) { + if (obj == null) + return false; + if (obj == this) + return true; + if (!(obj instanceof MajorityVote)) + return false; + MajorityVote other = (MajorityVote) obj; + return Objects.equals(voteCandidates, other.voteCandidates); + } + + @Override + public String toString() { + return voteCandidates.toString(); + } + + @Override + public MajorityVote optimize() { + return new MajorityVote(voteCandidates); + } + + public Map<String, VoteCandidate> getVoteCandidates() { + return new ConcurrentHashMap<>(voteCandidates); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/5ed3ed85/gossip-base/src/main/java/org/apache/gossip/lock/vote/RandomVoteSelector.java ---------------------------------------------------------------------- diff --git a/gossip-base/src/main/java/org/apache/gossip/lock/vote/RandomVoteSelector.java b/gossip-base/src/main/java/org/apache/gossip/lock/vote/RandomVoteSelector.java new file mode 100644 index 0000000..d07f190 --- /dev/null +++ b/gossip-base/src/main/java/org/apache/gossip/lock/vote/RandomVoteSelector.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gossip.lock.vote; + +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import java.util.Set; + +/** + * VoteSelector implementation which randomly selects a voting node. + */ +public class RandomVoteSelector implements VoteSelector { + + @Override + public String getVoteCandidateId(Set<String> voteCandidateIds) { + List<String> voteCandidatesIds = new ArrayList<>(voteCandidateIds); + return voteCandidatesIds.get(new Random().nextInt(voteCandidatesIds.size())); + } +} http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/5ed3ed85/gossip-base/src/main/java/org/apache/gossip/lock/vote/Vote.java ---------------------------------------------------------------------- diff --git a/gossip-base/src/main/java/org/apache/gossip/lock/vote/Vote.java b/gossip-base/src/main/java/org/apache/gossip/lock/vote/Vote.java new file mode 100644 index 0000000..e68401c --- /dev/null +++ b/gossip-base/src/main/java/org/apache/gossip/lock/vote/Vote.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
/**
 * Holds the details of a single vote cast by one node.
 */
public class Vote {
  /** Id of the node that cast this vote. */
  private final String votingNode;
  /** The value of the vote. TODO (7/16/17): should a vote carry a weight? */
  private final Boolean voteValue;
  /** Live member list handed in at construction time. */
  private final List<String> liveMembers;
  /** Dead member list handed in at construction time. */
  private final List<String> deadMembers;
  /** Vote-exchange flag; the only field that can change after construction. */
  private Boolean voteExchange;

  /**
   * Creates a vote. The given lists are stored as-is, without copying.
   *
   * @param votingNode id of the node casting the vote
   * @param voteValue value of the vote
   * @param voteExchange initial vote-exchange flag
   * @param liveMembers live member list to store
   * @param deadMembers dead member list to store
   */
  public Vote(String votingNode, Boolean voteValue, Boolean voteExchange, List<String> liveMembers,
          List<String> deadMembers) {
    this.votingNode = votingNode;
    this.voteValue = voteValue;
    this.liveMembers = liveMembers;
    this.deadMembers = deadMembers;
    this.voteExchange = voteExchange;
  }

  /** @return id of the node that cast this vote */
  public String getVotingNode() {
    return votingNode;
  }

  /** @return the value of the vote */
  public Boolean getVoteValue() {
    return voteValue;
  }

  /** @return the current vote-exchange flag */
  public Boolean getVoteExchange() {
    return voteExchange;
  }

  /** @param voteExchange new vote-exchange flag */
  public void setVoteExchange(Boolean voteExchange) {
    this.voteExchange = voteExchange;
  }

  /** @return the live member list given to the constructor (same instance) */
  public List<String> getLiveMembers() {
    return liveMembers;
  }

  /** @return the dead member list given to the constructor (same instance) */
  public List<String> getDeadMembers() {
    return deadMembers;
  }

  /** Renders the voting node, vote value and both member lists; the exchange flag is omitted. */
  @Override
  public String toString() {
    StringBuilder text = new StringBuilder();
    text.append("votingNode=").append(votingNode);
    text.append(", voteValue=").append(voteValue);
    text.append(", liveMembers=").append(liveMembers);
    text.append(", deadMembers= ").append(deadMembers);
    return text.toString();
  }
}
b/gossip-base/src/main/java/org/apache/gossip/lock/vote/VoteCandidate.java new file mode 100644 index 0000000..b81b0b9 --- /dev/null +++ b/gossip-base/src/main/java/org/apache/gossip/lock/vote/VoteCandidate.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gossip.lock.vote; + +import java.util.Map; +import java.util.Objects; + +/** + * Stores the vote candidate details and its votes. 
+ */ +public class VoteCandidate { + + private final String candidateNodeId; + private final String votingKey; + private final Map<String, Vote> votes; + + public VoteCandidate(String candidateNodeId, String votingKey, Map<String, Vote> votes) { + + this.candidateNodeId = candidateNodeId; + this.votingKey = votingKey; + this.votes = votes; + } + + public String getCandidateNodeId() { + return candidateNodeId; + } + + public String getVotingKey() { + return votingKey; + } + + public Map<String, Vote> getVotes() { + return votes; + } + + public void addVote(Vote vote) { + votes.put(vote.getVotingNode(), vote); + } + + @Override + public int hashCode() { + return Objects.hash(candidateNodeId, votingKey); + } + + @Override + public boolean equals(Object obj) { + if (!(obj instanceof VoteCandidate)) + return false; + if (obj == this) + return true; + VoteCandidate other = (VoteCandidate) obj; + return this.candidateNodeId.equals(other.candidateNodeId) && this.votingKey + .equals(other.votingKey); + } + + @Override + public String toString() { + return "candidateNodeId=" + candidateNodeId + ", votingKey=" + votingKey + ", votes= " + votes; + } +} http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/5ed3ed85/gossip-base/src/main/java/org/apache/gossip/lock/vote/VoteSelector.java ---------------------------------------------------------------------- diff --git a/gossip-base/src/main/java/org/apache/gossip/lock/vote/VoteSelector.java b/gossip-base/src/main/java/org/apache/gossip/lock/vote/VoteSelector.java new file mode 100644 index 0000000..91c22b3 --- /dev/null +++ b/gossip-base/src/main/java/org/apache/gossip/lock/vote/VoteSelector.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
/**
 * Defines the vote selection algorithm used by the vote based locking.
 */
public interface VoteSelector {
  /**
   * Called by the lock manager of a node to decide which candidate should be chosen for voting.
   *
   * @param voteCandidateIds set of node ids of the vote candidates
   * @return the node id selected to vote for, taken from the given vote candidate set
   */
  String getVoteCandidateId(Set<String> voteCandidateIds);
}
/**
 * Returns the lock manager held by this GossipManager.
 *
 * @return the lock manager object
 */
public LockManager getLockManager() {
  return lockManager;
}

/**
 * Tries to acquire a lock on the given shared data key by delegating to the lock manager.
 *
 * @param key key of the shared data object
 * @throws VoteFailedException if the locking fails
 */
public void acquireSharedDataLock(String key) throws VoteFailedException{
  lockManager.acquireSharedDataLock(key);
}
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gossip.lock.vote; + +import org.junit.Assert; +import org.junit.Test; +import org.junit.platform.runner.JUnitPlatform; +import org.junit.runner.RunWith; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Map; + +@RunWith(JUnitPlatform.class) +public class MajorityVoteTest { + + @Test + public void differentCandidateMergeTest() { + Map<String, VoteCandidate> voteCandidateMap1 = new HashMap<>(); + VoteCandidate candidateA = new VoteCandidate("1", "key1", generateVotes(1, 2, true, true)); + voteCandidateMap1.put("1", candidateA); + MajorityVote first = new MajorityVote(voteCandidateMap1); + + Map<String, VoteCandidate> voteCandidateMap2 = new HashMap<>(); + VoteCandidate candidateB = new VoteCandidate("3", "key1", generateVotes(3, 4, true, false)); + voteCandidateMap2.put("3", candidateB); + MajorityVote second = new MajorityVote(voteCandidateMap2); + + MajorityVote result = first.merge(second); + + Assert.assertTrue(result.value().get("1").getVotes().get("2").getVoteValue()); + Assert.assertTrue(!result.value().get("3").getVotes().get("4").getVoteValue()); + + } + + @Test + public void sameCandidateMergeTest() { + Map<String, VoteCandidate> voteCandidateMap1 = new HashMap<>(); + VoteCandidate candidateA = new VoteCandidate("1", 
"key1", generateVotes(1, 2, true, true)); + voteCandidateMap1.put("1", candidateA); + MajorityVote first = new MajorityVote(voteCandidateMap1); + + Map<String, VoteCandidate> voteCandidateMap2 = new HashMap<>(); + VoteCandidate candidateB = new VoteCandidate("1", "key1", generateVotes(3, 4, true, false)); + voteCandidateMap2.put("1", candidateB); + MajorityVote second = new MajorityVote(voteCandidateMap2); + + MajorityVote result = first.merge(second); + + Assert.assertTrue(result.value().get("1").getVotes().get("2").getVoteValue()); + Assert.assertTrue(!result.value().get("1").getVotes().get("4").getVoteValue()); + + } + + @Test + public void sameVoteMergeTest() { + Map<String, VoteCandidate> voteCandidateMap1 = new HashMap<>(); + VoteCandidate candidateA = new VoteCandidate("1", "key1", generateVotes(1, 2, true, true)); + voteCandidateMap1.put("1", candidateA); + MajorityVote first = new MajorityVote(voteCandidateMap1); + + Map<String, VoteCandidate> voteCandidateMap2 = new HashMap<>(); + VoteCandidate candidateB = new VoteCandidate("1", "key1", + generateVotes(2, 4, true, false, true)); + voteCandidateMap2.put("1", candidateB); + MajorityVote second = new MajorityVote(voteCandidateMap2); + + MajorityVote result = first.merge(second); + + Assert.assertTrue(result.value().get("1").getVotes().get("2").getVoteValue()); + } + + public Map<String, Vote> generateVotes(int startingNodeId, int endNodeId, boolean... 
votes) { + Map<String, Vote> voteMap = new HashMap<>(); + if ((endNodeId - startingNodeId + 1) > votes.length) { + return voteMap; + } + for (int i = startingNodeId; i <= endNodeId; i++) { + String nodeId = i + ""; + voteMap.put(nodeId, new Vote(nodeId, votes[i - startingNodeId], false, new ArrayList<>(), + new ArrayList<>())); + } + return voteMap; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/5ed3ed85/gossip-itest/src/test/java/org/apache/gossip/SharedDataLockTest.java ---------------------------------------------------------------------- diff --git a/gossip-itest/src/test/java/org/apache/gossip/SharedDataLockTest.java b/gossip-itest/src/test/java/org/apache/gossip/SharedDataLockTest.java new file mode 100644 index 0000000..01dbac0 --- /dev/null +++ b/gossip-itest/src/test/java/org/apache/gossip/SharedDataLockTest.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.gossip; + +import org.apache.gossip.lock.exceptions.VoteFailedException; +import org.apache.gossip.manager.GossipManager; +import org.apache.gossip.manager.GossipManagerBuilder; +import org.apache.gossip.model.SharedDataMessage; +import org.junit.Assert; +import org.junit.Test; +import org.junit.platform.runner.JUnitPlatform; +import org.junit.runner.RunWith; + +import java.net.URI; +import java.net.URISyntaxException; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicInteger; + +@RunWith(JUnitPlatform.class) +public class SharedDataLockTest extends AbstractIntegrationBase { + + @Test + public void sharedDataLockRandomVoteTest() + throws InterruptedException, UnknownHostException, URISyntaxException { + GossipSettings settings = new GossipSettings(); + settings.setPersistRingState(false); + settings.setPersistDataState(false); + String cluster = UUID.randomUUID().toString(); + int seedNodes = 1; + List<Member> startupMembers = new ArrayList<>(); + for (int i = 1; i < seedNodes + 1; ++i) { + URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i)); + startupMembers.add(new RemoteMember(cluster, uri, i + "")); + } + final List<GossipManager> clients = new ArrayList<>(); + final int clusterMembers = 10; + for (int i = 1; i < clusterMembers + 1; ++i) { + URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i)); + GossipManager gossipService = GossipManagerBuilder.newBuilder().cluster(cluster).uri(uri) + .id(i + "").gossipMembers(startupMembers).gossipSettings(settings).build(); + clients.add(gossipService); + gossipService.getLockManager().setNumberOfNodes(clusterMembers); + gossipService.init(); + register(gossipService); + } + + // Adding new data to Node 1 + clients.get(0).gossipSharedData(sharedNodeData("category", "distributed")); + + final AtomicInteger lockSuccessCount = new AtomicInteger(0); + final AtomicInteger 
lockFailedCount = new AtomicInteger(0); + + // Node 1 try to lock on key category + Thread Node1LockingThread = new Thread(() -> { + try { + clients.get(0).acquireSharedDataLock("category"); + lockSuccessCount.incrementAndGet(); + } catch (VoteFailedException ignore) { + lockFailedCount.incrementAndGet(); + } + }); + + // Node 3 try to lock on key category + Thread Node3LockingThread = new Thread(() -> { + try { + clients.get(2).acquireSharedDataLock("category"); + lockSuccessCount.incrementAndGet(); + } catch (VoteFailedException ignore) { + lockFailedCount.incrementAndGet(); + } + }); + + // Node 6 try to lock on key category + Thread Node5LockingThread = new Thread(() -> { + try { + clients.get(5).acquireSharedDataLock("category"); + lockSuccessCount.incrementAndGet(); + } catch (VoteFailedException ignore) { + lockFailedCount.incrementAndGet(); + } + }); + + Node1LockingThread.start(); + Node3LockingThread.start(); + Node5LockingThread.start(); + + Node1LockingThread.join(); + Node3LockingThread.join(); + Node5LockingThread.join(); + + // Only one node should acquire the lock + Assert.assertEquals(1, lockSuccessCount.get()); + // Other nodes should fail + Assert.assertEquals(2, lockFailedCount.get()); + + } + + private SharedDataMessage sharedNodeData(String key, String value) { + SharedDataMessage g = new SharedDataMessage(); + g.setExpireAt(Long.MAX_VALUE); + g.setKey(key); + g.setPayload(value); + g.setTimestamp(System.currentTimeMillis()); + return g; + } + +}