[ https://issues.apache.org/jira/browse/GOSSIP-75?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16110139#comment-16110139 ]
ASF GitHub Bot commented on GOSSIP-75: -------------------------------------- Github user edwardcapriolo commented on a diff in the pull request: https://github.com/apache/incubator-gossip/pull/62#discussion_r130771383 --- Diff: gossip-base/src/main/java/org/apache/gossip/lock/LockManager.java --- @@ -0,0 +1,311 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gossip.lock; + +import com.sun.org.apache.xpath.internal.SourceTree; +import org.apache.gossip.Member; +import org.apache.gossip.crdt.Crdt; +import org.apache.gossip.event.data.UpdateSharedDataEventHandler; +import org.apache.gossip.lock.exceptions.VoteFailedException; +import org.apache.gossip.lock.vote.MajorityVote; +import org.apache.gossip.lock.vote.Vote; +import org.apache.gossip.lock.vote.VoteCandidate; +import org.apache.gossip.lock.vote.VoteSelector; +import org.apache.gossip.manager.GossipCore; +import org.apache.gossip.manager.GossipManager; +import org.apache.gossip.model.SharedDataMessage; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.Set; +import java.util.Timer; +import java.util.TimerTask; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +public class LockManager { + + private final GossipManager gossipManager; + private final GossipCore gossipCore; + private final Map<String, MajorityVote> majorityVoteCache; + private final ScheduledExecutorService cacheUpdateService; + private final ScheduledExecutorService voteService; + private final VoteSelector voteSelector; + private int numberOfNodes = -1; + private int deadlockDetectionThreshold = 2; + + public LockManager(GossipManager gossipManager, GossipCore gossipCore, VoteSelector voteSelector) { + this.gossipManager = gossipManager; + this.gossipCore = gossipCore; + this.voteSelector = voteSelector; + this.majorityVoteCache = new ConcurrentHashMap<>(); + + cacheUpdateService = Executors.newScheduledThreadPool(2); + voteService = Executors.newScheduledThreadPool(2); + cacheUpdateService.scheduleAtFixedRate(this::updateCache, 0, + 1000, TimeUnit.MILLISECONDS); + voteService.scheduleAtFixedRate(this::updateVotes, 0, + 1000, TimeUnit.MILLISECONDS); + } + + public void acquireSharedDataLock(String key) throws VoteFailedException { + + gossipManager.merge(generateLockMessage(key)); + + // TODO: 7/16/17 gather stats, check possibilities of optimizations + int deadlockDetectCount = 0; + while (true){ + MajorityVote majorityVoteResult = majorityVoteCache.get(generateLockKey(key)); + if(majorityVoteResult == null ){ + continue; + } + final Map<String, VoteCandidate> voteCandidatesMap = majorityVoteResult.value(); + final Map<String, Boolean> voteResultMap = new HashMap<>(); + + voteCandidatesMap.forEach((candidateId, voteCandidate) + -> voteResultMap.put(candidateId, isVoteSuccess(voteCandidate))); + + long passedCandidates = voteResultMap.values().stream().filter(aBoolean -> aBoolean).count(); + + String myNodeId = gossipManager.getMyself().getId(); + System.out.println("Node " + myNodeId + " vote map " + voteResultMap + " pass " + passedCandidates); + + if(passedCandidates == 0 && isDeadLock(voteCandidatesMap)){ + deadlockDetectCount++; + if(deadlockDetectCount >= deadlockDetectionThreshold) { + if(preventDeadLock(voteCandidatesMap)) { + majorityVoteCache.put(generateLockKey(key), majorityVoteResult); + } + } + }else if(passedCandidates == 1 && voteResultMap.containsKey(myNodeId)){ + if(voteResultMap.get(myNodeId)){ + break; + }else { + throw new VoteFailedException("Node " + myNodeId + " failed to lock on key: " + key); + } + } else { + // TODO: 7/16/17 check for possibility and handle multiple passed candidates + } + + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + } + + private SharedDataMessage generateLockMessage(String key){ + VoteCandidate voteCandidate = new VoteCandidate(gossipManager.getMyself().getId(), key, new ConcurrentHashMap<>()); + voteCandidate.addVote(new Vote(gossipManager.getMyself().getId(), true,false, + gossipManager.getLiveMembers().stream().map(Member::getId).collect(Collectors.toList()), + gossipManager.getDeadMembers().stream().map(Member::getId) + .collect(Collectors.toList()))); + Map<String, VoteCandidate> voteCandidateMap = new ConcurrentHashMap<>(); + voteCandidateMap.put(voteCandidate.getCandidateNodeId(),voteCandidate); + MajorityVote majorityVote = new MajorityVote(voteCandidateMap); + SharedDataMessage lockMessage = new SharedDataMessage(); + lockMessage.setKey(generateLockKey(key)); + lockMessage.setPayload(majorityVote); + lockMessage.setExpireAt(Long.MAX_VALUE); + lockMessage.setTimestamp(System.currentTimeMillis()); + return lockMessage; + } + + private void updateVotes(){ + for (Map.Entry<String, MajorityVote> entry : majorityVoteCache.entrySet()) { + String key = entry.getKey(); + MajorityVote majorityVote = entry.getValue(); + Map<String, VoteCandidate> voteCandidateMap = majorityVote.value(); + String myNodeId = gossipManager.getMyself().getId(); + + if (isVotedToAll(myNodeId, voteCandidateMap)) { + continue; + } + String myVoteCandidate = getVotedCandidateNodeId(myNodeId,voteCandidateMap); + + if (myVoteCandidate == null) { + myVoteCandidate = voteSelector.getVoteCandidateId(voteCandidateMap.keySet()); +// if(Integer.parseInt(myNodeId)%2 == 1){ +// myVoteCandidate = "3"; +// }else { +// myVoteCandidate = "6"; +// } + } + for (VoteCandidate voteCandidate : voteCandidateMap.values()) { + if (voteCandidate.getCandidateNodeId().equals(myNodeId)) { + continue; + } + boolean voteResult = voteCandidate.getCandidateNodeId().equals(myVoteCandidate); + voteCandidate.addVote(new Vote(gossipManager.getMyself().getId(), voteResult,false, + gossipManager.getLiveMembers().stream().map(Member::getId) + .collect(Collectors.toList()), + gossipManager.getDeadMembers().stream().map(Member::getId) + .collect(Collectors.toList()))); + } + } + } + + private boolean isVotedToAll(String nodeId, final Map<String,VoteCandidate> voteCandidates){ + int voteCount = 0; + for (VoteCandidate voteCandidate:voteCandidates.values()) { + if(voteCandidate.getVotes().containsKey(nodeId)){ + voteCount++; + } + } +// System.out.println("Node " + nodeId + ", VoteCount " + voteCount + ", Participants " + voteContextSet.size() + " " + voteContextSet); + return voteCount == voteCandidates.size(); + } + + private void updateCache() { + for (Map.Entry<String, SharedDataMessage> entry : gossipCore.getSharedData().entrySet()) { + String key = entry.getKey(); + SharedDataMessage sharedDataMessage = entry.getValue(); + if (sharedDataMessage.getPayload() instanceof MajorityVote) { + MajorityVote newMajorityVote = (MajorityVote) sharedDataMessage.getPayload(); + MajorityVote existingMajorityVote = majorityVoteCache.putIfAbsent(key, newMajorityVote); + if (existingMajorityVote == null) { + return; + } + MajorityVote mergedMajorityVote = existingMajorityVote.merge(newMajorityVote); + majorityVoteCache.replace(key, existingMajorityVote, mergedMajorityVote); + sharedDataMessage.setPayload(mergedMajorityVote); + gossipCore.getSharedData().put(key, sharedDataMessage); + } + } + } + + + private boolean isDeadLock(final Map<String, VoteCandidate> voteCandidates){ + boolean result = true; + Set<String> liveNodes = voteCandidates.values().stream() + .map(voteCandidate -> voteCandidate.getVotes().values()) + .flatMap(Collection::stream).map(Vote::getLiveMembers) + .flatMap(List::stream).collect(Collectors.toSet()); + for (VoteCandidate voteCandidate: voteCandidates.values()) { +// System.out.println("==== Node " + voteCandidate.getCandidateNodeId() + " live " + liveNodes + " votes " + voteCandidate.getVotes().size() ); --- End diff -- Remove comments > Voting interface > ---------------- > > Key: GOSSIP-75 > URL: https://issues.apache.org/jira/browse/GOSSIP-75 > Project: Gossip > Issue Type: New Feature > Reporter: Edward Capriolo > > Gossip has CRDT support. This is an important building block to doing higher > level things. The next piece is being able to act on an object when we > receive it. For example lets take the most simple case. I want the cluster to > vote on something such as "who asked for this lock first". Currently we > replicate objects lazily through a thread, what we want to do is on reception > of an object apply some function such that we can modify the object being > received. > The way I want to go about this is voting objects can be injected with a type > like VoteContext > http://stackoverflow.com/questions/27133161/how-to-pass-constructors-parameters-with-jackson > > Users can register Voter implementations. On receiving an object the > interface allows logic to be run. In the case of a Voting each node appends > its vote as the object moves around over time you can poll your local copy > and determine the result of the vote. -- This message was sent by Atlassian JIRA (v6.4.14#64029)