Sorry for the top-post. My vacation has started and I'm in a rush...

I like the idea of giving each message a uuid. It would definitely help
with ordering of messages from single nodes.

Connectionless protocols could be handled with the addition of another
field "in-response-to" or something of that sort. It would allow for
determining parent-child relations and other message lineage. Fairly
useful, assuming there is a use. :)

Regarding threading, I'm not sure what to recommend. I'm not a fan of
blocking, and I can see this model not performing well and holding onto a
lot of precious context while responses are waited for. Perhaps registering
the id in a callback hash, then checking the "in-response-to" field on each
message to see if it is a reply to the original message. Some state would
need to be kept too (e.g. to know how old the message is, so non-responses
could be reaped).

I hope that helps.

Gary.

On Sat, Jun 18, 2016 at 10:53 AM, Edward Capriolo <[email protected]>
wrote:

> I was playing with this concept more. This is still a rough plan, but my
> goal would be to provide a simple way for non connection oriented protocols
> to exchange gossip message and receive a reply. Also it would have the
> affect of allowing testing of the protocol without transports. This same
> mechanism could be used for connection oriented protocols as well but
> possibly blocking in thread makes more sense there.
>
> @Gary what do you think here?
>
> package org.apache.gossip;
>
> import java.util.concurrent.Callable;
> import java.util.concurrent.ConcurrentSkipListMap;
> import java.util.concurrent.ExecutionException;
> import java.util.concurrent.ExecutorService;
> import java.util.concurrent.Future;
> import java.util.concurrent.TimeUnit;
> import java.util.concurrent.TimeoutException;
>
> class ActiveGossipUdpMessage extends Message {
>   private int requestId;//could be a uuid
>
>   public int getRequestId() {
>     return requestId;
>   }
>
>   public void setRequestId(int requestId) {
>     this.requestId = requestId;
>   }
> }
>
> class Reply { // this should be in model
>
> }
>
> class Message { //this should be in model
>
> }
>
> class UdpReply extends Reply {
>   private int requestId;//could be a uuid
>
>   public int getRequestId() {
>     return requestId;
>   }
>
>   public void setRequestId(int requestId) {
>     this.requestId = requestId;
>   }
> }
>
> class Transport {
>   public void send(Object o){
>
>   }
> }
>
> public class GossipCoreUdp {
>
>   private ConcurrentSkipListMap<Integer,Reply> inbound = new
> ConcurrentSkipListMap<>();
>   private ConcurrentSkipListMap<Integer, ActiveGossipUdpMessage> outbound =
> new ConcurrentSkipListMap<>();
>   private final Transport transport;
>   ExecutorService service;
>
>   public GossipCoreUdp(Transport transport){
>     this.transport = transport;
>   }
>
>   public void recv(Object message){
>     if (message instanceof UdpReply){
>       UdpReply u = (UdpReply) message;
>       inbound.put(u.getRequestId() , u);
>     }
>     //handle directly or other queues?
>     //parent.handleMessage(message)
>   }
>
>   public Reply send(final ActiveGossipUdpMessage message){
>     outbound.put(message.getRequestId(), message);
>     Callable<Reply> c = new Callable<Reply>(){
>       @Override
>       public Reply call() throws Exception {
>         /*
>         transport.send(message);
>         while (start-now > timeout ){
>           //poll inbound for result
>            * if (found && idmatches){
>            *   return found
>            * }
>         }*/
>         return null;
>       }};
>     Future<Reply> future = service.submit(c);
>     Reply result = null;
>     try {
>       result = future.get(10, TimeUnit.SECONDS);
>       return result;
>     } catch (InterruptedException | ExecutionException | TimeoutException
> e) {
>       boolean a = future.cancel(true);
>       throw new RuntimeException(e);// throw timeout??
>     } finally {
>       outbound.remove(message.getRequestId());
>     }
>
>   }
> }
>

Reply via email to