I was playing with this concept more. This is still a rough plan, but my
goal would be to provide a simple way for non connection oriented protocols
to exchange gossip message and receive a reply. Also it would have the
affect of allowing testing of the protocol without transports. This same
mechanism could be used for connection oriented protocols as well but
possibly blocking in thread makes more sense there.
@Gary what do you think here?
package org.apache.gossip;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
class ActiveGossipUdpMessage extends Message {
private int requestId;//could be a uuid
public int getRequestId() {
return requestId;
}
public void setRequestId(int requestId) {
this.requestId = requestId;
}
}
class Reply { // this should be in model
}
class Message { //this should be in model
}
class UdpReply extends Reply {
private int requestId;//could be a uuid
public int getRequestId() {
return requestId;
}
public void setRequestId(int requestId) {
this.requestId = requestId;
}
}
class Transport {
public void send(Object o){
}
}
public class GossipCoreUdp {
private ConcurrentSkipListMap<Integer,Reply> inbound = new
ConcurrentSkipListMap<>();
private ConcurrentSkipListMap<Integer, ActiveGossipUdpMessage> outbound =
new ConcurrentSkipListMap<>();
private final Transport transport;
ExecutorService service;
public GossipCoreUdp(Transport transport){
this.transport = transport;
}
public void recv(Object message){
if (message instanceof UdpReply){
UdpReply u = (UdpReply) message;
inbound.put(u.getRequestId() , u);
}
//handle directly or other queues?
//parent.handleMessage(message)
}
public Reply send(final ActiveGossipUdpMessage message){
outbound.put(message.getRequestId(), message);
Callable<Reply> c = new Callable<Reply>(){
@Override
public Reply call() throws Exception {
/*
transport.send(message);
while (start-now > timeout ){
//poll inbound for result
* if (found && idmatches){
* return found
* }
}*/
return null;
}};
Future<Reply> future = service.submit(c);
Reply result = null;
try {
result = future.get(10, TimeUnit.SECONDS);
return result;
} catch (InterruptedException | ExecutionException | TimeoutException
e) {
boolean a = future.cancel(true);
throw new RuntimeException(e);// throw timeout??
} finally {
outbound.remove(message.getRequestId());
}
}
}