[ 
https://issues.apache.org/jira/browse/DIRMINA-845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13073581#comment-13073581
 ] 

Emmanuel Lecharny edited comment on DIRMINA-845 at 9/5/14 10:53 AM:
--------------------------------------------------------------------

We are using red5 r4047 with MINA 2.0.0 RC1, and I couldn't simply set the 
"encoderOut" property because ProtocolCodecFilter.filterWrite contains an 
explicit type cast to ProtocolEncoderOutputImpl.

In MINA 2.0.4 the situation is a bit better: the cast is to 
AbstractProtocolEncoderOutput. Of course, I can extend 
AbstractProtocolEncoderOutput for my implementation, but it still looks like a 
workaround rather than a regular way.

I suppose ProtocolCodecFilter.filterWrite should look like this:
{code}
        // ...
        try {
            // Now we can try to encode the response
            encoder.encode(session, message, encoderOut);
            
            // Send it directly
            if (encoderOut instanceof Flushable) {
                ((Flushable)encoderOut).flush();
            }            
            
            // Call the next filter
            nextFilter.filterWrite(session, new MessageWriteRequest(
                    writeRequest));
        } catch (Throwable t) {
            // ...
        } 
{code}
and ProtocolCodecFilter.getEncoderOut
{code}
    protected ProtocolEncoderOutput getEncoderOut(IoSession session,
            NextFilter nextFilter, WriteRequest writeRequest) {
        // Default implementation, which may be overridden in derived classes
        ProtocolEncoderOutput out =
                (ProtocolEncoderOutput) session.getAttribute(ENCODER_OUT);

        if (out == null) {
            // Create a new instance and store it in the session
            out = new ProtocolEncoderOutputImpl(session, nextFilter,
                    writeRequest);
            session.setAttribute(ENCODER_OUT, out);
        }

        return out;
    }
{code}
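
To illustrate the idea without depending on MINA, here is a minimal, 
self-contained sketch of the same pattern. All type names in it (EncoderOutput, 
BufferingOutput, CodecFilter, DirectFilter) are hypothetical stand-ins, not 
MINA classes, and it assumes that the Flushable meant above is 
java.io.Flushable. Unlike the proposal, it creates the output on every call 
instead of caching it in the session.

```java
import java.io.Flushable;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

class EncoderOutSketch {
    /** Hypothetical stand-in for ProtocolEncoderOutput (not the MINA interface). */
    interface EncoderOutput {
        void write(Object encodedMessage);
    }

    /** Buffering output: collects messages and forwards them on flush(). */
    static class BufferingOutput implements EncoderOutput, Flushable {
        private final List<Object> buffer = new ArrayList<>();
        private final List<Object> nextFilter;

        BufferingOutput(List<Object> nextFilter) {
            this.nextFilter = nextFilter;
        }

        @Override
        public void write(Object encodedMessage) {
            buffer.add(encodedMessage);
        }

        @Override
        public void flush() {
            nextFilter.addAll(buffer);
            buffer.clear();
        }
    }

    /** Hypothetical stand-in for ProtocolCodecFilter. */
    static class CodecFilter {
        /** Stands in for the next filter in the chain. */
        final List<Object> nextFilter = new ArrayList<>();

        /** Default output; derived classes may override this. */
        protected EncoderOutput getEncoderOut() {
            return new BufferingOutput(nextFilter);
        }

        void filterWrite(String message) {
            EncoderOutput out = getEncoderOut();
            // "Encode" the message: one output item per comma-separated part.
            for (String part : message.split(",")) {
                out.write(part);
            }
            // No cast to a concrete class: flush only if the output supports it.
            if (out instanceof Flushable) {
                try {
                    ((Flushable) out).flush();
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            }
        }
    }

    /** Custom filter whose output writes straight through and never buffers. */
    static class DirectFilter extends CodecFilter {
        @Override
        protected EncoderOutput getEncoderOut() {
            return message -> nextFilter.add(message);
        }
    }

    public static void main(String[] args) {
        CodecFilter buffered = new CodecFilter();
        buffered.filterWrite("a,b");
        CodecFilter direct = new DirectFilter();
        direct.filterWrite("a,b");
        System.out.println(buffered.nextFilter + " " + direct.nextFilter);
    }
}
```

The point of the sketch is that filterWrite depends only on the interface, so a 
derived class can supply any output, whether or not it buffers.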

And a few more words about the ConcurrentLinkedQueue in ProtocolCodecFilter.

Emmanuel, earlier you wrote: "We use a CLQ because you *may* have many messages 
written for one single session by many threads, if you add an executor in the 
chain."

I looked at the executors too. With OrderedThreadPoolExecutor everything works 
fine because it maintains event order per session, but if we insert an 
UnorderedThreadPoolExecutor before ProtocolCodecFilter, we may get into the 
trouble described in this issue. Of course, you may say "if you use an 
unordered executor, then event order doesn't matter to you, so it is fine that 
flushing distorts the order too". But there is an interesting aspect here. 
Messages processed by the executor (before encoding) and messages written to 
nextFilter via ProtocolEncoderOutput (after encoding) may have different 
meanings. Back to my example: a before-encode message is a video frame, and the 
order of entire frames doesn't matter; an after-encode message is a chunk of a 
video frame, and the order of chunks does matter!

So I just want to say: when I write to ProtocolEncoderOutput, I expect the next 
filter to receive messages in the same order as they were written (they may be 
interleaved with other messages, but their relative order should be preserved), 
yet flush may distort this relative order!
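
To make the frame/chunk case concrete, here is a small sketch that replays one 
possible interleaving of two concurrent flushes step by step in a single 
thread, so the result is deterministic. It is not MINA code: the list standing 
in for nextFilter and the chunk names are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

class FlushInterleavingSketch {
    /** Replays one possible interleaving of two concurrent flushes. */
    static List<String> replay() {
        // Shared per-session queue; one frame was encoded into two chunks.
        Queue<String> bufferQueue = new ConcurrentLinkedQueue<>();
        bufferQueue.add("frame1-chunk1");
        bufferQueue.add("frame1-chunk2");

        // Stands in for nextFilter: records the order in which chunks arrive.
        List<String> nextFilter = new ArrayList<>();

        String polledByThread1 = bufferQueue.poll(); // thread-1 gets chunk1
        String polledByThread2 = bufferQueue.poll(); // thread-2 gets chunk2
        nextFilter.add(polledByThread2);             // thread-2 writes first
        nextFilter.add(polledByThread1);             // thread-1 writes second
        return nextFilter;
    }

    public static void main(String[] args) {
        System.out.println(replay()); // prints [frame1-chunk2, frame1-chunk1]
    }
}
```

Each queue operation is thread-safe on its own, but the poll-then-write pair is 
not atomic, which is enough to swap two chunks of the same frame.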


I understand this example may look quite artificial, and I'm not sure it should 
be fixed in the default implementation, but this behavior must be reflected in 
the documentation, and there must be a regular way to provide a custom 
implementation of ProtocolEncoderOutput for such cases.
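
As a rough idea of what such a custom implementation could do, the sketch below 
drains the queue and forwards each item while holding a single lock, so the 
poll and the downstream write cannot be separated by another flusher. 
OrderedOutput and its methods are hypothetical, not MINA API, and the list 
standing in for nextFilter is an assumption made to keep the example 
self-contained.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

class OrderedFlushSketch {
    /** Hypothetical custom output; not a MINA class. */
    static class OrderedOutput {
        private final Queue<String> bufferQueue = new ConcurrentLinkedQueue<>();
        private final List<String> nextFilter = new ArrayList<>(); // guarded by flushLock
        private final Object flushLock = new Object();

        void write(String chunk) {
            bufferQueue.add(chunk);
        }

        void flush() {
            // Poll and forward under one lock, so queue order is arrival order.
            synchronized (flushLock) {
                String chunk;
                while ((chunk = bufferQueue.poll()) != null) {
                    nextFilter.add(chunk);
                }
            }
        }

        List<String> forwarded() {
            synchronized (flushLock) {
                return new ArrayList<>(nextFilter);
            }
        }
    }

    /** Each writer thread writes chunks "id:0", "id:1", ... and flushes after each one. */
    static List<String> writeConcurrently(int writers, int chunksPerWriter) {
        OrderedOutput out = new OrderedOutput();
        List<Thread> threads = new ArrayList<>();
        for (int w = 0; w < writers; w++) {
            final int id = w;
            Thread t = new Thread(() -> {
                for (int c = 0; c < chunksPerWriter; c++) {
                    out.write(id + ":" + c);
                    out.flush();
                }
            });
            threads.add(t);
            t.start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return out.forwarded();
    }

    /** True if every writer's chunks arrived in the order they were written. */
    static boolean relativeOrderPreserved(List<String> forwarded, int writers) {
        int[] next = new int[writers];
        for (String item : forwarded) {
            String[] parts = item.split(":");
            int id = Integer.parseInt(parts[0]);
            int chunk = Integer.parseInt(parts[1]);
            if (chunk != next[id]) {
                return false;
            }
            next[id]++;
        }
        return true;
    }

    public static void main(String[] args) {
        List<String> forwarded = writeConcurrently(4, 50);
        System.out.println(relativeOrderPreserved(forwarded, 4));
    }
}
```

Chunks from different writers may still be interleaved, but each writer's own 
chunks keep their relative order. The cost is that flushes are serialized and 
the lock is held during the downstream call, which may or may not be acceptable 
in a real filter chain.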


was (Author: ilya.a.ivanov):

> ProtocolEncoderOutputImpl isn't thread-safe
> -------------------------------------------
>
>                 Key: DIRMINA-845
>                 URL: https://issues.apache.org/jira/browse/DIRMINA-845
>             Project: MINA
>          Issue Type: Bug
>          Components: Filter
>    Affects Versions: 2.0.4
>            Reporter: Ilya Ivanov
>             Fix For: 2.0.8
>
>
> ProtocolEncoderOutputImpl uses a ConcurrentLinkedQueue, and at first glance 
> it seems to be thread-safe. In fact, concurrent execution of the flush method 
> isn't thread-safe (and neither is write-mergeAll).
> E.g. in RTMP, several channels are multiplexed over a single connection. 
> According to the protocol specification, it is possible to write to different 
> channels concurrently, but that doesn't work with MINA.
> I've synchronized channel writing, but that doesn't prevent flushing from 
> running concurrently (in 2.0.4 it's done directly in 
> ProtocolCodecFilter.filterWrite, but ProtocolEncoderOutputImpl.flush has the 
> same problem).
> Here is the fragment of the flushing code:
> while (!bufferQueue.isEmpty()) {
>   Object encodedMessage = bufferQueue.poll();
>
>   if (encodedMessage == null) {
>     break;
>   }
>   // Flush only when the buffer has remaining.
>   if (!(encodedMessage instanceof IoBuffer)
>       || ((IoBuffer) encodedMessage).hasRemaining()) {
>     SocketAddress destination = writeRequest.getDestination();
>     WriteRequest encodedWriteRequest =
>         new EncodedWriteRequest(encodedMessage, null, destination);
>     nextFilter.filterWrite(session, encodedWriteRequest);
>   }
> }
> Suppose the original packet sequence is A, B, ...
> A concurrent run of flushing may proceed as follows:
> thread-1: Object encodedMessage = bufferQueue.poll(); // gets A packet
> thread-2: Object encodedMessage = bufferQueue.poll(); // gets B packet
> ...
> thread-2: nextFilter.filterWrite(...); // writes B packet
> thread-1: nextFilter.filterWrite(...); // writes A packet
> so the resulting sequence will be B, A.
> This is a quite confusing result, especially since the documentation doesn't 
> contain any explanation of such behavior.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
