I did not mean to bug you on vacation .have fun, don't worry about mentions , even in Mom vacation time do not feel obligated.
In response to is a better name. I will adjust. My thinking here is in a request response a thread is waiting on a reply. This would be similar. My thinking is ultimately tcp will be the default will be tcp so the performance udp is not a deal breaker. I do not know what the default req per second for a 10 node mesh will be. but I think it is a good enough start point. On Saturday, June 18, 2016, Gary Dusbabek <[email protected]> wrote: > Sorry for the top-post. My vacation has started and I'm in a rush... > > I like the idea of giving each message a uuid. It would definitely help > with ordering of messages from single nodes. > > Connectionless protocols could be handled with the addition of another > field "in-response-to" or something of that sort. It would allow for > determining parent-child relations and other message lineage. Fairly > useful, assuming there is a use. :) > > Regarding threading, I'm not sure what to recommend. I'm not a fan of > blocking, and I can see this model not performing well and holding onto a > lot of precious context while responses are waited for. Perhaps registering > the id in a callback hash, then checking the "in-response-to" field on each > message to see if it is a reply to the original message. Some state would > need to be kept too (e.g. to know how old the message is, so non-responses > could be reaped). > > I hope that helps. > > Gary. > > On Sat, Jun 18, 2016 at 10:53 AM, Edward Capriolo <[email protected] > <javascript:;>> > wrote: > > > I was playing with this concept more. This is still a rough plan, but my > > goal would be to provide a simple way for non connection oriented > protocols > > to exchange gossip message and receive a reply. Also it would have the > > affect of allowing testing of the protocol without transports. This same > > mechanism could be used for connection oriented protocols as well but > > possibly blocking in thread makes more sense there. > > > > @Gary what do you think here? > > > > package org.apache.gossip; > > > > import java.util.concurrent.Callable; > > import java.util.concurrent.ConcurrentSkipListMap; > > import java.util.concurrent.ExecutionException; > > import java.util.concurrent.ExecutorService; > > import java.util.concurrent.Future; > > import java.util.concurrent.TimeUnit; > > import java.util.concurrent.TimeoutException; > > > > class ActiveGossipUdpMessage extends Message { > > private int requestId;//could be a uuid > > > > public int getRequestId() { > > return requestId; > > } > > > > public void setRequestId(int requestId) { > > this.requestId = requestId; > > } > > } > > > > class Reply { // this should be in model > > > > } > > > > class Message { //this should be in model > > > > } > > > > class UdpReply extends Reply { > > private int requestId;//could be a uuid > > > > public int getRequestId() { > > return requestId; > > } > > > > public void setRequestId(int requestId) { > > this.requestId = requestId; > > } > > } > > > > class Transport { > > public void send(Object o){ > > > > } > > } > > > > public class GossipCoreUdp { > > > > private ConcurrentSkipListMap<Integer,Reply> inbound = new > > ConcurrentSkipListMap<>(); > > private ConcurrentSkipListMap<Integer, ActiveGossipUdpMessage> > outbound = > > new ConcurrentSkipListMap<>(); > > private final Transport transport; > > ExecutorService service; > > > > public GossipCoreUdp(Transport transport){ > > this.transport = transport; > > } > > > > public void recv(Object message){ > > if (message instanceof UdpReply){ > > UdpReply u = (UdpReply) message; > > inbound.put(u.getRequestId() , u); > > } > > //handle directly or other queues? > > //parent.handleMessage(message) > > } > > > > public Reply send(final ActiveGossipUdpMessage message){ > > outbound.put(message.getRequestId(), message); > > Callable<Reply> c = new Callable<Reply>(){ > > @Override > > public Reply call() throws Exception { > > /* > > transport.send(message); > > while (start-now > timeout ){ > > //poll inbound for result > > * if (found && idmatches){ > > * return found > > * } > > }*/ > > return null; > > }}; > > Future<Reply> future = service.submit(c); > > Reply result = null; > > try { > > result = future.get(10, TimeUnit.SECONDS); > > return result; > > } catch (InterruptedException | ExecutionException | TimeoutException > > e) { > > boolean a = future.cancel(true); > > throw new RuntimeException(e);// throw timeout?? > > } finally { > > outbound.remove(message.getRequestId()); > > } > > > > } > > } > > > -- Sorry this was sent from mobile. Will do less grammar and spell check than usual.
