[
https://issues.apache.org/jira/browse/GOSSIP-75?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16110139#comment-16110139
]
ASF GitHub Bot commented on GOSSIP-75:
--------------------------------------
Github user edwardcapriolo commented on a diff in the pull request:
https://github.com/apache/incubator-gossip/pull/62#discussion_r130771383
--- Diff: gossip-base/src/main/java/org/apache/gossip/lock/LockManager.java
---
@@ -0,0 +1,311 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.lock;
+
+import com.sun.org.apache.xpath.internal.SourceTree;
+import org.apache.gossip.Member;
+import org.apache.gossip.crdt.Crdt;
+import org.apache.gossip.event.data.UpdateSharedDataEventHandler;
+import org.apache.gossip.lock.exceptions.VoteFailedException;
+import org.apache.gossip.lock.vote.MajorityVote;
+import org.apache.gossip.lock.vote.Vote;
+import org.apache.gossip.lock.vote.VoteCandidate;
+import org.apache.gossip.lock.vote.VoteSelector;
+import org.apache.gossip.manager.GossipCore;
+import org.apache.gossip.manager.GossipManager;
+import org.apache.gossip.model.SharedDataMessage;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+public class LockManager {
+
+ private final GossipManager gossipManager;
+ private final GossipCore gossipCore;
+ private final Map<String, MajorityVote> majorityVoteCache;
+ private final ScheduledExecutorService cacheUpdateService;
+ private final ScheduledExecutorService voteService;
+ private final VoteSelector voteSelector;
+ private int numberOfNodes = -1;
+ private int deadlockDetectionThreshold = 2;
+
+ public LockManager(GossipManager gossipManager, GossipCore gossipCore,
VoteSelector voteSelector) {
+ this.gossipManager = gossipManager;
+ this.gossipCore = gossipCore;
+ this.voteSelector = voteSelector;
+ this.majorityVoteCache = new ConcurrentHashMap<>();
+
+ cacheUpdateService = Executors.newScheduledThreadPool(2);
+ voteService = Executors.newScheduledThreadPool(2);
+ cacheUpdateService.scheduleAtFixedRate(this::updateCache, 0,
+ 1000, TimeUnit.MILLISECONDS);
+ voteService.scheduleAtFixedRate(this::updateVotes, 0,
+ 1000, TimeUnit.MILLISECONDS);
+ }
+
+ public void acquireSharedDataLock(String key) throws VoteFailedException
{
+
+ gossipManager.merge(generateLockMessage(key));
+
+ // TODO: 7/16/17 gather stats, check possibilities of optimizations
+ int deadlockDetectCount = 0;
+ while (true){
+ MajorityVote majorityVoteResult =
majorityVoteCache.get(generateLockKey(key));
+ if(majorityVoteResult == null ){
+ continue;
+ }
+ final Map<String, VoteCandidate> voteCandidatesMap =
majorityVoteResult.value();
+ final Map<String, Boolean> voteResultMap = new HashMap<>();
+
+ voteCandidatesMap.forEach((candidateId, voteCandidate)
+ -> voteResultMap.put(candidateId,
isVoteSuccess(voteCandidate)));
+
+ long passedCandidates =
voteResultMap.values().stream().filter(aBoolean -> aBoolean).count();
+
+ String myNodeId = gossipManager.getMyself().getId();
+ System.out.println("Node " + myNodeId + " vote map " +
voteResultMap + " pass " + passedCandidates);
+
+ if(passedCandidates == 0 && isDeadLock(voteCandidatesMap)){
+ deadlockDetectCount++;
+ if(deadlockDetectCount >= deadlockDetectionThreshold) {
+ if(preventDeadLock(voteCandidatesMap)) {
+ majorityVoteCache.put(generateLockKey(key),
majorityVoteResult);
+ }
+ }
+ }else if(passedCandidates == 1 &&
voteResultMap.containsKey(myNodeId)){
+ if(voteResultMap.get(myNodeId)){
+ break;
+ }else {
+ throw new VoteFailedException("Node " + myNodeId + " failed to
lock on key: " + key);
+ }
+ } else {
+ // TODO: 7/16/17 check for possibility and handle multiple passed
candidates
+ }
+
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+
+ }
+
+ private SharedDataMessage generateLockMessage(String key){
+ VoteCandidate voteCandidate = new
VoteCandidate(gossipManager.getMyself().getId(), key, new
ConcurrentHashMap<>());
+ voteCandidate.addVote(new Vote(gossipManager.getMyself().getId(),
true,false,
+
gossipManager.getLiveMembers().stream().map(Member::getId).collect(Collectors.toList()),
+ gossipManager.getDeadMembers().stream().map(Member::getId)
+ .collect(Collectors.toList())));
+ Map<String, VoteCandidate> voteCandidateMap = new
ConcurrentHashMap<>();
+ voteCandidateMap.put(voteCandidate.getCandidateNodeId(),voteCandidate);
+ MajorityVote majorityVote = new MajorityVote(voteCandidateMap);
+ SharedDataMessage lockMessage = new SharedDataMessage();
+ lockMessage.setKey(generateLockKey(key));
+ lockMessage.setPayload(majorityVote);
+ lockMessage.setExpireAt(Long.MAX_VALUE);
+ lockMessage.setTimestamp(System.currentTimeMillis());
+ return lockMessage;
+ }
+
+ private void updateVotes(){
+ for (Map.Entry<String, MajorityVote> entry :
majorityVoteCache.entrySet()) {
+ String key = entry.getKey();
+ MajorityVote majorityVote = entry.getValue();
+ Map<String, VoteCandidate> voteCandidateMap = majorityVote.value();
+ String myNodeId = gossipManager.getMyself().getId();
+
+ if (isVotedToAll(myNodeId, voteCandidateMap)) {
+ continue;
+ }
+ String myVoteCandidate =
getVotedCandidateNodeId(myNodeId,voteCandidateMap);
+
+ if (myVoteCandidate == null) {
+ myVoteCandidate =
voteSelector.getVoteCandidateId(voteCandidateMap.keySet());
+// if(Integer.parseInt(myNodeId)%2 == 1){
+// myVoteCandidate = "3";
+// }else {
+// myVoteCandidate = "6";
+// }
+ }
+ for (VoteCandidate voteCandidate : voteCandidateMap.values()) {
+ if (voteCandidate.getCandidateNodeId().equals(myNodeId)) {
+ continue;
+ }
+ boolean voteResult =
voteCandidate.getCandidateNodeId().equals(myVoteCandidate);
+ voteCandidate.addVote(new Vote(gossipManager.getMyself().getId(),
voteResult,false,
+ gossipManager.getLiveMembers().stream().map(Member::getId)
+ .collect(Collectors.toList()),
+ gossipManager.getDeadMembers().stream().map(Member::getId)
+ .collect(Collectors.toList())));
+ }
+ }
+ }
+
+ private boolean isVotedToAll(String nodeId, final
Map<String,VoteCandidate> voteCandidates){
+ int voteCount = 0;
+ for (VoteCandidate voteCandidate:voteCandidates.values()) {
+ if(voteCandidate.getVotes().containsKey(nodeId)){
+ voteCount++;
+ }
+ }
+// System.out.println("Node " + nodeId + ", VoteCount " + voteCount +
", Participants " + voteContextSet.size() + " " + voteContextSet);
+ return voteCount == voteCandidates.size();
+ }
+
+ private void updateCache() {
+ for (Map.Entry<String, SharedDataMessage> entry :
gossipCore.getSharedData().entrySet()) {
+ String key = entry.getKey();
+ SharedDataMessage sharedDataMessage = entry.getValue();
+ if (sharedDataMessage.getPayload() instanceof MajorityVote) {
+ MajorityVote newMajorityVote = (MajorityVote)
sharedDataMessage.getPayload();
+ MajorityVote existingMajorityVote =
majorityVoteCache.putIfAbsent(key, newMajorityVote);
+ if (existingMajorityVote == null) {
+ return;
+ }
+ MajorityVote mergedMajorityVote =
existingMajorityVote.merge(newMajorityVote);
+ majorityVoteCache.replace(key, existingMajorityVote,
mergedMajorityVote);
+ sharedDataMessage.setPayload(mergedMajorityVote);
+ gossipCore.getSharedData().put(key, sharedDataMessage);
+ }
+ }
+ }
+
+
+ private boolean isDeadLock(final Map<String, VoteCandidate>
voteCandidates){
+ boolean result = true;
+ Set<String> liveNodes = voteCandidates.values().stream()
+ .map(voteCandidate -> voteCandidate.getVotes().values())
+ .flatMap(Collection::stream).map(Vote::getLiveMembers)
+ .flatMap(List::stream).collect(Collectors.toSet());
+ for (VoteCandidate voteCandidate: voteCandidates.values()) {
+// System.out.println("==== Node " +
voteCandidate.getCandidateNodeId() + " live " + liveNodes + " votes " +
voteCandidate.getVotes().size() );
--- End diff --
Remove comments
> Voting interface
> ----------------
>
> Key: GOSSIP-75
> URL: https://issues.apache.org/jira/browse/GOSSIP-75
> Project: Gossip
> Issue Type: New Feature
> Reporter: Edward Capriolo
>
> Gossip has CRDT support. This is an important building block to doing higher
> level things. The next piece is being able to act on an object when we
> receive it. For example lets take the most simple case. I want the cluster to
> vote on something such as "who asked for this lock first". Currently we
> replicate objects lazily through a thread, what we want to do is on reception
> of an object apply some function such that we can modify the object being
> received.
> The way I want to go about this is voting objects can be injected with a type
> like VoteContext
> http://stackoverflow.com/questions/27133161/how-to-pass-constructors-parameters-with-jackson
>
> Users can register Voter implementations. On receiving an object the
> interface allows logic to be run. In the case of a Voting each node appends
> its vote as the object moves around over time you can poll your local copy
> and determine the result of the vote.
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)