[ 
https://issues.apache.org/jira/browse/CASSANDRA-16143?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yifan Cai updated CASSANDRA-16143:
----------------------------------
    Test and Documentation Plan: ci, unit test, jvm dtest, adhoc test
                         Status: Patch Available  (was: Open)

PR: https://github.com/apache/cassandra/pull/824
CI: 
https://app.circleci.com/pipelines/github/yifan-c/cassandra/164/workflows/a1fee16c-b4cd-4e67-b52f-f2e649ea33da

The patch 
# adds a dedicated TCP user timeout (defaulting to 0) for streaming (sketched below).
# collects the processing time of received incoming stream messages.
# logs a warning when the processing time exceeds the TCP user timeout.
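For the first item, the rough shape is: when the epoll transport is in use, apply a 
streaming-specific value to the netty channel instead of the generic internode one. 
This is only a minimal sketch; the class, method and parameter names below are 
illustrative, not the PR's actual code.
{code:java}
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.epoll.Epoll;
import io.netty.channel.epoll.EpollChannelOption;

public final class StreamingSocketOptions
{
    /**
     * Applies a TCP user timeout dedicated to streaming connections.
     * The default of 0 leaves the operating system default in place, so a
     * long fsync/fdatasync on the receiver no longer trips the 30s timeout
     * used for regular internode messaging.
     */
    public static void applyStreamingTcpUserTimeout(Bootstrap bootstrap, int streamingTcpUserTimeoutMillis)
    {
        // TCP_USER_TIMEOUT is a Linux-only socket option, exposed by netty's epoll transport
        if (Epoll.isAvailable() && streamingTcpUserTimeoutMillis > 0)
            bootstrap.option(EpollChannelOption.TCP_USER_TIMEOUT, streamingTcpUserTimeoutMillis);
    }
}
{code}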

I chose not to add "a separate (unbounded) executor service" for 
FileChannel.force and "periodically call force". 
In the code base, the incoming stream messages are consumed on a dedicated 
thread that polls from {{AsyncStreamingInputPlus}}, so they do not block the netty 
event loop. {{AsyncStreamingInputPlus}} intentionally disables auto read 
on the netty channel to avoid OOM. If we started an executor service to consume 
the bytes and FileChannel.force clogged, we would end up keeping more deserialized 
messages in memory, with the attendant risk of OOM. The timeout is in fact tied to 
auto read being disabled: the receiver is blocked on flushing and cannot 
issue further channel reads. 
I am also concerned about calling force periodically. According to the javadoc 
of FileChannel.force (quoted below), doing so may trigger I/O operations that are 
not necessary. 

{quote}Invoking this method may cause an I/O operation to occur even if the 
channel was only opened for reading. Some operating systems, for example, 
maintain a last-access time as part of a file's metadata, and this time is 
updated whenever the file is read. Whether or not this is actually done is 
system-dependent and is therefore unspecified.{quote}
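
As for items 2 and 3 of the patch (collecting the processing time and warning when 
it exceeds the timeout), the idea is along these lines; again only a sketch, with 
made-up class and method names rather than the PR's actual code:
{code:java}
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class StreamMessageProcessingTimer
{
    private static final Logger logger = LoggerFactory.getLogger(StreamMessageProcessingTimer.class);

    /**
     * Times the handling of one received stream message and warns when it
     * exceeds the configured TCP user timeout, pointing the operator at the
     * likely cause (a slow fsync/fdatasync while the sender is blocked).
     */
    public static void process(Runnable handleMessage, long tcpUserTimeoutMillis)
    {
        long start = System.nanoTime();
        try
        {
            handleMessage.run();
        }
        finally
        {
            long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
            if (tcpUserTimeoutMillis > 0 && elapsedMillis > tcpUserTimeoutMillis)
                logger.warn("Processing of incoming stream message took {}ms, exceeding the TCP user timeout of {}ms; " +
                            "the peer may close the connection while flushing is stalled (e.g. on a slow fdatasync)",
                            elapsedMillis, tcpUserTimeoutMillis);
        }
    }
}
{code}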

> Streaming fails when an SSTable writer finish() exceeds 
> internode_tcp_user_timeout
> ---------------------------------------------------------------------------------
>
>                 Key: CASSANDRA-16143
>                 URL: https://issues.apache.org/jira/browse/CASSANDRA-16143
>             Project: Cassandra
>          Issue Type: Bug
>          Components: Messaging/Internode
>            Reporter: Jon Meredith
>            Assignee: Yifan Cai
>            Priority: Normal
>             Fix For: 4.0-beta
>
>          Time Spent: 10m
>  Remaining Estimate: 0h
>
> tl;dr The internode TCP user timeout that provides more responsive detection 
> of dead nodes for internode messaging will cause streaming to fail if system 
> calls to fsync/fdatasync exceed the timeout (default 30s).
> To work around this, explicitly set internode_tcp_user_timeout to longer than 
> fsync/fdatasync takes, or to zero to revert to the operating system default.
> Details:
> While bootstrapping a replacement 4.0beta3 node in an existing cluster, 
> bootstrap streaming repeatedly failed with the streaming follower logging
> {code:java}
> ERROR 2020-09-10T14:29:34,711 [NettyStreaming-Outbound-1.1.1.1.7000:1] 
> org.apache.cassandra.streaming.StreamSession:693 - [Stream 
> #7cb67c00-f3ac-11ea-b940-f7836f164528] Streaming error occurred on session 
> with peer 1.1.1.1:7000
> org.apache.cassandra.net.AsyncChannelOutputPlus$FlushException: The channel 
> this output stream was writing to has been closed
>        at 
> org.apache.cassandra.net.AsyncChannelOutputPlus.propagateFailedFlush(AsyncChannelOutputPlus.java:200)
>        at 
> org.apache.cassandra.net.AsyncChannelOutputPlus.waitUntilFlushed(AsyncChannelOutputPlus.java:158)
>        at 
> org.apache.cassandra.net.AsyncChannelOutputPlus.waitForSpace(AsyncChannelOutputPlus.java:140)
>        at 
> org.apache.cassandra.net.AsyncChannelOutputPlus.beginFlush(AsyncChannelOutputPlus.java:97)
>        at 
> org.apache.cassandra.net.AsyncStreamingOutputPlus.lambda$writeToChannel$0(AsyncStreamingOutputPlus.java:142)
>        at 
> org.apache.cassandra.db.streaming.CassandraCompressedStreamWriter.lambda$write$0(CassandraCompressedStreamWriter.java:90)
>        at 
> org.apache.cassandra.net.AsyncStreamingOutputPlus.writeToChannel(AsyncStreamingOutputPlus.java:138)
>        at 
> org.apache.cassandra.db.streaming.CassandraCompressedStreamWriter.write(CassandraCompressedStreamWriter.java:89)
>        at 
> org.apache.cassandra.db.streaming.CassandraOutgoingFile.write(CassandraOutgoingFile.java:180)
>        at 
> org.apache.cassandra.streaming.messages.OutgoingStreamMessage.serialize(OutgoingStreamMessage.java:87)
>        at 
> org.apache.cassandra.streaming.messages.OutgoingStreamMessage$1.serialize(OutgoingStreamMessage.java:45)
>        at 
> org.apache.cassandra.streaming.messages.OutgoingStreamMessage$1.serialize(OutgoingStreamMessage.java:34)
>        at 
> org.apache.cassandra.streaming.messages.StreamMessage.serialize(StreamMessage.java:40)
>        at 
> org.apache.cassandra.streaming.async.NettyStreamingMessageSender$FileStreamTask.run(NettyStreamingMessageSender.java:347)
>        at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
>        at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
>        at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>  [?:?]
>        at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>  [?:?]
>        at 
> io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
>  [netty-all-4.1.50.Final.jar:4.1.50.Final]
>        at java.lang.Thread.run(Thread.java:834) [?:?]
>        Suppressed: java.nio.channels.ClosedChannelException
>                at 
> org.apache.cassandra.net.AsyncStreamingOutputPlus.doFlush(AsyncStreamingOutputPlus.java:78)
>                at 
> org.apache.cassandra.net.AsyncChannelOutputPlus.flush(AsyncChannelOutputPlus.java:229)
>                at 
> org.apache.cassandra.net.AsyncChannelOutputPlus.close(AsyncChannelOutputPlus.java:248)
>                at 
> org.apache.cassandra.streaming.async.NettyStreamingMessageSender$FileStreamTask.run(NettyStreamingMessageSender.java:348)
>                at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
>                at java.util.concurrent.FutureTask.run(FutureTask.java:264) 
> [?:?]
>                at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>  [?:?]
>                at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>  [?:?]
>                at 
> io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
>  [netty-all-4.1.50.Final.jar:4.1.50.Final]
>                at java.lang.Thread.run(Thread.java:834) [?:?]
> Caused by: io.netty.channel.unix.Errors$NativeIoException: writeAddress(..) 
> failed: Connection timed out
> {code}
> and the bootstrapping node (the streaming initiator) logging (times are from 
> two separate attempts, pattern was very similar each time, IP addresses 
> doctored to protect the innocent)
> {code:java}
> ERROR 2020-09-11T09:45:54,720 [Stream-Deserializer-2.2.2.2:7000-93837c82] 
> org.apache.cassandra.streaming.StreamSession:917 - [Stream 
> #f921ea30-f44d-11ea-b7f1-356e3edd6247] Remote peer 2.2.2.2:7000 failed stream 
> session.
> DEBUG 2020-09-11T09:45:54,721 [Stream-Deserializer-2.2.2.2:7000-93837c82] 
> org.apache.cassandra.streaming.StreamSession:510 - [Stream 
> #f921ea30-f44d-11ea-b7f1-356e3edd6247] Will close attached channels 
> {93837c82=[id: 0x93837c82, L:/1.1.1.1:56859 ! R:2.2.2.2/2.2.2.2:7000], 
> a608109b=[id: 0xa608109b, L:/1.1.1.1:7000 - R:/2.2.2.2:49426]}
> INFO  2020-09-11T09:45:54,722 [Stream-Deserializer-2.2.2.2:7000-93837c82] 
> org.apache.cassandra.streaming.StreamResultFuture:192 - [Stream 
> #f921ea30-f44d-11ea-b7f1-356e3edd6247] Session with 2.2.2.2:7000 is complete
> DEBUG 2020-09-11T09:45:54,723 [Messaging-EventLoop-3-15] 
> org.apache.cassandra.streaming.async.NettyStreamingMessageSender:553 - 
> [Stream #f921ea30-f44d-11ea-b7f1-356e3edd6247] Closing stream connection 
> channels on 2.2.2.2:7000
> ERROR 2020-09-11T09:45:55,121 [Stream-Deserializer-2.2.2.2:7000-a608109b] 
> org.apache.cassandra.streaming.StreamSession:693 - [Stream 
> #f921ea30-f44d-11ea-b7f1-356e3edd6247] Streaming error occurred on session 
> with peer 2.2.2.2:7000
> org.apache.cassandra.streaming.StreamReceiveException: java.io.EOFException
>        at 
> org.apache.cassandra.streaming.messages.IncomingStreamMessage$1.deserialize(IncomingStreamMessage.java:60)
>        at 
> org.apache.cassandra.streaming.messages.IncomingStreamMessage$1.deserialize(IncomingStreamMessage.java:38)
>        at 
> org.apache.cassandra.streaming.messages.StreamMessage.deserialize(StreamMessage.java:51)
>        at 
> org.apache.cassandra.streaming.async.StreamingInboundHandler$StreamDeserializingTask.run(StreamingInboundHandler.java:172)
>        at 
> io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
>  [netty-all-4.1.50.Final.jar:4.1.50.Final]
>        at java.lang.Thread.run(Thread.java:834) [?:?]
> Caused by: java.io.EOFException
>        at 
> org.apache.cassandra.net.AsyncStreamingInputPlus.reBuffer(AsyncStreamingInputPlus.java:133)
>        at 
> org.apache.cassandra.io.util.RebufferingInputStream.readByte(RebufferingInputStream.java:178)
>        at 
> org.apache.cassandra.io.util.RebufferingInputStream.readPrimitiveSlowly(RebufferingInputStream.java:142)
>        at 
> org.apache.cassandra.io.util.RebufferingInputStream.readLong(RebufferingInputStream.java:231)
>        at 
> org.apache.cassandra.io.compress.CompressionMetadata$ChunkSerializer.deserialize(CompressionMetadata.java:538)
>        at 
> org.apache.cassandra.io.compress.CompressionMetadata$ChunkSerializer.deserialize(CompressionMetadata.java:528)
>        at 
> org.apache.cassandra.db.streaming.CompressionInfo$CompressionInfoSerializer.deserialize(CompressionInfo.java:88)
>        at 
> org.apache.cassandra.db.streaming.CompressionInfo$CompressionInfoSerializer.deserialize(CompressionInfo.java:61)
>        at 
> org.apache.cassandra.db.streaming.CassandraStreamHeader$CassandraStreamHeaderSerializer.deserialize(CassandraStreamHeader.java:230)
>        at 
> org.apache.cassandra.db.streaming.CassandraStreamHeader$CassandraStreamHeaderSerializer.deserialize(CassandraStreamHeader.java:210)
>        at 
> org.apache.cassandra.db.streaming.CassandraStreamHeader$CassandraStreamHeaderSerializer.deserialize(CassandraStreamHeader.java:178)
>        at 
> org.apache.cassandra.db.streaming.CassandraIncomingFile.read(CassandraIncomingFile.java:69)
>        at 
> org.apache.cassandra.streaming.messages.IncomingStreamMessage$1.deserialize(IncomingStreamMessage.java:53)
>        ... 5 more
> DEBUG 2020-09-11T09:45:55,123 [Stream-Deserializer-2.2.2.2:7000-93837c82] 
> org.apache.cassandra.streaming.StreamSession:657 - [Stream 
> #f921ea30-f44d-11ea-b7f1-356e3edd6247] Socket closed after session completed 
> with state FAILED
> {code}
> The issue was repeatable across many attempts to bootstrap. The cluster was 
> deployed on Linux hosts running a 4.1.x kernel.
> Investigating the cause of the NativeIoException error message showed that a 
> write(2) call must have returned ETIMEDOUT, which occurs when a TCP user 
> timeout is set on the socket.  In 4.0, internode connections set that option 
> when EPOLL is available, with a default of 30s to detect failures faster.
> Investigating the socket buffers around the time of the crash showed the send 
> buffer filling on the streaming follower and the receive buffer filling on 
> the streaming initiator.
> Capturing a heap dump and analyzing with MAT showed a thread in this state 
> until the stream failed.
> {code:java}
> Object / Stack Frame                                         | Name
> -------------------------------------------------------------+-------------------------------------------
> io.netty.util.concurrent.FastThreadLocalThread @ 0x6916ea170 | Stream-Deserializer-2.2.2.2:7000-a608109b
> - at sun.nio.ch.FileDispatcherImpl.force0(Ljava/io/FileDescriptor;Z)I (Native Method)
> - at sun.nio.ch.FileDispatcherImpl.force(Ljava/io/FileDescriptor;Z)I (FileDispatcherImpl.java:82)
> - at sun.nio.ch.FileChannelImpl.force(Z)V (FileChannelImpl.java:461)
> - at org.apache.cassandra.utils.SyncUtil.force(Ljava/nio/channels/FileChannel;Z)V (SyncUtil.java:172)
> - at org.apache.cassandra.io.util.SequentialWriter.syncDataOnlyInternal()V (SequentialWriter.java:190)
> - at org.apache.cassandra.io.util.SequentialWriter.syncInternal()V (SequentialWriter.java:206)
> - at org.apache.cassandra.io.compress.CompressedSequentialWriter.access$100(Lorg/apache/cassandra/io/compress/CompressedSequentialWriter;)V (CompressedSequentialWriter.java:39)
> - at org.apache.cassandra.io.compress.CompressedSequentialWriter$TransactionalProxy.doPrepare()V (CompressedSequentialWriter.java:353)
> - at org.apache.cassandra.utils.concurrent.Transactional$AbstractTransactional.prepareToCommit()V (Transactional.java:168)
> - at org.apache.cassandra.io.util.SequentialWriter.prepareToCommit()V (SequentialWriter.java:379)
> - at org.apache.cassandra.io.sstable.format.big.BigTableWriter$TransactionalProxy.doPrepare()V (BigTableWriter.java:409)
> - at org.apache.cassandra.utils.concurrent.Transactional$AbstractTransactional.prepareToCommit()V (Transactional.java:168)
> - at org.apache.cassandra.utils.concurrent.Transactional$AbstractTransactional.finish()Ljava/lang/Object; (Transactional.java:179)
> - at org.apache.cassandra.io.sstable.format.SSTableWriter.finish(Z)Lorg/apache/cassandra/io/sstable/format/SSTableReader; (SSTableWriter.java:266)
> - at org.apache.cassandra.io.sstable.SimpleSSTableMultiWriter.finish(Z)Ljava/util/Collection; (SimpleSSTableMultiWriter.java:59)
> - at org.apache.cassandra.io.sstable.format.RangeAwareSSTableWriter.finish(Z)Ljava/util/Collection; (RangeAwareSSTableWriter.java:135)
> - at org.apache.cassandra.db.streaming.CassandraStreamReceiver.received(Lorg/apache/cassandra/streaming/IncomingStream;)V (CassandraStreamReceiver.java:108)
> - at org.apache.cassandra.streaming.StreamReceiveTask.received(Lorg/apache/cassandra/streaming/IncomingStream;)V (StreamReceiveTask.java:91)
> - at org.apache.cassandra.streaming.StreamSession.receive(Lorg/apache/cassandra/streaming/messages/IncomingStreamMessage;)V (StreamSession.java:848)
> - at org.apache.cassandra.streaming.StreamSession.messageReceived(Lorg/apache/cassandra/streaming/messages/StreamMessage;)V (StreamSession.java:597)
> - at org.apache.cassandra.streaming.async.StreamingInboundHandler$StreamDeserializingTask.run()V (StreamingInboundHandler.java:189)
> - at io.netty.util.concurrent.FastThreadLocalRunnable.run()V (FastThreadLocalRunnable.java:30)
> - at java.lang.Thread.run()V (Thread.java:834)
> '- Total: 23 entries
> {code}
> Looking at the Java sources, the {{force0}} function calls {{fdatasync}} 
> under Linux. Using {{perf trace record}} showed that some fdatasync calls were 
> exceeding the socket user timeout threshold, explaining the failures.
> This issue required significant effort to determine the root cause, so at the 
> very least it would be good to improve the ability to diagnose it for an 
> operator who may not have access to system call profiling.
> Obviously an {{fdatasync}} taking 30s+ is a system tuning issue, but I do not 
> know of a way to guarantee an upper bound on fdatasync time, only ways to make 
> it more probable that it completes in time.
> One workaround is to increase the TCP user timeout from 30s to however long 
> the host needs to complete the sync, or to revert to the system default with 
> {{0}}; however, this affects regular internode messaging as well as streaming, 
> and it is desirable to discover dead nodes as early as possible.
> Possible improvements:
> * Separate the user timeout thresholds for internode messaging and streaming.
> * Create a metric for FileChannel.force times, per filesystem, to surface slow 
> syncs as a possible cause of timeouts.
> * Introduce a log warning when FileChannel.force times exceed a threshold (a 
> sketch of both ideas follows this list).
> * Run FileChannel.force on a separate (unbounded) executor service and 
> either continue to stream while force completes in the background, with at 
> least some backpressure on the stream, or throw on the initiator to improve 
> diagnosis.
> * Periodically call force by time and/or volume written on a background 
> thread.
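> For the metric and log-warning ideas above, a thin wrapper around the sync call 
> would be enough to surface slow syncs; a rough sketch only, where the class name 
> and the 10s threshold are illustrative rather than proposed values:
> {code:java}
> import java.io.IOException;
> import java.nio.channels.FileChannel;
> import java.nio.file.Path;
> import java.util.concurrent.TimeUnit;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
>
> public final class TimedSync
> {
>     private static final Logger logger = LoggerFactory.getLogger(TimedSync.class);
>
>     /** Illustrative threshold; in practice this would come from configuration. */
>     private static final long WARN_THRESHOLD_MILLIS = 10_000;
>
>     /**
>      * Wraps FileChannel.force, measuring how long the underlying fsync/fdatasync
>      * takes and warning when it crosses the threshold. The elapsed time could
>      * equally be recorded in a per-filesystem metric (e.g. keyed by
>      * Files.getFileStore(file)) to make slow devices visible.
>      */
>     public static void force(FileChannel channel, Path file, boolean includeMetadata) throws IOException
>     {
>         long start = System.nanoTime();
>         try
>         {
>             channel.force(includeMetadata);
>         }
>         finally
>         {
>             long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
>             if (elapsedMillis > WARN_THRESHOLD_MILLIS)
>                 logger.warn("force() of {} took {}ms; long syncs can exceed internode_tcp_user_timeout and fail streaming",
>                             file, elapsedMillis);
>         }
>     }
> }
> {code}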



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
