Re: Does IoSession.write() do buffering?
There should be no aggregation, especially for Datagrams.

On Tue, Sep 18, 2018 at 1:57 PM Raghavendra Balgi wrote:
> Hey Krishan, You should probably consider using a CodecFilter as described
> here -
> https://mina.apache.org/mina-project/userguide/ch9-codec-filter/ch9-codec-filter.html
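If the session turns out to be TCP rather than UDP (the posted code allows either, since it only special-cases NioDatagramAcceptor), the device reads a byte stream, and two separate writes can still arrive in a single read even when no layer deliberately aggregates them. One common way around this is explicit framing. The sketch below is only an illustration under that assumption: the class LengthPrefixedAcks, its methods, and the 2-byte length prefix are hypothetical, not part of MINA or of the device protocol, and it only helps if the device side can be changed to parse such frames.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: length-prefixed framing over a plain byte stream. */
final class LengthPrefixedAcks {

    /** Writes one frame: a 2-byte big-endian length followed by the payload. */
    static void writeFrame(DataOutputStream out, String ack) throws IOException {
        byte[] payload = ack.getBytes(StandardCharsets.US_ASCII);
        out.writeShort(payload.length);
        out.write(payload);
    }

    /** Splits a received byte sequence back into the individual ACK strings. */
    static List<String> readFrames(byte[] received) throws IOException {
        List<String> acks = new ArrayList<>();
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(received));
        while (in.available() > 0) {
            int length = in.readUnsignedShort();
            byte[] payload = new byte[length];
            in.readFully(payload);
            acks.add(new String(payload, StandardCharsets.US_ASCII));
        }
        return acks;
    }

    public static void main(String[] args) throws IOException {
        // Simulate two ACKs that were coalesced into one chunk on the wire.
        ByteArrayOutputStream wire = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(wire);
        writeFrame(out, "ACK1");
        writeFrame(out, "ACK2");
        // The reader still recovers two separate ACKs.
        System.out.println(readFrames(wire.toByteArray()));
    }
}
```

With framing like this, it no longer matters how the bytes are grouped in transit, because the receiver recovers message boundaries from the data itself.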
Re: Does IoSession.write() do buffering?
Hey Krishan, you should probably consider using a CodecFilter as described here:
https://mina.apache.org/mina-project/userguide/ch9-codec-filter/ch9-codec-filter.html

On Tue, Sep 18, 2018 at 11:11 PM Krishan Babbar wrote:
> How does "session.write()" work? Will it not write immediately? I mean is
> it using some buffer and appending multiple ACKs before sending message
> back to device?
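To make the codec suggestion more concrete, here is a rough sketch of the core job a decoder performs: accumulate incoming data and emit one message per delimiter, regardless of how the data was split or merged in transit. It is plain Java rather than MINA code. The class HeaderDelimitedDecoder and its feed/flush methods are hypothetical names, and it works on plain strings with the "Header" delimiter from the example, whereas the posted handler operates on hex dumps.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical accumulating decoder for messages that each start with a delimiter. */
final class HeaderDelimitedDecoder {
    private final String delimiter;
    private final StringBuilder pending = new StringBuilder();

    HeaderDelimitedDecoder(String delimiter) {
        this.delimiter = delimiter;
    }

    /** Appends a received chunk and returns every message now known to be complete. */
    List<String> feed(String chunk) {
        pending.append(chunk);
        List<String> complete = new ArrayList<>();
        int start = pending.indexOf(delimiter);
        if (start < 0) {
            return complete; // no delimiter seen yet, keep waiting
        }
        // A message is complete only once the next delimiter has been seen.
        int next = pending.indexOf(delimiter, start + delimiter.length());
        while (next >= 0) {
            complete.add(pending.substring(start, next));
            start = next;
            next = pending.indexOf(delimiter, start + delimiter.length());
        }
        // Keep the unfinished tail; anything before the first delimiter is dropped.
        pending.delete(0, start);
        return complete;
    }

    /** Returns whatever is still buffered, e.g. when the sender is known to be done. */
    String flush() {
        String rest = pending.toString();
        pending.setLength(0);
        return rest;
    }

    public static void main(String[] args) {
        HeaderDelimitedDecoder decoder = new HeaderDelimitedDecoder("Header");
        // The second message is split across two chunks.
        System.out.println(decoder.feed("HeaderMsg1HeaderMsg2Hea"));
        System.out.println(decoder.feed("derMsg3"));
        System.out.println(decoder.flush());
    }
}
```

In a real MINA setup this logic would live inside a decoder registered through the codec filter, so that the handler receives one message per call; the matching encoder would then write one ACK per message.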
Does IoSession.write() do buffering?
Hi All,

We are working on an IoT project and use the "org.apache.mina" library (version 2.0.16) in our gateway adaptor (GA, Java code). A device connects to our GA, and the GA sends an acknowledgement back to the device for each message.

We use multi-threading to handle each packet from the device(s). A single packet from a device can also contain multiple messages, separated by a delimiter.

E.g. the following packet contains 5 messages. Here the delimiter is "Header":

HeaderMsg1HeaderMsg2HeaderMsg3HeaderMsg4HeaderMsg5

The device expects 5 acknowledgements, one at a time, as given below:

ACK1
ACK2
ACK3
ACK4
ACK5

But we found the following acknowledgements on the device side. Sometimes 2 or more ACKs are combined:

ACK1ACK2
ACK3
ACK4ACK5

Our code is given below. Acknowledgement code is highlighted. Kindly review and let us know what we are doing wrong.

How does "session.write()" work? Does it not write immediately? I mean, does it use some buffer and append multiple ACKs before sending the message back to the device?

Kindly suggest a solution.

public abstract class AbstractListener {
    protected static final int BUFFER_SIZE = 1024;

    public void init(String portKey) throws IOException {
        AbstractIoAcceptor acceptor = initInternal();
        acceptor.setHandler(new RequestHandler());
        acceptor.getFilterChain().addLast("logger", new LoggingFilter());
        acceptor.getSessionConfig().setReadBufferSize(BUFFER_SIZE);
        acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 10);
        if (acceptor instanceof NioDatagramAcceptor) {
            ((NioDatagramAcceptor) acceptor).getSessionConfig().setReuseAddress(true);
        }
        acceptor.bind(new InetSocketAddress(
                Integer.parseInt(ServerConfigLoader.getInstance().getProperty(portKey))));
    }
}

public class RequestHandler extends IoHandlerAdapter {
    public RequestHandler() {
        super();
    }

    @Override
    public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
        LOGGER.error("Exception caught in RequestHandler.exceptionCaught()", cause);
        session.closeNow();
    }

    @Override
    public void messageReceived(IoSession session, Object message) throws Exception {
        try {
            SocketAddress remoteAddress = session.getRemoteAddress();
            String msg = ((IoBuffer) message).getHexDump().replaceAll(" ", "");
            String strArray[] = remoteAddress.toString().split(":");
            ThreadAllocator.getInstance().allocateThread(session, msg);
        } catch (Exception e) {
            throw e;
        }
    }
}

public class ThreadAllocator {

    /** The thread pool. */
    private ExecutorService executorService = Executors.newWorkStealingPool(
            Integer.parseInt(ServerConfigLoader.getInstance().getProperty("threadPoolSize")));

    public void allocateThread(IoSession session, String message) {
        executorService.execute(new HelperThread(null, session, message));
    }
}

public class HelperThread implements Runnable {
    private String message;
    private IoSession session;
    private SocketChannel socketSession;

    public HelperThread(SocketChannel socketSession, IoSession session, String message) {
        if (session != null) {
            this.session = session;
            this.session.getConfig().setBothIdleTime(1800);
        } else if (socketSession != null) {
            this.socketSession = socketSession;
        }
        this.message = message;
    }

    public void run() {
        // E.g. message = "HeaderMsg1HeaderMsg2HeaderMsg3HeaderMsg4HeaderMsg5"
        List<String> multiMessage = handleMultipleMessage(message);
        for (String singleMessage : multiMessage) {
            byte[] response = AckBuilderFactory.getBuilderFactory(singleMessage)