Re: Does IoSession.write() do buffering?

2018-09-18 Thread Jonathan Valliere
There should be no aggregation, especially for Datagrams.
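If the transport here is TCP, the socket is a byte stream and does not preserve the boundaries of individual writes, so two ACKs written back to back can arrive in a single read on the device even when nothing in MINA merges them. One common workaround, assuming you control the device protocol, is to prefix every ACK with a 2-byte length so the receiver can split whatever arrives. The sketch below is plain Java; `AckFraming`, `frame` and `unframe` are made-up names for illustration and are not part of MINA.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper (not a MINA class): length-prefixed framing for ACKs.
final class AckFraming {

    // Prepend a 2-byte big-endian length to the ASCII payload.
    static byte[] frame(String ack) {
        byte[] payload = ack.getBytes(StandardCharsets.US_ASCII);
        ByteBuffer buf = ByteBuffer.allocate(2 + payload.length);
        buf.putShort((short) payload.length);
        buf.put(payload);
        return buf.array();
    }

    // Read every complete frame from 'in'. An incomplete trailing frame is
    // left unread so the caller can retry once more bytes have arrived.
    static List<String> unframe(ByteBuffer in) {
        List<String> acks = new ArrayList<>();
        while (in.remaining() >= 2) {
            in.mark();
            int len = in.getShort() & 0xFFFF;
            if (in.remaining() < len) {
                in.reset(); // rewind to the start of the partial frame
                break;
            }
            byte[] payload = new byte[len];
            in.get(payload);
            acks.add(new String(payload, StandardCharsets.US_ASCII));
        }
        return acks;
    }
}
```

With this kind of framing it no longer matters whether the device sees "ACK1ACK2" in one read or in two; it can always recover the individual ACKs.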

On Tue, Sep 18, 2018 at 1:57 PM Raghavendra Balgi  wrote:

> Hey Krishan, You should probably consider using a CodecFilter as described
> here -
>
> https://mina.apache.org/mina-project/userguide/ch9-codec-filter/ch9-codec-filter.html

Re: Does IoSession.write() do buffering?

2018-09-18 Thread Raghavendra Balgi
Hey Krishan, You should probably consider using a CodecFilter as described
here -
https://mina.apache.org/mina-project/userguide/ch9-codec-filter/ch9-codec-filter.html
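To give a rough idea of what the decoder behind such a filter would do: it accumulates incoming text and emits one message per "Header" delimiter, holding back the last (possibly incomplete) message until more data arrives. The sketch below is plain Java with no MINA dependency, and `HeaderSplitter` with its `feed` and `flush` methods are hypothetical names. In MINA the same logic would typically live in a `CumulativeProtocolDecoder` subclass registered through a `ProtocolCodecFilter`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a decoder: splits a text stream on "Header".
final class HeaderSplitter {
    private static final String DELIMITER = "Header";
    private final StringBuilder pending = new StringBuilder();

    // Append a chunk and return every message that is known to be complete,
    // i.e. every message that is already followed by another delimiter.
    List<String> feed(String chunk) {
        pending.append(chunk);
        List<String> messages = new ArrayList<>();
        int start = pending.indexOf(DELIMITER);
        while (start >= 0) {
            int next = pending.indexOf(DELIMITER, start + DELIMITER.length());
            if (next < 0) {
                break; // the last message may still be incomplete
            }
            messages.add(pending.substring(start + DELIMITER.length(), next));
            start = next;
        }
        if (start > 0) {
            // Drop consumed messages and any text before the first delimiter.
            pending.delete(0, start);
        }
        return messages;
    }

    // Return the held-back last message (or null) once the packet has ended.
    String flush() {
        int start = pending.indexOf(DELIMITER);
        String last = start < 0 ? null : pending.substring(start + DELIMITER.length());
        pending.setLength(0);
        return last;
    }
}
```

Because the delimiter marks the start of a message rather than its end, the decoder can only be sure a message is complete when the next "Header" shows up or when the packet is known to be finished, which is why `flush` exists.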

On Tue, Sep 18, 2018 at 11:11 PM Krishan Babbar 
wrote:


Does IoSession.write() do buffering?

2018-09-18 Thread Krishan Babbar
Hi All,

We are working on an IoT project and using the "org.apache.mina" library
(version 2.0.16) in our gateway adaptor (GA, written in Java).
A device connects to our GA, and the GA sends an acknowledgement back to the
device for each message.
We use multi-threading to handle each packet from the device(s). A single
packet from a device may also contain multiple messages, separated by a
delimiter.

E.g. the following packet contains 5 messages. Here the delimiter is "Header":
HeaderMsg1HeaderMsg2HeaderMsg3HeaderMsg4HeaderMsg5

The device expects 5 acknowledgements, one at a time, as shown below:
ACK1
ACK2
ACK3
ACK4
ACK5

But we see the following acknowledgements on the device side. Sometimes 2 or
more ACKs are combined:
ACK1ACK2
ACK3
ACK4ACK5

Our code is given below. The acknowledgement code is highlighted. Kindly review
it and let us know what we are doing wrong.
How does "session.write()" work? Does it not write immediately? I mean, does it
use some buffer and append multiple ACKs before sending the message back to the
device?

Kindly suggest a solution.


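import java.io.IOException;
import java.net.InetSocketAddress;
import org.apache.mina.core.service.AbstractIoAcceptor;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.filter.logging.LoggingFilter;
import org.apache.mina.transport.socket.nio.NioDatagramAcceptor;

public abstract class AbstractListener {
    protected static final int BUFFER_SIZE = 1024;

    public void init(String portKey) throws IOException {
        // initInternal() (not shown) creates the acceptor.
        AbstractIoAcceptor acceptor = initInternal();
        acceptor.setHandler(new RequestHandler());
        acceptor.getFilterChain().addLast("logger", new LoggingFilter());
        acceptor.getSessionConfig().setReadBufferSize(BUFFER_SIZE);
        acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 10);
        if (acceptor instanceof NioDatagramAcceptor) {
            ((NioDatagramAcceptor) acceptor).getSessionConfig().setReuseAddress(true);
        }
        // The listening port is read from configuration under portKey.
        acceptor.bind(new InetSocketAddress(
                Integer.parseInt(ServerConfigLoader.getInstance().getProperty(portKey))));
    }
}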

import java.net.SocketAddress;
import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.service.IoHandlerAdapter;
import org.apache.mina.core.session.IoSession;

public class RequestHandler extends IoHandlerAdapter {
    public RequestHandler() {
        super();
    }

    @Override
    public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
        // LOGGER is declared elsewhere (not shown).
        LOGGER.error("Exception caught in RequestHandler.exceptionCaught()", cause);
        session.closeNow();
    }

    @Override
    public void messageReceived(IoSession session, Object message) throws Exception {
        try {
            SocketAddress remoteAddress = session.getRemoteAddress();
            // Hex dump of the received buffer, with the spaces removed.
            String msg = ((IoBuffer) message).getHexDump().replaceAll(" ", "");
            String strArray[] = remoteAddress.toString().split(":");
            // Hand the packet to the worker pool.
            ThreadAllocator.getInstance().allocateThread(session, msg);
        } catch (Exception e) {
            throw e;
        }
    }
}

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThreadAllocator {

    /** The thread pool. */
    private ExecutorService executorService = Executors.newWorkStealingPool(
            Integer.parseInt(ServerConfigLoader.getInstance().getProperty("threadPoolSize")));

    // getInstance() is not shown here.

    public void allocateThread(IoSession session, String message) {
        executorService.execute(new HelperThread(null, session, message));
    }
}


import java.nio.channels.SocketChannel;
import java.util.List;
import org.apache.mina.core.session.IoSession;

public class HelperThread implements Runnable {
    private String message;
    private IoSession session;
    private SocketChannel socketSession;

    public HelperThread(SocketChannel socketSession, IoSession session, String message) {
        if (session != null) {
            this.session = session;
            this.session.getConfig().setBothIdleTime(1800);
        } else if (socketSession != null) {
            this.socketSession = socketSession;
        }
        this.message = message;
    }

    public void run() {
        // E.g. message = "HeaderMsg1HeaderMsg2HeaderMsg3HeaderMsg4HeaderMsg5"
        List<String> multiMessage = handleMultipleMessage(message);
        for (String singleMessage : multiMessage) {
            byte[] response = AckBuilderFactory.getBuilderFactory(singleMessage)