Hi Messi,
If you don't want that your IoSession are closed when you call unbindAll()
you should replace
cfg.setDisconnectOnUnbind(true);
with
cfg.setDisconnectOnUnbind(false);
some remarks about:
public void messageReceived(IoSession session, Object message)
throws Exception {
if (StringUtils.equals("close", message.toString())) {
// stop the server, is this right?
CommunicatorServer server = new CommunicatorServer();
server.stop();
}
else {
session.setAttachment(message);
queue.put(session);
}
}
(1) creating a new CommunicatorServer instance, to stop the server is a bit
weird
a) since your SocketAcceptor is already static, you could make the stop
method static as well
b) I would avoid static fields altogether
(2) you are storing the incoming message in the IoSession and the session in
the queue
but when the client sends a second message before your MessageConsumer
processes the first message
the attachment will be overwritten
(3) any client could send "stop" and shutdown the server
(4) Are you sure you need the LinkedBlockingQueue ?
You could also opt to do your business logic inside messageReceived()
without an extra thread pool
Maarten
On 6/27/07, Messi Chan <[EMAIL PROTECTED]> wrote:
Hi, all
I have a server using MINA. In the server side, I definded a BlockingQueue
to keep the IoSession when some client connected. I would take the
IoSession
from the queue do some business. But, when I call unbindAll() method to
shutdown the server, I have found that the IoSession kept in the
BlockingQueue was closed. That is not what I expect. I need the IoSession
to
write back to the Client. How could I do this? Anyone could help me?
Below is my code, and there are 3 "is right?". Any better suggestions? Or
maybe I was wrong. thx a lot.
/** server class **/
public class CommunicatorServer {
private static final int PORT = 8080;
public static SocketAcceptor acceptor;
public static ExecutorService executorService1;
public static ExecutorService executorService2;
private static BlockingQueue<IoSession> queue = new
LinkedBlockingQueue<IoSession>();
public static void main(String[] args) throws Exception {
executorService1 = Executors.newCachedThreadPool();
acceptor = new SocketAcceptor(2, executorService1);
acceptor.getDefaultConfig().setThreadModel(ThreadModel.MANUAL);
SocketAcceptorConfig cfg = new SocketAcceptorConfig();
cfg.setDisconnectOnUnbind(true);
cfg.getSessionConfig().setReuseAddress(true);
cfg.getFilterChain().addLast(
"codec",
new ProtocolCodecFilter(new TextLineCodecFactory(Charset
.forName("UTF-8"))));
executorService2 = Executors.newCachedThreadPool();
cfg.getFilterChain().addLast("pool",
new ExecutorFilter(executorService2));
// Bind
acceptor.bind(new InetSocketAddress(PORT),
new CommunicatorServerHandler(queue), cfg);
// new a message consumer, is this right?
new MessageConsumer(queue, executorService2);
}
public void stop() {
// unbind server, is this right?
acceptor.unbindAll();
executorService1.shutdown();
executorService2.shutdown();
// wait the queue is empty
while (true) {
if (!queue.isEmpty()) {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
} else {
break;
}
}
}
}
/** server handler class **/
public class CommunicatorServerHandler extends IoHandlerAdapter {
private BlockingQueue<IoSession> queue = new
LinkedBlockingQueue<IoSession>();
public CommunicatorServerHandler(BlockingQueue<IoSession> queue) {
this.queue = queue;
}
public void exceptionCaught(IoSession session, Throwable cause) {
session.close();
}
public void messageReceived(IoSession session, Object message)
throws Exception {
if (StringUtils.equals("close", message.toString())) {
// stop the server, is this right?
CommunicatorServer server = new CommunicatorServer();
server.stop();
}
else {
session.setAttachment(message);
queue.put(session);
}
}
}
/** message consumer class **/
public class MessageConsumer implements Callable<String> {
private BlockingQueue queue;
private ExecutorService executorService;
public MessageConsumer(BlockingQueue<IoSession> queue) {
this.queue = queue;
Executors.newCachedThreadPool().submit(this);
}
public MessageConsumer(BlockingQueue<IoSession> queue,
ExecutorService executorService) {
this.queue = queue;
this.executorService = executorService;
this.executorService.submit(this);
}
private IoSession returnSession(BlockingQueue queue)
throws InterruptedException {
return (IoSession) queue.take();
}
public String call() throws Exception {
while (true) {
IoSession session = returnSession(queue);
// do some business
}
}
}
--
View this message in context:
http://www.nabble.com/unbindAll%28%29-would-close-the-IoSession--tf3990073.html#a11329861
Sent from the mina dev mailing list archive at Nabble.com.