Repository: incubator-gossip
Updated Branches:
  refs/heads/master 4aafc3ba3 -> 6a4d50cae


GOSSIP-77 better send()


Project: http://git-wip-us.apache.org/repos/asf/incubator-gossip/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gossip/commit/6a4d50ca
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gossip/tree/6a4d50ca
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gossip/diff/6a4d50ca

Branch: refs/heads/master
Commit: 6a4d50cae7269584fcab3c3b47b75a1982b26864
Parents: 4aafc3b
Author: Edward Capriolo <[email protected]>
Authored: Mon Mar 20 22:04:07 2017 -0400
Committer: Edward Capriolo <[email protected]>
Committed: Mon Mar 20 22:04:07 2017 -0400

----------------------------------------------------------------------
 .../org/apache/gossip/manager/GossipCore.java   | 59 +++++++++-----------
 .../manager/handlers/ResponseHandler.java       |  2 +-
 2 files changed, 28 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/6a4d50ca/src/main/java/org/apache/gossip/manager/GossipCore.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/manager/GossipCore.java b/src/main/java/org/apache/gossip/manager/GossipCore.java
index e3dcb21..f53419d 100644
--- a/src/main/java/org/apache/gossip/manager/GossipCore.java
+++ b/src/main/java/org/apache/gossip/manager/GossipCore.java
@@ -45,9 +45,18 @@ import java.util.concurrent.*;
 
 public class GossipCore implements GossipCoreConstants {
 
+  class LatchAndBase {
+    private final CountDownLatch latch;
+    private volatile Base base;
+    
+    LatchAndBase(){
+      latch = new CountDownLatch(1);
+    }
+    
+  }
   public static final Logger LOGGER = Logger.getLogger(GossipCore.class);
   private final GossipManager gossipManager;
-  private ConcurrentHashMap<String, Base> requests;
+  private ConcurrentHashMap<String, LatchAndBase> requests;
   private ThreadPoolExecutor service;
   private final ConcurrentHashMap<String, ConcurrentHashMap<String, PerNodeDataMessage>> perNodeData;
   private final ConcurrentHashMap<String, SharedDataMessage> sharedData;
@@ -224,46 +233,30 @@ public class GossipCore implements GossipCoreConstants {
     }
 
     final Trackable t;
+    LatchAndBase latchAndBase = null;
     if (message instanceof Trackable){
       t = (Trackable) message;
+      latchAndBase = new LatchAndBase();
+      requests.put(t.getUuid() + "/" + t.getUriFrom(), latchAndBase);
     } else {
       t = null;
     }
     sendInternal(message, uri);
-    if (t == null){
+    if (latchAndBase == null){
       return null;
-    }
-    final Future<Response> response = service.submit( new Callable<Response>(){
-      @Override
-      public Response call() throws Exception {
-        while(true){
-          Base b = requests.remove(t.getUuid() + "/" + t.getUriFrom());
-          if (b != null){
-            return (Response) b;
-          }
-          try {
-            Thread.sleep(0, 555555);
-          } catch (InterruptedException e) {
-
-          }
-        }
-      }
-    });
-
+    } 
+    
     try {
-      //TODO this needs to be a setting base on attempts/second
-      return response.get(1, TimeUnit.SECONDS);
+      boolean complete = latchAndBase.latch.await(1, TimeUnit.SECONDS);
+      if (complete){
+        return (Response) latchAndBase.base;
+      } else{
+        return null;
+      }
     } catch (InterruptedException e) {
       throw new RuntimeException(e);
-    } catch (ExecutionException e) {
-      LOGGER.debug(e.getMessage(), e);
-      return null;
-    } catch (TimeoutException e) {
-      boolean cancelled = response.cancel(true);
-      LOGGER.debug(String.format("Threadpool timeout attempting to contact %s, cancelled ? %b", uri.toString(), cancelled));
-      return null;
     } finally {
-      if (t != null){
+      if (latchAndBase != null){
         requests.remove(t.getUuid() + "/" + t.getUriFrom());
       }
     }
@@ -302,8 +295,10 @@ public class GossipCore implements GossipCoreConstants {
     }
   }
 
-  public void addRequest(String k, Base v) {
-    requests.put(k, v);
+  public void handleResponse(String k, Base v) {
+    LatchAndBase latch = requests.get(k);
+    latch.base = v;
+    latch.latch.countDown();
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/6a4d50ca/src/main/java/org/apache/gossip/manager/handlers/ResponseHandler.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/manager/handlers/ResponseHandler.java b/src/main/java/org/apache/gossip/manager/handlers/ResponseHandler.java
index 36102d5..2f33b01 100644
--- a/src/main/java/org/apache/gossip/manager/handlers/ResponseHandler.java
+++ b/src/main/java/org/apache/gossip/manager/handlers/ResponseHandler.java
@@ -27,7 +27,7 @@ public class ResponseHandler implements MessageHandler {
   public void invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) {
     if (base instanceof Trackable) {
       Trackable t = (Trackable) base;
-      gossipCore.addRequest(t.getUuid() + "/" + t.getUriFrom(), (Base) t);
+      gossipCore.handleResponse(t.getUuid() + "/" + t.getUriFrom(), (Base) t);
     }
   }
 }

Reply via email to