Hi Folks,
 
How can I process multiple xml documents written as part of one write 
operation?  This is where I am stuck now. When the decoder gets more than one 
xml document as part of an IoSession.doDecode (onceI modify the 
isValidMessage() to seperate xml documents) 
how can I process the completed xml document using a seperate threadpool while 
continue decoding the subsequent documents from ByteBuffer? Any examples or 
suggestions would be truely appreciated.
 
 
package client;
import codec.XmlMessage;
public class XmlExchangeClient
{
    public static final String XML_HOST_NAME  = "localhost";
    public static final int    XML_HOST_PORT  = 23232;
    public void onException(Throwable throwable)
    {
        Throwable cause = throwable;
        while (cause.getCause() != null)
        {
            cause = cause.getCause();
        }
        System.out.println(cause.getMessage());
    }
    public static String getXml(boolean isLargeSize)
    {
        StringBuffer sbXml = new StringBuffer("<?xml version='1.0' 
encoding='UTF-8' standalone='no'?>");
        sbXml.append("<Message>");
        sbXml.append(" <Element1>0301</Element1>");
        sbXml.append(" <Element2>1</Element2>");
        sbXml.append(" <Element3>934001</Element3>");
        sbXml.append(" <DateTime>20080926135416</DateTime>");
        sbXml.append(" <Element4>ABCDEF</Element4>");
        sbXml.append(" 
<Element5><Element5a>02</Element5a><Element5b>500.00</Element5b></Element5>");
        sbXml.append(" 
<Element6><Element6a>HSG</Element6a><Element6b>XDS</Element6b></Element6>");
        sbXml.append(" 
<Element7><Element7a>100</Element7a><Element7b>3.295</Element7b><Element5b>3.35</Element5b></Element7>");
        if (isLargeSize)
        {
            for (int i=101; i <105; i++)
            {
                sbXml.append(" 
<Element7><Element7a>"+i+"</Element7a><Element7b>3.295</Element7b><Element5b>3.35</Element5b></Element7>");
            }
        }
        sbXml.append("</Message>");
        System.out.println(sbXml.toString());
        return sbXml.toString();
    }
    /**
     * @param args the command line arguments
     */
    public static void main(String args[]) throws Exception
    {
        XmlExchangeClientIoHandler xmlClient = new 
XmlExchangeClientIoHandler(XML_HOST_NAME, XML_HOST_PORT);
        xmlClient.connect();
        //xmlClient.sendRequest(new XmlMessage(getXml(true)+getXml(false)));
        xmlClient.sendRequest(new XmlMessage(getXml(true)));
    }
}
 
package client;
import org.apache.mina.transport.socket.nio.SocketConnector;
import org.apache.mina.transport.socket.nio.SocketSessionConfig;
import org.apache.mina.filter.LoggingFilter;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.common.IdleStatus;
import org.apache.mina.common.IoHandlerAdapter;
import org.apache.mina.common.IoSession;
import org.apache.mina.common.ConnectFuture;
import org.apache.mina.common.RuntimeIOException;
import org.apache.mina.common.TransportType;
import java.net.InetSocketAddress;
import codec.XmlCodecFactory;
import codec.XmlMessage;
public class XmlExchangeClientIoHandler extends IoHandlerAdapter
{
    public static final int    CONNECTION_TIMEOUT = 3000;
    private String host;
    private int port;
    private SocketConnector connector;
    private IoSession session;

    public XmlExchangeClientIoHandler(String host, int port)
    {
        this.host = host;
        this.port = port;
        connector = new SocketConnector();
    }
    public void connect()
    {
        ConnectFuture connectFuture = connector.connect(new 
InetSocketAddress(host, port), this);
        connectFuture.join(CONNECTION_TIMEOUT);
        try
        {
            session = connectFuture.getSession();
        }
        catch (RuntimeIOException e)
        {
            System.out.println(e.getMessage());
            e.printStackTrace();
        }
    }
    public void disconnect() {
        if (session != null)
        {
            session.close().join(CONNECTION_TIMEOUT);
            session = null;
        }
    }
   public void sessionCreated(IoSession session) throws Exception
   {
        if (session.getTransportType() == TransportType.SOCKET)
            ((SocketSessionConfig) session.getConfig() 
).setReceiveBufferSize(2048);
        session.setIdleTime( IdleStatus.BOTH_IDLE, 10 );
        session.getFilterChain().addLast("protocolFilter", new 
ProtocolCodecFilter(new XmlCodecFactory(false)));
        //session.getFilterChain().addLast("logger", new LoggingFilter());
   }
    public void sendRequest(XmlMessage xmlMessage)
    {
        if (session == null)
        {
            System.out.println("NULL Session");
        }
        else
        {
            System.out.println("sendRequest:"+xmlMessage);
            session.write(xmlMessage);
        }
    }
    public void messageReceived(IoSession session, Object message) throws 
Exception
    {
        XmlMessage xmlMessage = (XmlMessage) message;
        System.out.println("Response>>> "+xmlMessage);
    }
    public void exceptionCaught(IoSession session, Throwable cause) throws 
Exception
    {
        System.out.println(cause.getMessage());
    }
}

package server;
import org.apache.mina.common.IdleStatus;
import org.apache.mina.common.IoHandlerAdapter;
import org.apache.mina.common.IoSession;
import org.apache.mina.common.TransportType;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.transport.socket.nio.SocketSessionConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import codec.XmlMessage;
import codec.XmlCodecFactory;
public class XmlExchangeServerIoHandler extends IoHandlerAdapter
{
    private static Logger logger = 
LoggerFactory.getLogger(XmlExchangeServerIoHandler.class);
    public void sessionCreated(IoSession session) throws Exception
    {
        if (session.getTransportType() == TransportType.SOCKET)
            ((SocketSessionConfig) session.getConfig() 
).setReceiveBufferSize(2048);
        session.setIdleTime( IdleStatus.BOTH_IDLE, 10 );
        session.getFilterChain().addLast("protocolFilter", new 
ProtocolCodecFilter(new XmlCodecFactory(false)));
        //session.getFilterChain().addLast("logger", new LoggingFilter());
    }
    public void sessionOpened(IoSession session) throws Exception
    {
        session.setAttribute("XML_MESSAGE_OBJECT", new XmlMessage());
    }
    public void exceptionCaught(IoSession session, Throwable cause) throws 
Exception
    {
        System.out.println(cause.getMessage());
        //SessionLog.warn(session, cause.getMessage(), cause);
    }
    public void messageReceived(IoSession session, Object message) throws 
Exception
    {
        XmlMessage xmlMessage  = (XmlMessage) message;
        System.out.println("Request>>> "+xmlMessage);
        // Use JibX to convery xml to java and vice versa.

        session.write(xmlMessage);
    }
}

 
package server;
import org.apache.mina.common.ByteBuffer;
import org.apache.mina.common.IoAcceptor;
import org.apache.mina.common.SimpleByteBufferAllocator;
import org.apache.mina.transport.socket.nio.SocketAcceptor;
import java.net.InetSocketAddress;
import java.io.IOException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class XmlExchangeServer
{
  private static final Logger logger = 
LoggerFactory.getLogger(XmlExchangeServer.class);
    public static final String XML_HOST_NAME  = "localhost";
    public static final int    XML_HOST_PORT  = 23232;
    public static final int    CONNECTION_TIMEOUT = 3000;
  public static void main(String[] args) throws IOException
  {
      ByteBuffer.setUseDirectBuffers(false);
      ByteBuffer.setAllocator(new SimpleByteBufferAllocator());
      IoAcceptor acceptor = new SocketAcceptor();
      acceptor.bind(new InetSocketAddress(XML_HOST_PORT), new 
XmlExchangeServerIoHandler());
      System.out.println("server is listenig at port " + XML_HOST_PORT);
  }
}

 
package codec;
import org.apache.mina.filter.codec.ProtocolEncoder;
import org.apache.mina.filter.codec.ProtocolEncoderOutput;
import org.apache.mina.common.IoSession;
import org.apache.mina.common.ByteBuffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import codec.XmlMessage;
public class XmlMessageEncoder implements ProtocolEncoder
{
    private static Logger logger = 
LoggerFactory.getLogger(XmlMessageEncoder.class);
    public void encode(IoSession session, Object message, ProtocolEncoderOutput 
encoderOutput) throws Exception
    {
        System.out.println( session.toString() + message);
        String xmlMessage = ((XmlMessage) message).getMessage();
        byte[] bytes = xmlMessage.getBytes();
        ByteBuffer buffer = ByteBuffer.allocate(xmlMessage.length(), true);
        buffer.put(bytes);
        buffer.flip();
        encoderOutput.write(buffer);
  }
  public void dispose(IoSession session) throws Exception {
    // nothing to dispose
  }
}

 
package codec;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import org.apache.mina.common.ByteBuffer;
import org.apache.mina.common.IoSession;
import org.apache.mina.filter.codec.CumulativeProtocolDecoder;
import org.apache.mina.filter.codec.ProtocolDecoderOutput;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import codec.XmlMessage;
public class XmlMessageDecoder extends CumulativeProtocolDecoder
{
    private static Logger logger = 
LoggerFactory.getLogger(XmlMessageDecoder.class);
    @Override
    protected boolean doDecode(IoSession session, ByteBuffer byteBuffer, 
ProtocolDecoderOutput decoderOutput) throws Exception
    {
        boolean isMessageCompleted = false;
        CharsetDecoder decoder = Charset.forName("UTF-8").newDecoder();
        
System.out.println("byteBuffer.hasRemaining():"+byteBuffer.hasRemaining());
        if (byteBuffer.hasRemaining())
        {
            String requestMsg = byteBuffer.getString(decoder);
            XmlMessage xmlMessage = 
(XmlMessage)session.getAttribute("XML_MESSAGE_OBJECT");
            if (xmlMessage == null)
                xmlMessage = new XmlMessage();
            System.out.println("requestMsg: "+requestMsg);
            System.out.println("xmlMessage: "+xmlMessage);
            xmlMessage.setMessage(requestMsg);
            session.setAttribute("XML_MESSAGE_OBJECT", xmlMessage);
            if (xmlMessage.isValidMessage())
            {
                decoderOutput.write(xmlMessage);
                System.out.println("Message completed...");
                isMessageCompleted = true;
            }
            else
            {
                System.out.println("Message NOT completed...");
            }
        }
        else
        {
            System.out.println("NO bytes to read keep waiting...");
        }
        return isMessageCompleted;
    }
}
 
 
 
package codec;
import java.io.Serializable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import codec.XmlMessageDecoder;
public class XmlMessage implements Serializable
{
    private static final long serialVersionUID = 1L;
    private static Logger logger = 
LoggerFactory.getLogger(XmlMessageDecoder.class);
    private StringBuffer message = new StringBuffer();
    public XmlMessage()
    {
    }
    public XmlMessage(String msg)
    {
        setMessage(msg);
    }
    public void setMessage(String val) { if (val != null) message.append(val); }
    public String getMessage()    { return (message == null) ? "" : 
message.toString().trim(); }
    public int getMessageLength() { return (message == null) ? 0 : 
message.toString().trim().length(); }
    public boolean isValidMessage()
    {
        boolean isValidMessage = false;
        if (getMessageLength() > 0)
        {
            String msg = getMessage();
            boolean isEndsWithRootTag    = msg.endsWith("</Message>");
            int     rootBeginTagPosition = msg.lastIndexOf("<Message>");
            int     rootEndTagPosition   = msg.indexOf("</Message>");
            System.out.println("msg="+msg);
            System.out.println("isEndsWithRootTag ["+isEndsWithRootTag +"]   
rootBeginTagPosition ["+rootBeginTagPosition+"]     rootEndTagPosition 
["+rootEndTagPosition+"]");
            if (isEndsWithRootTag && (rootEndTagPosition > 
rootBeginTagPosition))
                isValidMessage = true;
        }
        return isValidMessage;
    }
    public String toString()      { return getMessage(); }
}

 
package codec;
import org.apache.mina.filter.codec.ProtocolCodecFactory;
import org.apache.mina.filter.codec.ProtocolEncoder;
import org.apache.mina.filter.codec.ProtocolDecoder;
public class XmlCodecFactory implements ProtocolCodecFactory
{
  private ProtocolEncoder encoder;
  private ProtocolDecoder decoder;
  public XmlCodecFactory(boolean client)
  {
    if (client)
    {
        encoder = new XmlMessageEncoder();
        decoder = new XmlMessageDecoder();
    }
    else
    {
        encoder = new XmlMessageEncoder();
        decoder = new XmlMessageDecoder();
    }
  }
  public ProtocolEncoder getEncoder() throws Exception
  {
    return encoder;
  }
  public ProtocolDecoder getDecoder() throws Exception
  {
    return decoder;
  }
}

 
 
 
 
 
 
 


      

Reply via email to