Hi all,

I have a server built on MINA. On the server side I defined a BlockingQueue
to hold the IoSession of each connected client. A consumer takes each
IoSession from the queue and does some business with it. However, when I call
unbindAll() to shut the server down, I find that the IoSessions still held in
the BlockingQueue have been closed. That is not what I expected: I still need
those IoSessions to write back to the client. How can I do this? Can anyone
help?

My code is below, with three places marked "is this right?". Better
suggestions are welcome, or maybe my approach is wrong. Thanks a lot.
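/** server class **/
// Imports assume the MINA 1.1.x package names.
import java.net.InetSocketAddress;
import java.nio.charset.Charset;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

import org.apache.mina.common.IoSession;
import org.apache.mina.common.ThreadModel;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.filter.codec.textline.TextLineCodecFactory;
import org.apache.mina.filter.executor.ExecutorFilter;
import org.apache.mina.transport.socket.nio.SocketAcceptor;
import org.apache.mina.transport.socket.nio.SocketAcceptorConfig;

public class CommunicatorServer {
    private static final int PORT = 8080;
    public static SocketAcceptor acceptor;
    public static ExecutorService executorService1;
    public static ExecutorService executorService2;
    private static BlockingQueue<IoSession> queue =
            new LinkedBlockingQueue<IoSession>();

    public static void main(String[] args) throws Exception {
        executorService1 = Executors.newCachedThreadPool();
        acceptor = new SocketAcceptor(2, executorService1);

        SocketAcceptorConfig cfg = new SocketAcceptorConfig();
        // Set the thread model on the config that is passed to bind();
        // the acceptor's default config is not used for this binding.
        cfg.setThreadModel(ThreadModel.MANUAL);
        // true means: close every session accepted on this address
        // when the address is unbound
        cfg.setDisconnectOnUnbind(true);
        cfg.getSessionConfig().setReuseAddress(true);
        cfg.getFilterChain().addLast(
                "codec",
                new ProtocolCodecFilter(
                        new TextLineCodecFactory(Charset.forName("UTF-8"))));
        executorService2 = Executors.newCachedThreadPool();
        cfg.getFilterChain().addLast("pool",
                new ExecutorFilter(executorService2));

        // Bind
        acceptor.bind(new InetSocketAddress(PORT),
                new CommunicatorServerHandler(queue), cfg);

        // new a message consumer, is this right?
        new MessageConsumer(queue, executorService2);
    }

    public void stop() {
        // unbind server, is this right?
        acceptor.unbindAll();
        executorService1.shutdown();
        executorService2.shutdown();
        // wait until the queue is empty
        while (!queue.isEmpty()) {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                // restore the interrupt flag and stop waiting
                Thread.currentThread().interrupt();
                break;
            }
        }
    }
}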
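
The wait loop in stop() above has no upper bound, and it only starts after
both pools have already been told to shut down. As a minimal sketch of one
alternative, using only java.util.concurrent (no MINA), the hypothetical
helper below waits for the queue to empty up to a deadline, then shuts the
pool down and waits for its tasks to finish. The names DrainThenShutdown and
stop(queue, pool, timeoutMillis), and the timeout values, are assumptions
made for illustration.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not part of MINA: waits for a work queue to drain,
// then shuts the worker pool down, all within one overall time budget.
final class DrainThenShutdown {

    /**
     * Returns true if the queue drained and the pool terminated before the
     * timeout; false if time ran out or the calling thread was interrupted.
     */
    static boolean stop(BlockingQueue<?> queue, ExecutorService pool, long timeoutMillis) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        try {
            // Poll in short steps and give up once the deadline has passed.
            while (!queue.isEmpty()) {
                if (System.nanoTime() - deadline >= 0) {
                    return false;
                }
                Thread.sleep(10);
            }
            pool.shutdown(); // already submitted tasks are still allowed to complete
            long remaining = deadline - System.nanoTime();
            return pool.awaitTermination(Math.max(remaining, 0L), TimeUnit.NANOSECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        }
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        ExecutorService pool = Executors.newSingleThreadExecutor();
        queue.add("a");
        queue.add("b");
        // Worker that removes both items and then finishes.
        pool.submit(() -> {
            queue.poll();
            queue.poll();
        });
        System.out.println("clean stop: " + stop(queue, pool, 2000));
    }
}
```

Note that a consumer blocked in take() never finishes on its own, so
awaitTermination would time out for such a task unless it is also given a way
to exit, for example a sentinel item or an interrupt.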
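/** server handler class **/
import java.util.concurrent.BlockingQueue;

// StringUtils is assumed to be the Commons Lang class.
import org.apache.commons.lang.StringUtils;
import org.apache.mina.common.IoHandlerAdapter;
import org.apache.mina.common.IoSession;

public class CommunicatorServerHandler extends IoHandlerAdapter {
    // shared with the MessageConsumer; always supplied by the caller
    private final BlockingQueue<IoSession> queue;

    public CommunicatorServerHandler(BlockingQueue<IoSession> queue) {
        this.queue = queue;
    }

    @Override
    public void exceptionCaught(IoSession session, Throwable cause) {
        session.close();
    }

    @Override
    public void messageReceived(IoSession session, Object message)
            throws Exception {
        if (StringUtils.equals("close", message.toString())) {
            // stop the server, is this right?
            CommunicatorServer server = new CommunicatorServer();
            server.stop();
        } else {
            // attach the message so the consumer can read it from the session
            session.setAttachment(message);
            queue.put(session);
        }
    }
}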
/** message consumer class **/
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.mina.common.IoSession;

public class MessageConsumer implements Callable<String> {
    private final BlockingQueue<IoSession> queue;
    private final ExecutorService executorService;

    public MessageConsumer(BlockingQueue<IoSession> queue) {
        // no pool supplied: run on a private cached thread pool
        this(queue, Executors.newCachedThreadPool());
    }

    public MessageConsumer(BlockingQueue<IoSession> queue,
            ExecutorService executorService) {
        this.queue = queue;
        this.executorService = executorService;
        this.executorService.submit(this);
    }

    private IoSession returnSession(BlockingQueue<IoSession> queue)
            throws InterruptedException {
        return queue.take();
    }

    public String call() throws Exception {
        while (true) {
            // blocks until a session is available
            IoSession session = returnSession(queue);
            // do some business
        }
    }
}
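
The call() loop above never returns, so the consumer task cannot finish once
the server has stopped. One common way to let such a loop end is a sentinel
("poison pill") item that is enqueued last. The sketch below is only an
illustration under stated assumptions: it uses String items in place of
IoSession so that it runs without MINA, and StoppableConsumer and STOP are
hypothetical names that do not appear in the code above.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical stand-in for MessageConsumer: String items replace IoSession
// so the sketch runs without MINA on the classpath.
final class StoppableConsumer implements Callable<Integer> {

    // Sentinel ("poison pill"). It is compared by identity, so an ordinary
    // message with the same text is still treated as real data.
    static final String STOP = new String("STOP");

    private final BlockingQueue<String> queue;
    private int handled;

    StoppableConsumer(BlockingQueue<String> queue) {
        this.queue = queue;
    }

    /** Takes items until STOP arrives or the thread is interrupted; returns the number handled. */
    @Override
    public Integer call() {
        try {
            while (true) {
                String item = queue.take(); // blocks until something is available
                if (item == STOP) {
                    break;
                }
                handled++; // placeholder for "do some business"
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // e.g. after shutdownNow()
        }
        return handled;
    }

    public static void main(String[] args) throws Exception {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        ExecutorService pool = Executors.newSingleThreadExecutor();
        Future<Integer> result = pool.submit(new StoppableConsumer(queue));
        queue.put("first");
        queue.put("second");
        queue.put(STOP); // enqueued last, so the earlier items are handled first
        System.out.println("handled " + result.get() + " items"); // prints "handled 2 items"
        pool.shutdown();
    }
}
```

Because the queue is FIFO and there is a single consumer, everything enqueued
before STOP is processed before the loop exits. With several consumers, one
sentinel per consumer would be needed.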