[ 
https://issues.apache.org/jira/browse/CASSANDRA-20084?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dmitry Konstantinov updated CASSANDRA-20084:
--------------------------------------------
    Fix Version/s: 4.1.x
           Status: Open  (was: Triage Needed)

> WaitQueue.Signal.awaitUninterruptibly() may stuck forever if the invoking 
> thread is interrupted
> -----------------------------------------------------------------------------------------------
>
>                 Key: CASSANDRA-20084
>                 URL: https://issues.apache.org/jira/browse/CASSANDRA-20084
>             Project: Apache Cassandra
>          Issue Type: Bug
>          Components: Legacy/Core, Local/Commit Log
>            Reporter: Dmitry Konstantinov
>            Assignee: Dmitry Konstantinov
>            Priority: Normal
>             Fix For: 4.1.x
>
>         Attachments: cass_threads.txt.gz, heap_dump_vars_state.png
>
>          Time Spent: 10m
>  Remaining Estimate: 0h
>
> This issue is based on the analysis done in the "Unexplained stuck 
> memtable flush" mail thread on the Cassandra User/Dev lists.
> End-to-end observations: on a Cassandra node (version 4.1.1) the memtable 
> flush randomly gets stuck and, as a consequence, all writes to memtables 
> later get stuck as well because the memory pool runs out of memory.
> Based on an analysis of the thread dump ( [^cass_threads.txt.gz] ) and a 
> heap dump, the following cause has been identified.
> We have the following stack trace for the "read-hotness-tracker:1" thread:
> {code:java}
> "read-hotness-tracker:1" daemon prio=5 tid=93 WAITING
>     at jdk.internal.misc.Unsafe.park(Native Method)
>     at java.util.concurrent.locks.LockSupport.park(LockSupport.java:323)
>     at org.apache.cassandra.utils.concurrent.WaitQueue$Standard$AbstractSignal.await(WaitQueue.java:289)
>        local variable: org.apache.cassandra.utils.concurrent.WaitQueue$Standard$SignalWithListener#2086139
>     at org.apache.cassandra.utils.concurrent.WaitQueue$Standard$AbstractSignal.await(WaitQueue.java:282)
>        local variable: org.apache.cassandra.utils.concurrent.WaitQueue$Standard$SignalWithListener#2086139
>     at org.apache.cassandra.utils.concurrent.Awaitable$Defaults.awaitUninterruptibly(Awaitable.java:186)
>        local variable: org.apache.cassandra.utils.concurrent.WaitQueue$Standard$SignalWithListener#2086139
>     at org.apache.cassandra.utils.concurrent.Awaitable$AbstractAwaitable.awaitUninterruptibly(Awaitable.java:259)
>     at org.apache.cassandra.db.commitlog.AbstractCommitLogService.awaitSyncAt(AbstractCommitLogService.java:324)
>        local variable: org.apache.cassandra.db.commitlog.PeriodicCommitLogService#1
>        local variable: com.codahale.metrics.Timer$Context#2086795
>     at org.apache.cassandra.db.commitlog.PeriodicCommitLogService.maybeWaitForSync(PeriodicCommitLogService.java:42)
>        local variable: org.apache.cassandra.db.commitlog.PeriodicCommitLogService#1
>     at org.apache.cassandra.db.commitlog.AbstractCommitLogService.finishWriteFor(AbstractCommitLogService.java:284)
>        local variable: org.apache.cassandra.db.commitlog.PeriodicCommitLogService#1
>     at org.apache.cassandra.db.commitlog.CommitLog.add(CommitLog.java:307)
>        local variable: org.apache.cassandra.db.commitlog.CommitLog#1
>        local variable: org.apache.cassandra.io.util.DataOutputBuffer$1$1#61
>        local variable: org.apache.cassandra.db.commitlog.CommitLogSegment$Allocation#1
>     at org.apache.cassandra.db.CassandraKeyspaceWriteHandler.addToCommitLog(CassandraKeyspaceWriteHandler.java:100)
>     at org.apache.cassandra.db.CassandraKeyspaceWriteHandler.beginWrite(CassandraKeyspaceWriteHandler.java:54)
>        local variable: org.apache.cassandra.utils.concurrent.OpOrder$Group#8162
>     at org.apache.cassandra.db.Keyspace.applyInternal(Keyspace.java:628)
>        local variable: org.apache.cassandra.db.Keyspace#8
>        local variable: org.apache.cassandra.db.Mutation#54491
>     at org.apache.cassandra.db.Keyspace.apply(Keyspace.java:512)
>     at org.apache.cassandra.db.Mutation.apply(Mutation.java:228)
>     at org.apache.cassandra.db.Mutation.apply(Mutation.java:248)
>     at org.apache.cassandra.cql3.statements.ModificationStatement.executeInternalWithoutCondition(ModificationStatement.java:675)
>     at org.apache.cassandra.cql3.statements.ModificationStatement.executeLocally(ModificationStatement.java:666)
>     at org.apache.cassandra.cql3.QueryProcessor.executeInternal(QueryProcessor.java:447)
>     at org.apache.cassandra.db.SystemKeyspace.persistSSTableReadMeter(SystemKeyspace.java:1488)
>        local variable: java.lang.String#42806
>        local variable: java.lang.String#47659
>        local variable: org.apache.cassandra.io.sstable.UUIDBasedSSTableId#1389
>        local variable: org.apache.cassandra.metrics.RestorableMeter#752
>     at org.apache.cassandra.io.sstable.format.SSTableReader$GlobalTidy$1.run(SSTableReader.java:2170)
> {code}
> Here the thread is waiting in the await() method of the 
> WaitQueue$Standard$SignalWithListener#2086139 object.
> Another input comes from the heap dump: according to the heap dump 
> screenshots, WaitQueue$Standard$SignalWithListener#2086139 has the following 
> field values:
> state = -1 (it means CANCELLED: 
> org.apache.cassandra.utils.concurrent.WaitQueue.Standard#CANCELLED = -1)
> thread = null
> According to the code in WaitQueue.java, such a combination of values is 
> only possible if cancel() was invoked on this SignalWithListener object. 
> The code path in the stack trace does not call it explicitly; the explicit 
> cancel() call is in another branch of AbstractCommitLogService.awaitSyncAt.
> But let's check the await() logic more carefully:
> {code:java}
> public Signal await() throws InterruptedException
> {
>     while (!isSignalled())
>     {
>         checkInterrupted();
>         LockSupport.park(); // <--- we are here
>     }
>     checkAndClear();
>     return this;
> }
> {code}
> and inspect the unassuming checkInterrupted() method:
> {code:java}
> private void checkInterrupted() throws InterruptedException
> {
>     if (Thread.interrupted())
>     {
>         cancel();
>         throw new InterruptedException();
>     }
> }
> {code}
> Here we see that if the currently awaiting thread is interrupted, the 
> SignalWithListener cancels itself.
> But we also throw new InterruptedException() here, so the execution should 
> fail with it.
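> One detail of checkInterrupted() matters for the rest of the analysis: 
> Thread.interrupted() not only reports the interrupt flag, it also clears 
> it, so a second call made right after the first one returns false. A 
> minimal sketch of that JDK behaviour (the class and method names are 
> illustrative and are not part of Cassandra):

```java
class InterruptFlagSketch {
    /** Interrupts the current thread, then reads the flag twice with Thread.interrupted(). */
    static boolean[] readFlagTwice() {
        Thread.currentThread().interrupt();      // set the interrupt flag on ourselves
        boolean first = Thread.interrupted();    // reports the flag and clears it
        boolean second = Thread.interrupted();   // the flag is already cleared here
        return new boolean[] { first, second };
    }

    public static void main(String[] args) {
        boolean[] flags = readFlagTwice();
        System.out.println("first=" + flags[0] + ", second=" + flags[1]);
    }
}
```

> So a later await() call on the same thread will pass checkInterrupted() 
> without throwing and will go on to park.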
> Checking the thread stack trace, we see that AbstractSignal.await() is 
> invoked from 
> org.apache.cassandra.utils.concurrent.Awaitable$AbstractAwaitable.awaitUninterruptibly(Awaitable.java:259).
> Here is how it looks inside:
> {code:java}
> public static <A extends Awaitable> A awaitUninterruptibly(A await)
> {
>     boolean interrupted = false;
>     while (true)
>     {
>         try
>         {
>             await.await();
>             break;
>         }
>         catch (InterruptedException e)
>         {
>             interrupted = true;
>         }
>     }
>     if (interrupted)
>         Thread.currentThread().interrupt();
>     return await;
> }
> {code}
> Here we have a loop: if the current thread is interrupted, await.await() 
> throws InterruptedException, we catch it here and call await.await() again. 
> But the previous invocation of await.await() has already cancelled our 
> Signal object, so the condition inside the await() method, 
> isSignalled(): state == SIGNALLED, is now always false. In addition, if 
> another thread invokes notifyAll, nothing will happen, because as part of 
> the cancellation our Signal object unregisters itself from the queue and 
> the notifying thread will not be able to find it to notify it.
> I have reproduced the case with a simple unit test.
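> The failure mode can also be modelled in isolation. The sketch below is 
> not the unit test mentioned above and does not use Cassandra classes: 
> ToyQueue, ToySignal and StuckAwaitSketch are hypothetical stand-ins that 
> copy only the await() / checkInterrupted() / awaitUninterruptibly() logic 
> quoted in this description. The waiter runs as a daemon thread so that the 
> JVM can still exit.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;

class StuckAwaitSketch {
    static final int NOT_SET = 0, SIGNALLED = 1, CANCELLED = -1;

    /** Toy queue: signalAll() reaches only the signals that are still registered. */
    static final class ToyQueue {
        final Queue<ToySignal> waiting = new ConcurrentLinkedQueue<>();

        ToySignal register(Thread waiter) {
            ToySignal signal = new ToySignal(this, waiter);
            waiting.add(signal);
            return signal;
        }

        void signalAll() {
            ToySignal signal;
            while ((signal = waiting.poll()) != null)
                signal.signal();
        }
    }

    /** Toy signal with the same await()/cancel() shape as in the description. */
    static final class ToySignal {
        final AtomicInteger state = new AtomicInteger(NOT_SET);
        final ToyQueue queue;
        final Thread waiter;

        ToySignal(ToyQueue queue, Thread waiter) {
            this.queue = queue;
            this.waiter = waiter;
        }

        void signal() {
            if (state.compareAndSet(NOT_SET, SIGNALLED))
                LockSupport.unpark(waiter);
        }

        void cancel() {
            if (state.compareAndSet(NOT_SET, CANCELLED))
                queue.waiting.remove(this);   // unregister: signalAll() cannot find us anymore
        }

        void await() throws InterruptedException {
            while (state.get() != SIGNALLED) {
                if (Thread.interrupted()) {   // same logic as checkInterrupted()
                    cancel();
                    throw new InterruptedException();
                }
                LockSupport.park(this);
            }
        }
    }

    /** Same retry loop as the awaitUninterruptibly() quoted above. */
    static void awaitUninterruptibly(ToySignal signal) {
        boolean interrupted = false;
        while (true) {
            try {
                signal.await();
                break;
            } catch (InterruptedException e) {
                interrupted = true;           // retry on a signal that is now CANCELLED
            }
        }
        if (interrupted)
            Thread.currentThread().interrupt();
    }

    /** Returns true if the waiter is still blocked after an interrupt followed by signalAll(). */
    static boolean waiterStaysStuck() {
        ToyQueue queue = new ToyQueue();
        ToySignal[] holder = new ToySignal[1];
        Thread waiter = new Thread(() -> awaitUninterruptibly(holder[0]));
        holder[0] = queue.register(waiter);
        waiter.setDaemon(true);
        waiter.start();
        try {
            waiter.interrupt();
            // Wait (bounded) until the interrupt has been turned into a cancellation.
            long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
            while (holder[0].state.get() != CANCELLED && System.nanoTime() < deadline)
                Thread.sleep(10);
            queue.signalAll();                // nothing is registered, so nobody is woken
            waiter.join(500);
            return waiter.isAlive();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("waiter still stuck: " + waiterStaysStuck());
    }
}
```

> In this model the waiter ends up parked on a signal whose state is 
> CANCELLED and which is no longer in the queue, which matches the 
> state = -1 field value seen in the heap dump.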
> The last piece of the puzzle: why is the thread interrupted? To understand 
> this, let's check where the "read-hotness-tracker:1" thread execution 
> started. It is a background periodic runnable in 
> org.apache.cassandra.io.sstable.format.SSTableReader.GlobalTidy:
> {code:java}
> readMeterSyncFuture = new WeakReference<>(syncExecutor.scheduleAtFixedRate(new Runnable()
> {
>     public void run()
>     {
>         if (obsoletion == null)
>         {
>             meterSyncThrottle.acquire();
>             SystemKeyspace.persistSSTableReadMeter(desc.ksname, desc.cfname, desc.id, readMeter);
>         }
>     }
> }, 1, 5, TimeUnit.MINUTES));
> {code}
> Let's check how the readMeterSyncFuture reference is handled, and we find 
> the source of the interruption:
> {code:java}
> private void stopReadMeterPersistence()
> {
>     ScheduledFuture<?> readMeterSyncFutureLocal = readMeterSyncFuture.get();
>     if (readMeterSyncFutureLocal != null)
>     {
>         readMeterSyncFutureLocal.cancel(true); // <== here we can interrupt the thread
>         readMeterSyncFuture = NULL;
>     }
> }
> {code}
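> That cancel(true) interrupts a task which is running at that moment can 
> be checked with a plain JDK executor. A minimal sketch, assuming that 
> java.util.concurrent's ScheduledExecutorService behaves like Cassandra's 
> syncExecutor in this respect (an assumption: the Cassandra executor is a 
> different class). The class name is illustrative, and Thread.sleep() stands 
> in for the blocking write.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class CancelInterruptSketch {
    /** Returns true if the running periodic task observed an interrupt after cancel(true). */
    static boolean cancelInterruptsRunningTask() {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch finished = new CountDownLatch(1);
        AtomicBoolean sawInterrupt = new AtomicBoolean(false);
        try {
            ScheduledFuture<?> future = executor.scheduleAtFixedRate(() -> {
                started.countDown();
                try {
                    Thread.sleep(60_000);          // stand-in for a blocking write
                } catch (InterruptedException e) {
                    sawInterrupt.set(true);        // cancel(true) interrupted the worker thread
                } finally {
                    finished.countDown();
                }
            }, 0, 5, TimeUnit.MINUTES);

            started.await();                       // make sure the task is already running
            future.cancel(true);                   // mayInterruptIfRunning = true
            finished.await(5, TimeUnit.SECONDS);
            return sawInterrupt.get();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("task interrupted by cancel(true): " + cancelInterruptsRunningTask());
    }
}
```

> With cancel(false) the running task would be left to finish and only 
> future runs would be suppressed.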
> stopReadMeterPersistence() is invoked by one of the following branches of 
> logic when an SSTable is not in use anymore:
> {code:java}
> org.apache.cassandra.io.sstable.format.SSTableReader#markObsolete
>   org.apache.cassandra.db.lifecycle.Helpers#markObsolete
>     org.apache.cassandra.db.lifecycle.Tracker#dropSSTables(...)
>     org.apache.cassandra.db.lifecycle.LifecycleTransaction#doCommit
>     org.apache.cassandra.db.lifecycle.LifecycleTransaction#doAbort
>  {code}
> So, the issue is caused by a combination of events with the right timing:
>  - an SSTable became obsolete in the middle of the 
> persistSSTableReadMeter background job execution for this SSTable, so the 
> job was interrupted
>  - persistSSTableReadMeter was at that moment in the middle of a write 
> operation, at the commit log append step
>  - the commit log append step was blocked by a delay in the commit log sync 
> (lastSyncedAt < syncTime condition in AbstractCommitLogService), so it 
> started to wait for the sync using await



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
