[
https://issues.apache.org/jira/browse/DIRMINA-845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13073581#comment-13073581
]
Emmanuel Lecharny edited comment on DIRMINA-845 at 9/5/14 10:53 AM:
--------------------------------------------------------------------
We are using red5 r4047 with MINA 2.0.0 RC1, and I couldn't just set the
"encoderOut" property because there is an explicit type cast to
ProtocolEncoderOutputImpl in ProtocolCodecFilter.filterWrite.
In MINA 2.0.4 the situation is a bit better: the cast is to
AbstractProtocolEncoderOutput. Of course, I can extend
AbstractProtocolEncoderOutput for my implementation, but it still looks like
a workaround rather than a regular way.
I suppose ProtocolCodecFilter.filterWrite should look like this:
{code}
// ...
try {
    // Now we can try to encode the response
    encoder.encode(session, message, encoderOut);
    // Send it directly
    if (encoderOut instanceof Flushable) {
        ((Flushable) encoderOut).flush();
    }
    // Call the next filter
    nextFilter.filterWrite(session, new MessageWriteRequest(
            writeRequest));
} catch (Throwable t) {
    // ...
}
{code}
and ProtocolCodecFilter.getEncoderOut like this:
{code}
protected ProtocolEncoderOutput getEncoderOut(IoSession session,
        NextFilter nextFilter, WriteRequest writeRequest) {
    // default implementation which might be overridden in derived classes
    ProtocolEncoderOutput out = (ProtocolEncoderOutput)
            session.getAttribute(ENCODER_OUT);
    if (out == null) {
        // Create a new instance, and stores it into the session
        out = new ProtocolEncoderOutputImpl(session, nextFilter,
                writeRequest);
        session.setAttribute(ENCODER_OUT, out);
    }
    return out;
}
{code}
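The two fragments above can be tried out without MINA in a minimal, self-contained sketch. Everything in it is a hypothetical stand-in rather than a MINA class: EncoderOutput plays the role of ProtocolEncoderOutput, QueueingOutput the default buffering implementation, DirectOutput a custom implementation, and CodecFilterSketch the filter. The only real type is java.io.Flushable from the JDK, and it is an assumption that this is the Flushable meant in the fragment above.

```java
import java.io.Flushable;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical stand-ins for illustration only; none of these are MINA classes.
class FlushableCheckSketch {

    /** Stand-in for ProtocolEncoderOutput: all the encoder needs is write(). */
    interface EncoderOutput {
        void write(Object encoded);
    }

    /** Default-style output: buffers messages and forwards them on flush(). */
    static class QueueingOutput implements EncoderOutput, Flushable {
        private final Queue<Object> buffer = new ArrayDeque<>();
        private final List<Object> downstream;

        QueueingOutput(List<Object> downstream) { this.downstream = downstream; }

        @Override public void write(Object encoded) { buffer.add(encoded); }

        @Override public void flush() {
            Object next;
            while ((next = buffer.poll()) != null) {
                downstream.add(next);
            }
        }
    }

    /** Custom output: forwards immediately, so it has nothing to flush. */
    static class DirectOutput implements EncoderOutput {
        private final List<Object> downstream;

        DirectOutput(List<Object> downstream) { this.downstream = downstream; }

        @Override public void write(Object encoded) { downstream.add(encoded); }
    }

    /** Stand-in for the codec filter. */
    static class CodecFilterSketch {
        protected final List<Object> downstream = new ArrayList<>();

        // Overridable factory, analogous to the proposed getEncoderOut.
        protected EncoderOutput getEncoderOut() {
            return new QueueingOutput(downstream);
        }

        List<Object> filterWrite(String message) {
            EncoderOutput out = getEncoderOut();
            // "Encode" the message by splitting it into chunks.
            for (String chunk : message.split(",")) {
                out.write(chunk);
            }
            // No cast to a concrete class: flush only if the output supports it.
            if (out instanceof Flushable) {
                try {
                    ((Flushable) out).flush();
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            }
            return downstream;
        }
    }

    static List<Object> runDefault(String message) {
        return new CodecFilterSketch().filterWrite(message);
    }

    static List<Object> runCustom(String message) {
        return new CodecFilterSketch() {
            @Override protected EncoderOutput getEncoderOut() {
                return new DirectOutput(downstream);
            }
        }.filterWrite(message);
    }

    public static void main(String[] args) {
        System.out.println(runDefault("a,b"));
        System.out.println(runCustom("a,b"));
    }
}
```

Both variants deliver the same chunks downstream; the point is only that the filter never needs to know the concrete output class.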
A few more words about ConcurrentLinkedQueue in ProtocolCodecFilter.
Emmanuel, earlier you wrote "We use a CLQ because you *may* have many messages
written for one single session by many threads, if you add an executor in the
chain."
I looked at the executors too. With OrderedThreadPoolExecutor everything works
fine because it maintains event order per session, but if we insert
UnorderedThreadPoolExecutor before ProtocolCodecFilter we may get into the
trouble described in this issue. Of course, you may say "if you use an
unordered executor then event order doesn't matter to you, so it is OK that
flushing distorts the order too". But there is an interesting aspect here. The
messages processed by the executor (before encoding) and the messages written
to nextFilter via ProtocolEncoderOutput (after encoding) might have different
meanings. Back to my example: a before-encode message is a video frame, and the
order of entire frames doesn't matter; the after-encode messages are chunks of
a video frame, and the order of chunks does matter!
So, I just want to say: when I write to ProtocolEncoderOutput, I expect the
next filter to receive messages in the same order as they were written (they
may be interleaved with other messages, but relative order should be
preserved), but flush may distort this relative order!
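One way a custom output could keep that relative order is to make "poll and forward" a single atomic step. The following is a minimal sketch under that assumption, not MINA code and not necessarily how the issue was eventually fixed: OrderedOutput, run, and chunksInOrder are hypothetical names, and a plain list stands in for the next filter.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical illustration only; none of these are MINA classes.
class OrderedFlushSketch {

    /** Encoder output whose flush() is serialized by a lock. */
    static class OrderedOutput {
        private final Queue<String> buffer = new ConcurrentLinkedQueue<>();
        private final List<String> downstream = new ArrayList<>(); // guarded by flushLock
        private final Object flushLock = new Object();

        void write(String chunk) { buffer.add(chunk); }

        void flush() {
            // Polling and forwarding happen under one lock, so another thread
            // cannot slip in between "poll A" and "forward A".
            synchronized (flushLock) {
                String next;
                while ((next = buffer.poll()) != null) {
                    downstream.add(next);
                }
            }
        }

        List<String> delivered() {
            synchronized (flushLock) {
                return new ArrayList<>(downstream);
            }
        }
    }

    /** Starts `threads` writers; each writes the chunks of one frame, then flushes. */
    static List<String> run(int threads, int chunks) {
        OrderedOutput out = new OrderedOutput();
        List<Thread> workers = new ArrayList<>();
        for (int t = 0; t < threads; t++) {
            final int frame = t;
            Thread worker = new Thread(() -> {
                for (int c = 0; c < chunks; c++) {
                    out.write(frame + ":" + c);
                }
                out.flush();
            });
            workers.add(worker);
            worker.start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return out.delivered();
    }

    /** True if, for every frame, its chunk numbers appear in increasing order. */
    static boolean chunksInOrder(List<String> delivered, int threads) {
        int[] last = new int[threads];
        Arrays.fill(last, -1);
        for (String entry : delivered) {
            String[] parts = entry.split(":");
            int frame = Integer.parseInt(parts[0]);
            int chunk = Integer.parseInt(parts[1]);
            if (chunk <= last[frame]) {
                return false;
            }
            last[frame] = chunk;
        }
        return true;
    }

    public static void main(String[] args) {
        List<String> delivered = run(4, 50);
        System.out.println(delivered.size() + " chunks, in order: " + chunksInOrder(delivered, 4));
    }
}
```

Chunks of different frames may still interleave, but the chunks of each frame arrive in the order they were written. The cost is that the lock is held while forwarding, so flushes for one output are serialized.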
I understand this example may look quite artificial, and I'm not sure it should
be fixed in the default implementation, but such behavior must be documented,
and there must be a regular way to provide a custom implementation of
ProtocolEncoderOutput for such cases.
was (Author: ilya.a.ivanov): the same text as above, without the {code} markup.
> ProtocolEncoderOutputImpl isn't thread-safe
> -------------------------------------------
>
> Key: DIRMINA-845
> URL: https://issues.apache.org/jira/browse/DIRMINA-845
> Project: MINA
> Issue Type: Bug
> Components: Filter
> Affects Versions: 2.0.4
> Reporter: Ilya Ivanov
> Fix For: 2.0.8
>
>
> ProtocolEncoderOutputImpl uses ConcurrentLinkedQueue and at first glance it
> seems to be thread-safe. But in fact concurrent execution of the flush method
> isn't thread-safe (nor is write-mergeAll).
> E.g. in RTMP several channels are multiplexed in a single connection.
> According to the protocol specification it's possible to write to different
> channels concurrently. But it doesn't work with MINA.
> I've synchronized channel writing, but that doesn't prevent flushing from
> running concurrently (in 2.0.4 it's done directly in
> ProtocolCodecFilter.filterWrite, but ProtocolEncoderOutputImpl.flush has the
> same problem).
> Here is a fragment of the flushing code:
> while (!bufferQueue.isEmpty()) {
>     Object encodedMessage = bufferQueue.poll();
>
>     if (encodedMessage == null) {
>         break;
>     }
>     // Flush only when the buffer has remaining.
>     if (!(encodedMessage instanceof IoBuffer)
>             || ((IoBuffer) encodedMessage).hasRemaining()) {
>         SocketAddress destination = writeRequest.getDestination();
>         WriteRequest encodedWriteRequest =
>                 new EncodedWriteRequest(encodedMessage, null, destination);
>         nextFilter.filterWrite(session, encodedWriteRequest);
>     }
> }
> Suppose the original packet sequence is A, B, ...
> A concurrent run of flushing may proceed as follows:
> thread-1: Object encodedMessage = bufferQueue.poll(); // gets A packet
> thread-2: Object encodedMessage = bufferQueue.poll(); // gets B packet
> ...
> thread-2: nextFilter.filterWrite(...); // writes B packet
> thread-1: nextFilter.filterWrite(...); // writes A packet
> so the resulting sequence will be B, A.
> This is a quite confusing result, especially when the documentation doesn't
> contain any explanation of such behavior.
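
The interleaving described in the issue can be replayed deterministically outside MINA. This is a minimal sketch, assuming a plain list in place of nextFilter and using latches to force the exact schedule from the description; FlushRaceReplay and its methods are hypothetical names, not part of MINA.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;

// Hypothetical illustration only; a list stands in for nextFilter.
class FlushRaceReplay {

    static void await(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    static void join(Thread thread) {
        try {
            thread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    /** Forces the schedule: t1 polls A, t2 polls B, t2 writes B, t1 writes A. */
    static List<String> replay() {
        Queue<String> bufferQueue = new ConcurrentLinkedQueue<>(Arrays.asList("A", "B"));
        List<String> nextFilter = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch aPolled = new CountDownLatch(1);
        CountDownLatch bWritten = new CountDownLatch(1);

        Thread thread1 = new Thread(() -> {
            String message = bufferQueue.poll(); // gets A
            aPolled.countDown();
            await(bWritten);                     // stalls between poll and write
            nextFilter.add(message);             // writes A, after B
        });
        Thread thread2 = new Thread(() -> {
            await(aPolled);
            String message = bufferQueue.poll(); // gets B
            nextFilter.add(message);             // writes B first
            bWritten.countDown();
        });

        thread1.start();
        thread2.start();
        join(thread1);
        join(thread2);
        return new ArrayList<>(nextFilter);
    }

    public static void main(String[] args) {
        System.out.println(replay());
    }
}
```

The queue itself behaves correctly here: each message is polled exactly once and in order. The reordering comes from the gap between polling a message and forwarding it.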