[ https://issues.apache.org/jira/browse/CASSANDRA-20084?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Dmitry Konstantinov updated CASSANDRA-20084:
--------------------------------------------
Status: Needs Committer (was: Patch Available)
> WaitQueue.Signal.awaitUninterruptibly() may stuck forever if the invoking
> thread is interrupted
> -----------------------------------------------------------------------------------------------
>
> Key: CASSANDRA-20084
> URL: https://issues.apache.org/jira/browse/CASSANDRA-20084
> Project: Apache Cassandra
> Issue Type: Bug
> Components: Legacy/Core, Local/Commit Log
> Reporter: Dmitry Konstantinov
> Assignee: Dmitry Konstantinov
> Priority: Normal
> Fix For: 4.1.x
>
> Attachments: cass_threads.txt.gz, heap_dump_vars_state.png
>
> Time Spent: 20m
> Remaining Estimate: 0h
>
> The issue is reported based on an analysis done in the "Unexplained stuck
> memtable flush" mail thread on the Cassandra user/dev mailing lists.
> End-to-end observations: on a Cassandra node (version 4.1.1), the memtable
> flush randomly gets stuck, and later, as a consequence, all writes to
> memtables get stuck as well due to a lack of memory in a memory pool.
> Based on the analysis of the thread dump ( [^cass_threads.txt.gz] ) and the
> heap dump, the following cause has been identified.
> We have the following stack trace for the "read-hotness-tracker:1" thread:
> {code:java}
> "read-hotness-tracker:1" daemon prio=5 tid=93 WAITING
>     at jdk.internal.misc.Unsafe.park(Native Method)
>     at java.util.concurrent.locks.LockSupport.park(LockSupport.java:323)
>     at org.apache.cassandra.utils.concurrent.WaitQueue$Standard$AbstractSignal.await(WaitQueue.java:289)
>         local variable: org.apache.cassandra.utils.concurrent.WaitQueue$Standard$SignalWithListener#2086139
>     at org.apache.cassandra.utils.concurrent.WaitQueue$Standard$AbstractSignal.await(WaitQueue.java:282)
>         local variable: org.apache.cassandra.utils.concurrent.WaitQueue$Standard$SignalWithListener#2086139
>     at org.apache.cassandra.utils.concurrent.Awaitable$Defaults.awaitUninterruptibly(Awaitable.java:186)
>         local variable: org.apache.cassandra.utils.concurrent.WaitQueue$Standard$SignalWithListener#2086139
>     at org.apache.cassandra.utils.concurrent.Awaitable$AbstractAwaitable.awaitUninterruptibly(Awaitable.java:259)
>     at org.apache.cassandra.db.commitlog.AbstractCommitLogService.awaitSyncAt(AbstractCommitLogService.java:324)
>         local variable: org.apache.cassandra.db.commitlog.PeriodicCommitLogService#1
>         local variable: com.codahale.metrics.Timer$Context#2086795
>     at org.apache.cassandra.db.commitlog.PeriodicCommitLogService.maybeWaitForSync(PeriodicCommitLogService.java:42)
>         local variable: org.apache.cassandra.db.commitlog.PeriodicCommitLogService#1
>     at org.apache.cassandra.db.commitlog.AbstractCommitLogService.finishWriteFor(AbstractCommitLogService.java:284)
>         local variable: org.apache.cassandra.db.commitlog.PeriodicCommitLogService#1
>     at org.apache.cassandra.db.commitlog.CommitLog.add(CommitLog.java:307)
>         local variable: org.apache.cassandra.db.commitlog.CommitLog#1
>         local variable: org.apache.cassandra.io.util.DataOutputBuffer$1$1#61
>         local variable: org.apache.cassandra.db.commitlog.CommitLogSegment$Allocation#1
>     at org.apache.cassandra.db.CassandraKeyspaceWriteHandler.addToCommitLog(CassandraKeyspaceWriteHandler.java:100)
>     at org.apache.cassandra.db.CassandraKeyspaceWriteHandler.beginWrite(CassandraKeyspaceWriteHandler.java:54)
>         local variable: org.apache.cassandra.utils.concurrent.OpOrder$Group#8162
>     at org.apache.cassandra.db.Keyspace.applyInternal(Keyspace.java:628)
>         local variable: org.apache.cassandra.db.Keyspace#8
>         local variable: org.apache.cassandra.db.Mutation#54491
>     at org.apache.cassandra.db.Keyspace.apply(Keyspace.java:512)
>     at org.apache.cassandra.db.Mutation.apply(Mutation.java:228)
>     at org.apache.cassandra.db.Mutation.apply(Mutation.java:248)
>     at org.apache.cassandra.cql3.statements.ModificationStatement.executeInternalWithoutCondition(ModificationStatement.java:675)
>     at org.apache.cassandra.cql3.statements.ModificationStatement.executeLocally(ModificationStatement.java:666)
>     at org.apache.cassandra.cql3.QueryProcessor.executeInternal(QueryProcessor.java:447)
>     at org.apache.cassandra.db.SystemKeyspace.persistSSTableReadMeter(SystemKeyspace.java:1488)
>         local variable: java.lang.String#42806
>         local variable: java.lang.String#47659
>         local variable: org.apache.cassandra.io.sstable.UUIDBasedSSTableId#1389
>         local variable: org.apache.cassandra.metrics.RestorableMeter#752
>     at org.apache.cassandra.io.sstable.format.SSTableReader$GlobalTidy$1.run(SSTableReader.java:2170)
> {code}
> Here we are waiting in the await() method of the
> WaitQueue$Standard$SignalWithListener#2086139 object.
> Another input comes from the heap dump: according to the heap dump
> screenshots, WaitQueue$Standard$SignalWithListener#2086139 has the following
> field values:
> - state = -1 (which means CANCELLED:
> org.apache.cassandra.utils.concurrent.WaitQueue.Standard#CANCELLED = -1)
> - thread = null
> According to the code in WaitQueue.java, such a combination of values is only
> possible if cancel() was invoked on this SignalWithListener object. But we did
> not call it explicitly on this code path (the explicit cancel() call is in
> another branch of AbstractCommitLogService.awaitSyncAt), so let's check the
> await() logic more carefully:
> {code:java}
> public Signal await() throws InterruptedException
> {
>     while (!isSignalled())
>     {
>         checkInterrupted();
>         LockSupport.park(); // <--- we are here
>     }
>     checkAndClear();
>     return this;
> }
> {code}
> and inspect the unassuming checkInterrupted() method:
> {code:java}
> private void checkInterrupted() throws InterruptedException
> {
>     if (Thread.interrupted())
>     {
>         cancel();
>         throw new InterruptedException();
>     }
> }
> {code}
> Here we see that if the currently awaiting thread is interrupted, the
> SignalWithListener cancels itself. However, we also throw a new
> InterruptedException here, so the execution should fail with it.
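Note that Thread.interrupted() both tests and clears the interrupt status of the current thread, so once checkInterrupted() has thrown, a later await() call on the same thread no longer sees that interrupt. A minimal standalone illustration of this JDK behavior (the class InterruptedFlagDemo and its checkTwice() method are hypothetical names used only for this example):

```java
// Standalone JDK illustration; InterruptedFlagDemo is a hypothetical name, not Cassandra code.
public class InterruptedFlagDemo {
    // Returns the results of two consecutive Thread.interrupted() calls after one interrupt.
    static boolean[] checkTwice() {
        Thread.currentThread().interrupt();    // simulate an interrupt arriving while waiting
        boolean first = Thread.interrupted();  // true, and clears the interrupt status
        boolean second = Thread.interrupted(); // false: the status was already cleared
        return new boolean[] {first, second};
    }

    public static void main(String[] args) {
        boolean[] r = checkTwice();
        System.out.println("first=" + r[0] + ", second=" + r[1]); // prints "first=true, second=false"
    }
}
```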
> Checking the thread stack trace again, we see that AbstractSignal.await() is
> invoked via
> org.apache.cassandra.utils.concurrent.Awaitable$AbstractAwaitable.awaitUninterruptibly(Awaitable.java:259),
> which delegates to Awaitable$Defaults.awaitUninterruptibly(Awaitable.java:186).
> This is how it looks inside:
> {code:java}
> public static <A extends Awaitable> A awaitUninterruptibly(A await)
> {
>     boolean interrupted = false;
>     while (true)
>     {
>         try
>         {
>             await.await();
>             break;
>         }
>         catch (InterruptedException e)
>         {
>             interrupted = true;
>         }
>     }
>     if (interrupted)
>         Thread.currentThread().interrupt();
>     return await;
> }
> {code}
> Here we have a loop: if the current thread is interrupted, await.await()
> throws an InterruptedException, we catch it here and call await.await() again.
> But in the previous invocation of await.await() we have cancelled our Signal
> object, so the condition inside the await() method, isSignalled() (state ==
> SIGNALLED), is now always false. Since Thread.interrupted() has already
> cleared the interrupt flag, the second call does not throw again and simply
> parks. Moreover, if another thread signals the queue, nothing happens: as a
> part of cancellation our Signal object unregisters itself from the queue, so
> the notifying thread cannot find it to notify.
> I have reproduced the case with a simple unit test.
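The original unit test is not included here. As an illustration only, the following sketches such a reproduction against a heavily simplified, hypothetical model of the signal (SimpleSignal, signalAll(Set), waitUntil() and runScenario() are invented names, not Cassandra's WaitQueue API). It mirrors the three ingredients described above: cancel-on-interrupt in await(), self-unregistration on cancel, and the retry loop in awaitUninterruptibly().

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;
import java.util.function.BooleanSupplier;

// Hypothetical, heavily simplified model of WaitQueue/Signal; NOT Cassandra's actual classes.
public class StuckSignalRepro {
    static final int WAITING = 0, SIGNALLED = 1, CANCELLED = -1;

    static final class SimpleSignal {
        final Set<SimpleSignal> queue; // the "wait queue" this signal is registered in
        final Thread thread;
        final AtomicInteger state = new AtomicInteger(WAITING);

        SimpleSignal(Set<SimpleSignal> queue, Thread thread) {
            this.queue = queue;
            this.thread = thread;
            queue.add(this);
        }

        void signal() {
            if (state.compareAndSet(WAITING, SIGNALLED))
                LockSupport.unpark(thread);
        }

        void cancel() {
            // A cancelled signal unregisters itself, so signalAll() can no longer find it.
            if (state.compareAndSet(WAITING, CANCELLED))
                queue.remove(this);
        }

        void await() throws InterruptedException {
            while (state.get() != SIGNALLED) {
                if (Thread.interrupted()) { // same pattern as checkInterrupted()
                    cancel();
                    throw new InterruptedException();
                }
                LockSupport.park();
            }
        }
    }

    static void signalAll(Set<SimpleSignal> queue) {
        for (SimpleSignal s : queue)
            if (queue.remove(s))
                s.signal();
    }

    // Same retry loop as the awaitUninterruptibly() code quoted above.
    static void awaitUninterruptibly(SimpleSignal signal) {
        boolean interrupted = false;
        while (true) {
            try {
                signal.await();
                break;
            } catch (InterruptedException e) {
                interrupted = true; // the retry now waits on a CANCELLED signal
            }
        }
        if (interrupted)
            Thread.currentThread().interrupt();
    }

    static void waitUntil(BooleanSupplier condition) {
        long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
        while (!condition.getAsBoolean()) {
            if (System.nanoTime() > deadline)
                throw new IllegalStateException("condition not reached in time");
            Thread.onSpinWait();
        }
    }

    // Returns true if the waiter is still blocked after signalAll().
    static boolean runScenario() {
        Set<SimpleSignal> queue = ConcurrentHashMap.newKeySet();
        SimpleSignal[] signal = new SimpleSignal[1];
        Thread waiter = new Thread(() -> awaitUninterruptibly(signal[0]), "waiter");
        signal[0] = new SimpleSignal(queue, waiter);
        waiter.setDaemon(true); // lets the JVM exit even though the waiter never returns
        waiter.start();

        waitUntil(() -> waiter.getState() == Thread.State.WAITING); // parked in await()
        waiter.interrupt();                                          // like cancel(true)
        waitUntil(() -> signal[0].state.get() == CANCELLED);
        signalAll(queue);                                            // nothing left to wake up
        try {
            waiter.join(500);
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return waiter.isAlive();
    }

    public static void main(String[] args) {
        System.out.println("waiter still blocked after signalAll(): " + runScenario());
    }
}
```

Running main() should report that the waiter is still blocked; the daemon flag only exists so the JVM can exit despite the stuck thread.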
> The last piece of the puzzle: why is the thread interrupted? To understand
> it, let's check where the "read-hotness-tracker:1" thread execution started.
> It is a background periodic runnable in
> org.apache.cassandra.io.sstable.format.SSTableReader.GlobalTidy:
> {code:java}
> readMeterSyncFuture = new WeakReference<>(syncExecutor.scheduleAtFixedRate(new Runnable()
> {
>     public void run()
>     {
>         if (obsoletion == null)
>         {
>             meterSyncThrottle.acquire();
>             SystemKeyspace.persistSSTableReadMeter(desc.ksname, desc.cfname, desc.id, readMeter);
>         }
>     }
> }, 1, 5, TimeUnit.MINUTES));
> {code}
> Let's check how we deal with the readMeterSyncFuture reference, and we see our
> interruption:
> {code:java}
> private void stopReadMeterPersistence()
> {
>     ScheduledFuture<?> readMeterSyncFutureLocal = readMeterSyncFuture.get();
>     if (readMeterSyncFutureLocal != null)
>     {
>         readMeterSyncFutureLocal.cancel(true); // <========= here we can interrupt the thread
>         readMeterSyncFuture = NULL;
>     }
> }
> {code}
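To confirm that cancel(true) on a periodic task scheduled with scheduleAtFixedRate does interrupt the pool thread while the task is running, here is a small standalone sketch using only java.util.concurrent. CancelInterruptsDemo is a hypothetical name, and Thread.sleep() stands in for the blocking commit log wait; the real thread was parked in LockSupport.park(), which does not throw but returns with the interrupt flag still set, for checkInterrupted() to find.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for the read meter sync task; not Cassandra code.
public class CancelInterruptsDemo {
    // Returns true if cancel(true) interrupted the periodic task while it was running.
    static boolean runScenario() {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch finished = new CountDownLatch(1);
        AtomicBoolean sawInterrupt = new AtomicBoolean(false);

        // Same shape as readMeterSyncFuture: a periodic task on a scheduled executor.
        ScheduledFuture<?> future = executor.scheduleAtFixedRate(() -> {
            started.countDown();
            try {
                Thread.sleep(10_000); // stands in for a slow blocking write
            } catch (InterruptedException e) {
                sawInterrupt.set(true);
            } finally {
                finished.countDown();
            }
        }, 0, 5, TimeUnit.MINUTES);

        try {
            started.await();
            future.cancel(true); // the call made by stopReadMeterPersistence()
            finished.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdownNow();
        }
        return sawInterrupt.get();
    }

    public static void main(String[] args) {
        System.out.println("task interrupted by cancel(true): " + runScenario());
    }
}
```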
> stopReadMeterPersistence() is invoked by one of the following branches of
> logic when an SSTable is not in use anymore:
> {code:java}
> org.apache.cassandra.io.sstable.format.SSTableReader#markObsolete
> org.apache.cassandra.db.lifecycle.Helpers#markObsolete
> org.apache.cassandra.db.lifecycle.Tracker#dropSSTables(...)
> org.apache.cassandra.db.lifecycle.LifecycleTransaction#doCommit
> org.apache.cassandra.db.lifecycle.LifecycleTransaction#doAbort
> {code}
> So, the issue is caused by a combination of events with the right timing:
> - an SSTable became obsolete in the middle of a persistSSTableReadMeter
> background job execution for this SSTable, so we interrupted the job;
> - at that moment persistSSTableReadMeter was in the middle of a write
> operation, at the commit log append step;
> - the commit log append step was blocked by a delay in commit log sync (the
> lastSyncedAt < syncTime condition in AbstractCommitLogService), so we started
> to wait for it using awaitUninterruptibly().
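One possible direction for a fix, shown purely as a hypothetical sketch and not necessarily what the patch for CASSANDRA-20084 does: in the uninterruptible path, remember the interrupt instead of cancelling the signal, keep waiting, and restore the interrupt status once the signal arrives. UninterruptibleSignalSketch and its members are invented names for this example.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;

// Hypothetical sketch of an uninterruptible wait that never cancels on interrupt;
// NOT the actual CASSANDRA-20084 patch and not Cassandra's WaitQueue API.
public class UninterruptibleSignalSketch {
    static final int WAITING = 0, SIGNALLED = 1;

    static final class SimpleSignal {
        final Thread thread;
        final AtomicInteger state = new AtomicInteger(WAITING);

        SimpleSignal(Thread thread) {
            this.thread = thread;
        }

        void signal() {
            if (state.compareAndSet(WAITING, SIGNALLED))
                LockSupport.unpark(thread);
        }

        // An interrupt is remembered instead of being turned into cancel(),
        // so the signal never becomes CANCELLED and a later signal() still wakes us.
        void awaitUninterruptibly() {
            boolean interrupted = false;
            while (state.get() != SIGNALLED) {
                LockSupport.park();
                // Clear the flag so park() blocks again instead of returning immediately.
                if (Thread.interrupted())
                    interrupted = true;
            }
            if (interrupted)
                Thread.currentThread().interrupt(); // restore interrupt status for the caller
        }
    }

    // Returns true if the waiter finished after signal() and kept its interrupt status.
    static boolean runScenario() {
        AtomicBoolean flagRestored = new AtomicBoolean(false);
        SimpleSignal[] signal = new SimpleSignal[1];
        Thread waiter = new Thread(() -> {
            signal[0].awaitUninterruptibly();
            flagRestored.set(Thread.currentThread().isInterrupted());
        }, "waiter");
        signal[0] = new SimpleSignal(waiter);
        waiter.setDaemon(true);
        waiter.start();

        long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
        while (waiter.getState() != Thread.State.WAITING) { // wait until parked
            if (System.nanoTime() > deadline)
                throw new IllegalStateException("waiter never parked");
            Thread.onSpinWait();
        }
        waiter.interrupt(); // like cancel(true) on the periodic task
        signal[0].signal();
        try {
            waiter.join(2_000);
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return !waiter.isAlive() && flagRestored.get();
    }

    public static void main(String[] args) {
        System.out.println("woken and interrupt preserved: " + runScenario());
    }
}
```

The flag is cleared with Thread.interrupted() after each wake-up on purpose: LockSupport.park() returns immediately while the interrupt status is set, so without clearing it the loop would spin instead of blocking.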
--
This message was sent by Atlassian Jira
(v8.20.10#820010)