Sorry for the top-post. My vacation has started and I'm in a rush...

I like the idea of giving each message a UUID. It would definitely help with ordering the messages that come from a single node.

Connectionless protocols could be handled by adding another field, "in-response-to" or something of that sort. It would allow parent-child relations and other message lineage to be determined. Fairly useful, assuming there is a use. :)

Regarding threading, I'm not sure what to recommend. I'm not a fan of blocking, and I can see this model performing poorly and holding onto a lot of precious context while waiting for responses. Perhaps register the id in a callback hash, then check the "in-response-to" field on each incoming message to see whether it is a reply to the original message. Some state would need to be kept too (e.g. when the message was sent, so that requests which never get a response can be reaped).

I hope that helps.

Gary.

On Sat, Jun 18, 2016 at 10:53 AM, Edward Capriolo <[email protected]> wrote:

> I was playing with this concept more. This is still a rough plan, but my
> goal would be to provide a simple way for non-connection-oriented protocols
> to exchange gossip messages and receive a reply. It would also have the
> effect of allowing the protocol to be tested without transports. The same
> mechanism could be used for connection-oriented protocols as well, but
> blocking in the thread possibly makes more sense there.
>
> @Gary what do you think here?
>
> package org.apache.gossip;
>
> import java.util.concurrent.Callable;
> import java.util.concurrent.ConcurrentSkipListMap;
> import java.util.concurrent.ExecutionException;
> import java.util.concurrent.ExecutorService;
> import java.util.concurrent.Future;
> import java.util.concurrent.TimeUnit;
> import java.util.concurrent.TimeoutException;
>
> class ActiveGossipUdpMessage extends Message {
>   private int requestId;//could be a uuid
>
>   public int getRequestId() {
>     return requestId;
>   }
>
>   public void setRequestId(int requestId) {
>     this.requestId = requestId;
>   }
> }
>
> class Reply { // this should be in model
>
> }
>
> class Message { //this should be in model
>
> }
>
> class UdpReply extends Reply {
>   private int requestId;//could be a uuid
>
>   public int getRequestId() {
>     return requestId;
>   }
>
>   public void setRequestId(int requestId) {
>     this.requestId = requestId;
>   }
> }
>
> class Transport {
>   public void send(Object o){
>
>   }
> }
>
> public class GossipCoreUdp {
>
>   private ConcurrentSkipListMap<Integer,Reply> inbound = new ConcurrentSkipListMap<>();
>   private ConcurrentSkipListMap<Integer, ActiveGossipUdpMessage> outbound = new ConcurrentSkipListMap<>();
>   private final Transport transport;
>   ExecutorService service;
>
>   public GossipCoreUdp(Transport transport){
>     this.transport = transport;
>   }
>
>   public void recv(Object message){
>     if (message instanceof UdpReply){
>       UdpReply u = (UdpReply) message;
>       inbound.put(u.getRequestId() , u);
>     }
>     //handle directly or other queues?
>     //parent.handleMessage(message)
>   }
>
>   public Reply send(final ActiveGossipUdpMessage message){
>     outbound.put(message.getRequestId(), message);
>     Callable<Reply> c = new Callable<Reply>(){
>       @Override
>       public Reply call() throws Exception {
>         /*
>         transport.send(message);
>         while (start-now > timeout ){
>           //poll inbound for result
>          * if (found && idmatches){
>          *   return found
>          * }
>         }*/
>         return null;
>       }};
>     Future<Reply> future = service.submit(c);
>     Reply result = null;
>     try {
>       result = future.get(10, TimeUnit.SECONDS);
>       return result;
>     } catch (InterruptedException | ExecutionException | TimeoutException e) {
>       boolean a = future.cancel(true);
>       throw new RuntimeException(e);// throw timeout??
>     } finally {
>       outbound.remove(message.getRequestId());
>     }
>
>   }
> }
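
For comparison with the blocking send() quoted above, here is a minimal sketch of the non-blocking alternative suggested in the reply: register the request id in a callback hash, check the "in-response-to" field of each inbound message, and reap entries that never get an answer. Every name in it (PendingReplies, Envelope, inResponseTo, register, dispatch, reap) is hypothetical and chosen only for illustration; none of it comes from the gossip code base, and timestamps are passed in explicitly so the reaping logic is easy to test.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Hypothetical sketch: a registry of callbacks keyed by request id.
final class PendingReplies {

  /** A message carrying its own id and, for replies, the id it answers. */
  static final class Envelope {
    final UUID id;
    final UUID inResponseTo; // null for an original request
    final String payload;

    Envelope(UUID id, UUID inResponseTo, String payload) {
      this.id = id;
      this.inResponseTo = inResponseTo;
      this.payload = payload;
    }
  }

  /** Callback plus the time it was registered, so stale entries can be reaped. */
  private static final class Pending {
    final long registeredAtMillis;
    final Consumer<Envelope> callback;

    Pending(long registeredAtMillis, Consumer<Envelope> callback) {
      this.registeredAtMillis = registeredAtMillis;
      this.callback = callback;
    }
  }

  private final Map<UUID, Pending> pending = new ConcurrentHashMap<>();

  /** Remember a callback for this request instead of blocking a thread on it. */
  void register(Envelope request, Consumer<Envelope> callback, long nowMillis) {
    pending.put(request.id, new Pending(nowMillis, callback));
  }

  /** Call for every inbound message; returns true if it answered a pending request. */
  boolean dispatch(Envelope inbound) {
    if (inbound.inResponseTo == null) {
      return false; // not a reply at all
    }
    Pending p = pending.remove(inbound.inResponseTo);
    if (p == null) {
      return false; // unknown id, already answered, or already reaped
    }
    p.callback.accept(inbound);
    return true;
  }

  /** Drop requests older than maxAgeMillis; returns how many were removed. */
  int reap(long nowMillis, long maxAgeMillis) {
    int removed = 0;
    for (Map.Entry<UUID, Pending> e : pending.entrySet()) {
      boolean stale = nowMillis - e.getValue().registeredAtMillis > maxAgeMillis;
      if (stale && pending.remove(e.getKey(), e.getValue())) {
        removed++;
      }
    }
    return removed;
  }

  int size() {
    return pending.size();
  }
}
```

In this sketch the receive path would call dispatch() on each incoming message, and a periodic task would call reap() with the current time. Nothing waits on a Future, so no thread or context is held while a reply is outstanding; the cost is that the caller has to handle the reply in a callback.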
