[https://issues.apache.org/jira/browse/CASSANDRA-14574?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16549610#comment-16549610]
Jason Brown commented on CASSANDRA-14574:
-----------------------------------------
In short, I wasn't handling all error cases correctly. I was correctly handling
the case where the {{ByteBuf}} contains a single message that is fully
deserialized: if some exception then happens in the pipeline, we close the
channel and everything is fine. The problems arise when there are multiple
messages in the buffer, or when the buffer is not fully consumed during
deserialization. In the catch block of {{MessageInHandler.decode()}}, I was
calling {{exceptionCaught()}}, which closes the channel. However, because we
derive from {{ByteToMessageDecoder}}, when it responds to the channel close
event it sees unconsumed bytes in its buffer (the {{cumulation}} field in that
class) and (re-)invokes {{decode()}}. If you are in a bad state, partway
through the stream, you will fail to deserialize any further messages, and it's
downhill from there: you loop over the same failure pattern (exception, close
the channel, {{ByteToMessageDecoder}} calls {{decode()}}, repeat ...). The
safest thing to do here is to pass the caught exception up to
{{ByteToMessageDecoder}} and prevent any further processing in the
{{decode()}} method.
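To make the failure pattern concrete, here is a toy model of a cumulating decoder whose {{decode()}} swallows its own errors. It is a sketch under stated assumptions, not Netty or Cassandra code: the class {{SwallowingDecoderModel}}, its 4-byte-length framing, and the "table tag" byte are all hypothetical, and {{callDecode()}}/{{channelInputClosed()}} here only approximate the real {{ByteToMessageDecoder}} methods of the same names (the real loop also looks at decoded output, not just consumed bytes).

```java
import java.nio.ByteBuffer;

// Toy model (NOT Netty code) of a cumulating decoder whose decode() swallows errors.
// Hypothetical frame format: 4-byte length, then 'length' bytes whose first byte is
// a "table tag"; only tag 0x01 is a known table.
class SwallowingDecoderModel {
    ByteBuffer cumulation;   // unconsumed inbound bytes
    boolean closed;          // set when decode() asks for the channel to be closed
    int decoded;             // frames successfully decoded
    int failedDecodes;       // decode() calls that hit an error

    // Builds frames of the form [length=2][tag][0x2A].
    static ByteBuffer frames(byte... tags) {
        ByteBuffer buf = ByteBuffer.allocate(tags.length * 6);
        for (byte tag : tags) {
            buf.putInt(2).put(tag).put((byte) 0x2A);
        }
        buf.flip();
        return buf;
    }

    // Approximates callDecode(): keep calling decode() while it consumes bytes.
    void callDecode() {
        while (cumulation.hasRemaining()) {
            int before = cumulation.remaining();
            decode(cumulation);
            if (cumulation.remaining() == before) {
                break; // nothing consumed: stop looping for now
            }
        }
    }

    // Approximates the close path: leftover bytes are handed to decode() again.
    void channelInputClosed() {
        if (cumulation.hasRemaining()) {
            callDecode();
        }
    }

    void decode(ByteBuffer in) {
        try {
            if (in.remaining() < 5) {
                throw new IllegalStateException("truncated frame header");
            }
            int length = in.getInt();
            if (length < 1) {
                throw new IllegalStateException("bad length " + length);
            }
            byte tag = in.get();
            if (tag != 0x01) {
                throw new IllegalStateException("unknown table tag " + tag);
            }
            if (length - 1 > in.remaining()) {
                throw new IllegalStateException("truncated payload");
            }
            in.position(in.position() + length - 1); // skip the rest of the payload
            decoded++;
        } catch (IllegalStateException e) {
            failedDecodes++;
            closed = true; // the modeled bug: close, swallow the error, stay mid-frame
        }
    }
}
```

Fed three frames (known table, unknown table, known table) with {{callDecode()}} followed by {{channelInputClosed()}}, this model fails four times: after the bad frame it keeps reading misaligned bytes, and the third, perfectly valid frame is never decoded.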
This patch fixes the error handling in the inbound pipeline (see below for
details on the still-failing dtest):
||14574||
|[branch|https://github.com/jasobrown/cassandra/tree/14574]|
|[utests & dtests|https://circleci.com/gh/jasobrown/workflows/cassandra/tree/14574]|
This patch does several things in the {{MessageInHandler.decode()}} method's
exception block (which is where the problems lie):
- rethrows the exception from the handler to the parent
{{ByteToMessageDecoder}}, which breaks out of the while loop in
{{callDecode()}} and routes the exception to the {{exceptionCaught()}} method
(overridden in {{BaseMessageInHandler}}), where we close the channel.
- moves the {{ByteBuf}}'s {{readerIndex}} to the end of the buffer, so that the
buffer appears to have been fully consumed. This improves both the efficiency
and the correctness of {{ByteToMessageDecoder}}: when the channel is closed,
{{ByteToMessageDecoder.channelInputClosed()}} makes further attempts to consume
all the bytes in the backing {{ByteBuf}} ({{cumulation}}). Even though the
implementing handler is in a broken state, the parent {{ByteToMessageDecoder}}
still keeps trying to consume every byte in {{cumulation}} before closing the
channel. Forcing the {{readerIndex}} to the end of the buffer avoids that
situation.
- adds an explicit {{CLOSED}} state to {{MessageInHandler}}; the handler moves
to {{CLOSED}} when a message fails to deserialize or another error occurs (for
example, when the table doesn't exist; see below). This is probably not
strictly necessary for correctness given the other changes (primarily moving
the {{readerIndex}} to the end of the buffer), but it makes the handler's state
much more explicit, relies less on knowledge of netty's internals, and is more
resilient to implementation changes in the netty library itself.
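The three changes above can be sketched against a toy decoder. This is a hedged illustration, not the patch itself: {{FailFastDecoderModel}}, its framing, and its {{callDecode()}}/{{exceptionCaught()}} stand-ins are hypothetical, and {{in.position(in.limit())}} on a {{java.nio.ByteBuffer}} plays the role that something along the lines of {{in.readerIndex(in.writerIndex())}} would play on a Netty {{ByteBuf}}.

```java
import java.nio.ByteBuffer;

// Toy model (NOT Netty or Cassandra code) of: rethrow, skip remaining bytes, and an
// explicit CLOSED state. Hypothetical framing: 4-byte length, then a payload whose
// first byte is a "table tag" (only 0x01 is a known table).
class FailFastDecoderModel {
    enum State { READING, CLOSED }

    ByteBuffer cumulation;        // unconsumed inbound bytes
    State state = State.READING;  // handler state
    boolean channelClosed;        // set by the exceptionCaught() stand-in
    int decoded;
    int failedDecodes;

    // Builds frames of the form [length=2][tag][0x2A].
    static ByteBuffer frames(byte... tags) {
        ByteBuffer buf = ByteBuffer.allocate(tags.length * 6);
        for (byte tag : tags) {
            buf.putInt(2).put(tag).put((byte) 0x2A);
        }
        buf.flip();
        return buf;
    }

    // Approximates callDecode(): an exception from decode() ends the loop and is
    // routed to exceptionCaught() (Netty first wraps it in a DecoderException).
    void callDecode() {
        try {
            while (cumulation.hasRemaining()) {
                int before = cumulation.remaining();
                decode(cumulation);
                if (cumulation.remaining() == before) {
                    break;
                }
            }
        } catch (IllegalStateException e) {
            exceptionCaught(e);
        }
    }

    // Stand-in for the exceptionCaught() override that closes the channel.
    void exceptionCaught(Throwable cause) {
        channelClosed = true;
    }

    void channelInputClosed() {
        if (cumulation.hasRemaining()) {
            callDecode();
        }
    }

    void decode(ByteBuffer in) {
        if (state == State.CLOSED) {
            in.position(in.limit()); // already failed: discard whatever is left
            return;
        }
        try {
            if (in.remaining() < 5) {
                throw new IllegalStateException("truncated frame header");
            }
            int length = in.getInt();
            if (length < 1) {
                throw new IllegalStateException("bad length " + length);
            }
            byte tag = in.get();
            if (tag != 0x01) {
                throw new IllegalStateException("unknown table tag " + tag);
            }
            if (length - 1 > in.remaining()) {
                throw new IllegalStateException("truncated payload");
            }
            in.position(in.position() + length - 1); // skip the rest of the payload
            decoded++;
        } catch (IllegalStateException e) {
            failedDecodes++;
            state = State.CLOSED;    // never try to parse this stream again
            in.position(in.limit()); // make the buffer look fully consumed
            throw e;                 // let the caller's loop exit and report it
        }
    }
}
```

With the same three frames (known, unknown, known), the failing frame is decoded exactly once, the loop exits, nothing is left for the close path to re-decode, and any bytes that arrive afterwards are discarded by the {{CLOSED}} check.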
After this fix, the
{{materialized_views_test.py::TestMaterializedViews::test_populate_mv_after_insert_wide_rows}}
dtest still fails, but not with the originally reported exception stack trace.
It fails with this one instead:
{noformat}
io.netty.handler.codec.DecoderException: org.apache.cassandra.exceptions.UnknownTableException: Couldn't find table with id 3694c0c0-8b6b-11e8-841f-cd3e85e9c250. If a table was just created, this is likely due to the schemanot being fully propagated. Please wait for schema agreement on table creation.
	at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:459)
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:265)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1342)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:934)
	at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:979)
	at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:404)
	at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:307)
	at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
	at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.cassandra.exceptions.UnknownTableException: Couldn't find table with id 3694c0c0-8b6b-11e8-841f-cd3e85e9c250. If a table was just created, this is likely due to the schemanot being fully propagated. Please wait for schema agreement on table creation.
	at org.apache.cassandra.schema.Schema.getExistingTableMetadata(Schema.java:438)
	at org.apache.cassandra.db.partitions.PartitionUpdate$PartitionUpdateSerializer.deserialize(PartitionUpdate.java:612)
	at org.apache.cassandra.db.Mutation$MutationSerializer.deserialize(Mutation.java:353)
	at org.apache.cassandra.db.Mutation$MutationSerializer.deserialize(Mutation.java:371)
	at org.apache.cassandra.db.Mutation$MutationSerializer.deserialize(Mutation.java:335)
	at org.apache.cassandra.net.MessageIn.read(MessageIn.java:158)
	at org.apache.cassandra.net.async.MessageInHandler.decode(MessageInHandler.java:130)
	at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:489)
	at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:428)
{noformat}
So the remaining problem appears to be that the request cannot find the target
table in the schema. In my local test runs with the above patch applied, the
table id is correct and the table (for the MView) does eventually exist, but
not yet when the message arrives.
I had not seen this dtest failure before (and dtest runs were green) because it
was only exposed by the recent commit for CASSANDRA-13426. I bisected starting
from a few commits before CASSANDRA-14485 (sha
{{2bad5d5b6d2134ecd3db63d02aa2274299d1d748}}), and the bisect identified
CASSANDRA-13426 as the commit that caused
{{materialized_views_test.py::TestMaterializedViews::test_populate_mv_after_insert_wide_rows}}
to start failing. My fix corrects the nastier part of that failure, but there
is another issue here that is outside the scope of internode messaging.
> Racy incorrect handling of incoming messages
> ---------------------------------------------
>
> Key: CASSANDRA-14574
> URL: https://issues.apache.org/jira/browse/CASSANDRA-14574
> Project: Cassandra
> Issue Type: Bug
> Components: Streaming and Messaging
> Reporter: Aleksey Yeschenko
> Assignee: Jason Brown
> Priority: Major
> Fix For: 4.0
>
>
> {{MessageInHandler.decode()}} occasionally reads the payload incorrectly,
> passing the full message to {{MessageIn.read()}} instead of just the payload
> bytes.
> You can see the stack trace in the logs from this [CI run|https://circleci.com/gh/iamaleksey/cassandra/437#tests/containers/38].
> {code}
> Caused by: java.lang.AssertionError: null
> 	at org.apache.cassandra.db.Mutation$MutationSerializer.deserialize(Mutation.java:351)
> 	at org.apache.cassandra.db.Mutation$MutationSerializer.deserialize(Mutation.java:371)
> 	at org.apache.cassandra.db.Mutation$MutationSerializer.deserialize(Mutation.java:335)
> 	at org.apache.cassandra.net.MessageIn.read(MessageIn.java:158)
> 	at org.apache.cassandra.net.async.MessageInHandler.decode(MessageInHandler.java:132)
> {code}
> Reconstructed, truncated stream passed to {{MessageIn.read()}}:
> {{0000000b000743414c5f42414301002a01e1a5c9b089fd11e8b517436ee1243007040000005d10fc50ec}}
> You can clearly see parameters in there encoded before the payload:
> {{[43414c5f424143 - CAL_BAC] [01 - ONE_BYTE] [002a - 42, payload size] 01 e1 a5 c9 b0 89 fd 11 e8 b5 17 43 6e e1 24 30 07 04 00 00 00 1d 10 fc 50 ec}}
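The annotated prefix can be checked mechanically. The sketch below follows the grouping in the annotation (key, one-byte value, 2-byte payload size); it is not Cassandra's {{MessageIn}} deserializer. The ticket does not explain the leading {{0000000b}} word, so it is read but not interpreted, and treating {{0007}} as the 2-byte length prefix that {{java.io.DataInput.readUTF()}} consumes is an assumption. The class name {{StreamPrefix}} is hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Decodes the prefix of the reconstructed stream using the grouping from the annotation.
// Hypothetical helper; the field layout below is an assumption, not Cassandra's format.
final class StreamPrefix {
    final int leadingWord;    // 0000000b: meaning not given in the ticket
    final String key;         // 0007 (assumed length prefix) + 43414c5f424143
    final int value;          // 01, annotated as ONE_BYTE
    final int payloadSize;    // 002a, annotated as the payload size
    final int payloadPresent; // payload bytes actually present in the truncated capture

    private StreamPrefix(int leadingWord, String key, int value, int payloadSize, int payloadPresent) {
        this.leadingWord = leadingWord;
        this.key = key;
        this.value = value;
        this.payloadSize = payloadSize;
        this.payloadPresent = payloadPresent;
    }

    static byte[] hexToBytes(String hex) {
        byte[] out = new byte[hex.length() / 2];
        for (int i = 0; i < out.length; i++) {
            out[i] = (byte) Integer.parseInt(hex.substring(2 * i, 2 * i + 2), 16);
        }
        return out;
    }

    static StreamPrefix parse(String hex) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(hexToBytes(hex)))) {
            int leading = in.readInt();        // 4 bytes
            String key = in.readUTF();         // 2-byte length, then modified UTF-8 bytes
            int value = in.readUnsignedByte(); // 1 byte
            int size = in.readUnsignedShort(); // 2 bytes
            return new StreamPrefix(leading, key, value, size, in.available());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Parsing the reconstructed stream yields the key {{CAL_BAC}}, a value of 1, and a payload size of 42, with only 26 payload bytes present because the capture is truncated. A serializer handed this buffer from offset 0, rather than from the start of the payload, would begin by reading these parameter bytes as if they were mutation data.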
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)