Hi Folks, How can I process multiple xml documents written as part of one write operation? This is where I am stuck now. When the decoder gets more than one xml document as part of an IoSession.doDecode (onceI modify the isValidMessage() to seperate xml documents) how can I process the completed xml document using a seperate threadpool while continue decoding the subsequent documents from ByteBuffer? Any examples or suggestions would be truely appreciated. package client; import codec.XmlMessage; public class XmlExchangeClient { public static final String XML_HOST_NAME = "localhost"; public static final int XML_HOST_PORT = 23232; public void onException(Throwable throwable) { Throwable cause = throwable; while (cause.getCause() != null) { cause = cause.getCause(); } System.out.println(cause.getMessage()); } public static String getXml(boolean isLargeSize) { StringBuffer sbXml = new StringBuffer("<?xml version='1.0' encoding='UTF-8' standalone='no'?>"); sbXml.append("<Message>"); sbXml.append(" <Element1>0301</Element1>"); sbXml.append(" <Element2>1</Element2>"); sbXml.append(" <Element3>934001</Element3>"); sbXml.append(" <DateTime>20080926135416</DateTime>"); sbXml.append(" <Element4>ABCDEF</Element4>"); sbXml.append(" <Element5><Element5a>02</Element5a><Element5b>500.00</Element5b></Element5>"); sbXml.append(" <Element6><Element6a>HSG</Element6a><Element6b>XDS</Element6b></Element6>"); sbXml.append(" <Element7><Element7a>100</Element7a><Element7b>3.295</Element7b><Element5b>3.35</Element5b></Element7>"); if (isLargeSize) { for (int i=101; i <105; i++) { sbXml.append(" <Element7><Element7a>"+i+"</Element7a><Element7b>3.295</Element7b><Element5b>3.35</Element5b></Element7>"); } } sbXml.append("</Message>"); System.out.println(sbXml.toString()); return sbXml.toString(); } /** * @param args the command line arguments */ public static void main(String args[]) throws Exception { XmlExchangeClientIoHandler xmlClient = new XmlExchangeClientIoHandler(XML_HOST_NAME, XML_HOST_PORT); xmlClient.connect(); //xmlClient.sendRequest(new XmlMessage(getXml(true)+getXml(false))); xmlClient.sendRequest(new XmlMessage(getXml(true))); } } package client; import org.apache.mina.transport.socket.nio.SocketConnector; import org.apache.mina.transport.socket.nio.SocketSessionConfig; import org.apache.mina.filter.LoggingFilter; import org.apache.mina.filter.codec.ProtocolCodecFilter; import org.apache.mina.common.IdleStatus; import org.apache.mina.common.IoHandlerAdapter; import org.apache.mina.common.IoSession; import org.apache.mina.common.ConnectFuture; import org.apache.mina.common.RuntimeIOException; import org.apache.mina.common.TransportType; import java.net.InetSocketAddress; import codec.XmlCodecFactory; import codec.XmlMessage; public class XmlExchangeClientIoHandler extends IoHandlerAdapter { public static final int CONNECTION_TIMEOUT = 3000; private String host; private int port; private SocketConnector connector; private IoSession session;
public XmlExchangeClientIoHandler(String host, int port) { this.host = host; this.port = port; connector = new SocketConnector(); } public void connect() { ConnectFuture connectFuture = connector.connect(new InetSocketAddress(host, port), this); connectFuture.join(CONNECTION_TIMEOUT); try { session = connectFuture.getSession(); } catch (RuntimeIOException e) { System.out.println(e.getMessage()); e.printStackTrace(); } } public void disconnect() { if (session != null) { session.close().join(CONNECTION_TIMEOUT); session = null; } } public void sessionCreated(IoSession session) throws Exception { if (session.getTransportType() == TransportType.SOCKET) ((SocketSessionConfig) session.getConfig() ).setReceiveBufferSize(2048); session.setIdleTime( IdleStatus.BOTH_IDLE, 10 ); session.getFilterChain().addLast("protocolFilter", new ProtocolCodecFilter(new XmlCodecFactory(false))); //session.getFilterChain().addLast("logger", new LoggingFilter()); } public void sendRequest(XmlMessage xmlMessage) { if (session == null) { System.out.println("NULL Session"); } else { System.out.println("sendRequest:"+xmlMessage); session.write(xmlMessage); } } public void messageReceived(IoSession session, Object message) throws Exception { XmlMessage xmlMessage = (XmlMessage) message; System.out.println("Response>>> "+xmlMessage); } public void exceptionCaught(IoSession session, Throwable cause) throws Exception { System.out.println(cause.getMessage()); } } package server; import org.apache.mina.common.IdleStatus; import org.apache.mina.common.IoHandlerAdapter; import org.apache.mina.common.IoSession; import org.apache.mina.common.TransportType; import org.apache.mina.filter.codec.ProtocolCodecFilter; import org.apache.mina.transport.socket.nio.SocketSessionConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import codec.XmlMessage; import codec.XmlCodecFactory; public class XmlExchangeServerIoHandler extends IoHandlerAdapter { private static Logger logger = LoggerFactory.getLogger(XmlExchangeServerIoHandler.class); public void sessionCreated(IoSession session) throws Exception { if (session.getTransportType() == TransportType.SOCKET) ((SocketSessionConfig) session.getConfig() ).setReceiveBufferSize(2048); session.setIdleTime( IdleStatus.BOTH_IDLE, 10 ); session.getFilterChain().addLast("protocolFilter", new ProtocolCodecFilter(new XmlCodecFactory(false))); //session.getFilterChain().addLast("logger", new LoggingFilter()); } public void sessionOpened(IoSession session) throws Exception { session.setAttribute("XML_MESSAGE_OBJECT", new XmlMessage()); } public void exceptionCaught(IoSession session, Throwable cause) throws Exception { System.out.println(cause.getMessage()); //SessionLog.warn(session, cause.getMessage(), cause); } public void messageReceived(IoSession session, Object message) throws Exception { XmlMessage xmlMessage = (XmlMessage) message; System.out.println("Request>>> "+xmlMessage); // Use JibX to convery xml to java and vice versa. session.write(xmlMessage); } } package server; import org.apache.mina.common.ByteBuffer; import org.apache.mina.common.IoAcceptor; import org.apache.mina.common.SimpleByteBufferAllocator; import org.apache.mina.transport.socket.nio.SocketAcceptor; import java.net.InetSocketAddress; import java.io.IOException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class XmlExchangeServer { private static final Logger logger = LoggerFactory.getLogger(XmlExchangeServer.class); public static final String XML_HOST_NAME = "localhost"; public static final int XML_HOST_PORT = 23232; public static final int CONNECTION_TIMEOUT = 3000; public static void main(String[] args) throws IOException { ByteBuffer.setUseDirectBuffers(false); ByteBuffer.setAllocator(new SimpleByteBufferAllocator()); IoAcceptor acceptor = new SocketAcceptor(); acceptor.bind(new InetSocketAddress(XML_HOST_PORT), new XmlExchangeServerIoHandler()); System.out.println("server is listenig at port " + XML_HOST_PORT); } } package codec; import org.apache.mina.filter.codec.ProtocolEncoder; import org.apache.mina.filter.codec.ProtocolEncoderOutput; import org.apache.mina.common.IoSession; import org.apache.mina.common.ByteBuffer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import codec.XmlMessage; public class XmlMessageEncoder implements ProtocolEncoder { private static Logger logger = LoggerFactory.getLogger(XmlMessageEncoder.class); public void encode(IoSession session, Object message, ProtocolEncoderOutput encoderOutput) throws Exception { System.out.println( session.toString() + message); String xmlMessage = ((XmlMessage) message).getMessage(); byte[] bytes = xmlMessage.getBytes(); ByteBuffer buffer = ByteBuffer.allocate(xmlMessage.length(), true); buffer.put(bytes); buffer.flip(); encoderOutput.write(buffer); } public void dispose(IoSession session) throws Exception { // nothing to dispose } } package codec; import java.nio.charset.Charset; import java.nio.charset.CharsetDecoder; import org.apache.mina.common.ByteBuffer; import org.apache.mina.common.IoSession; import org.apache.mina.filter.codec.CumulativeProtocolDecoder; import org.apache.mina.filter.codec.ProtocolDecoderOutput; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import codec.XmlMessage; public class XmlMessageDecoder extends CumulativeProtocolDecoder { private static Logger logger = LoggerFactory.getLogger(XmlMessageDecoder.class); @Override protected boolean doDecode(IoSession session, ByteBuffer byteBuffer, ProtocolDecoderOutput decoderOutput) throws Exception { boolean isMessageCompleted = false; CharsetDecoder decoder = Charset.forName("UTF-8").newDecoder(); System.out.println("byteBuffer.hasRemaining():"+byteBuffer.hasRemaining()); if (byteBuffer.hasRemaining()) { String requestMsg = byteBuffer.getString(decoder); XmlMessage xmlMessage = (XmlMessage)session.getAttribute("XML_MESSAGE_OBJECT"); if (xmlMessage == null) xmlMessage = new XmlMessage(); System.out.println("requestMsg: "+requestMsg); System.out.println("xmlMessage: "+xmlMessage); xmlMessage.setMessage(requestMsg); session.setAttribute("XML_MESSAGE_OBJECT", xmlMessage); if (xmlMessage.isValidMessage()) { decoderOutput.write(xmlMessage); System.out.println("Message completed..."); isMessageCompleted = true; } else { System.out.println("Message NOT completed..."); } } else { System.out.println("NO bytes to read keep waiting..."); } return isMessageCompleted; } } package codec; import java.io.Serializable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import codec.XmlMessageDecoder; public class XmlMessage implements Serializable { private static final long serialVersionUID = 1L; private static Logger logger = LoggerFactory.getLogger(XmlMessageDecoder.class); private StringBuffer message = new StringBuffer(); public XmlMessage() { } public XmlMessage(String msg) { setMessage(msg); } public void setMessage(String val) { if (val != null) message.append(val); } public String getMessage() { return (message == null) ? "" : message.toString().trim(); } public int getMessageLength() { return (message == null) ? 0 : message.toString().trim().length(); } public boolean isValidMessage() { boolean isValidMessage = false; if (getMessageLength() > 0) { String msg = getMessage(); boolean isEndsWithRootTag = msg.endsWith("</Message>"); int rootBeginTagPosition = msg.lastIndexOf("<Message>"); int rootEndTagPosition = msg.indexOf("</Message>"); System.out.println("msg="+msg); System.out.println("isEndsWithRootTag ["+isEndsWithRootTag +"] rootBeginTagPosition ["+rootBeginTagPosition+"] rootEndTagPosition ["+rootEndTagPosition+"]"); if (isEndsWithRootTag && (rootEndTagPosition > rootBeginTagPosition)) isValidMessage = true; } return isValidMessage; } public String toString() { return getMessage(); } } package codec; import org.apache.mina.filter.codec.ProtocolCodecFactory; import org.apache.mina.filter.codec.ProtocolEncoder; import org.apache.mina.filter.codec.ProtocolDecoder; public class XmlCodecFactory implements ProtocolCodecFactory { private ProtocolEncoder encoder; private ProtocolDecoder decoder; public XmlCodecFactory(boolean client) { if (client) { encoder = new XmlMessageEncoder(); decoder = new XmlMessageDecoder(); } else { encoder = new XmlMessageEncoder(); decoder = new XmlMessageDecoder(); } } public ProtocolEncoder getEncoder() throws Exception { return encoder; } public ProtocolDecoder getDecoder() throws Exception { return decoder; } }