[ https://issues.apache.org/jira/browse/CASSANDRA-21181?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ariel Weisberg updated CASSANDRA-21181:
---------------------------------------
    Description: 
MultiNodeTableWalkWithWitnessesTest fails with read timeouts when 
reconciliation hangs.

ShortPaxosTrackingSimulationTest fails with:
{code:java}
  Caused by: java.lang.RuntimeException: Missing mutation ShortMutationId{2, 1, 0}
      at org.apache.cassandra.service.reads.tracked.PartialTrackedRead.augment(PartialTrackedRead.java:261)
      at java.base/java.lang.Iterable.forEach(Iterable.java:75)
      at org.apache.cassandra.service.reads.tracked.PartialTrackedRead.augment(PartialTrackedRead.java:247)
      at org.apache.cassandra.service.reads.tracked.TrackedLocalReads$Coordinator.lambda$acknowledgeReconcile$0(TrackedLocalReads.java:256)
      at org.apache.cassandra.concurrent.FutureTask$2.call(FutureTask.java:124)
      at org.apache.cassandra.concurrent.FutureTask.call(FutureTask.java:61)
      at org.apache.cassandra.concurrent.FutureTask.run(FutureTask.java:71)
      at org.apache.cassandra.simulator.systems.InterceptingExecutor$InterceptingPooledExecutor$WaitingThread.lambda$new$1(InterceptingExecutor.java:284)
{code}
The read reconciliation protocol requires each node to verify it has all 
mutations that other nodes have. When a node discovers it's missing a mutation, 
it registers a callback ("notify me when this mutation arrives") and sends a 
pull request to fetch it. The callback decrements a counter, and the read can 
only 
complete when all counters reach zero.

A race condition in the duplicate mutation handling causes the callback to 
never fire, leaving the counter permanently stuck at 1 and the read hanging 
until timeout.
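
The counter-and-callback bookkeeping described above can be pictured with a small toy model. This is only a sketch: the class and method names ({{ReconcileCounterSketch}}, {{awaitMutation}}, {{mutationArrived}}) are hypothetical and are not Cassandra's actual classes.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model only: these names are illustrative, not Cassandra's.
class ReconcileCounterSketch {
    // Callbacks waiting for a given mutation id to arrive.
    private final Map<String, List<Runnable>> listeners = new HashMap<>();
    // Number of mutations the read is still waiting for.
    private int remaining = 0;

    // Called when the node discovers it is missing a mutation.
    void awaitMutation(String mutationId) {
        remaining++;
        listeners.computeIfAbsent(mutationId, k -> new ArrayList<>())
                 .add(() -> remaining--);
    }

    // Called once a mutation has been applied locally.
    void mutationArrived(String mutationId) {
        List<Runnable> callbacks = listeners.remove(mutationId);
        if (callbacks != null)
            callbacks.forEach(Runnable::run);
    }

    // The read may only complete when nothing is outstanding.
    boolean readCanComplete() {
        return remaining == 0;
    }
}
```

If {{mutationArrived}} is never called for a registered id, {{readCanComplete()}} stays false, which is the shape of the hang described here.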
h2. The race

Setup: Node2 is a summary node for a tracked read. Node3 is the data node.

1. Node2 creates its mutation summary for the read. At this point, coordinator log {2, 20} has offsets [0,0]: node2 knows about offset 0 only.

2. A write happens on node2 (normal replication, unrelated to the read). This write gets offset 1 in coordinator log {2, 20}. The write goes through {{Keyspace.applyInternalTracked()}}: {{startWriting()}} returns true (first time seeing this offset), the mutation is applied to memtables, and {{finishWriting()}} calls {{invokeListeners(mutationId)}}, but nobody is listening yet, so it's a no-op.

3. Node3 sends its summary to node2. Node3's summary has [0,1] for log {2, 20}: it knows about both offsets 0 and 1.

4. Node2 processes node3's summary in {{acceptRemoteSummary()}}. {{collectLocallyMissingMutations()}} compares node3's summary against node2's current local state. Even though the mutation arrived in step 2, there is a window where the collection logic still identifies offset 1 as missing: {{collectLocallyMissingMutations()}} takes a read lock on the CoordinatorLog and computes the difference against {{witnessedOffsets}}, but this check can race with the write in step 2. It finds offset 1 is missing, calls {{registerMutationCallback()}}, which succeeds (first listener; the earlier {{invokeListeners}} found no listeners and was a no-op), increments the remaining mutations counter by 1, and calls {{pull()}}, which sends {{PULL_MUTATIONS_REQ}} to node2 itself (since log {2, 20} is node2's own coordinator log).

5. Node2 receives its own pull request. The {{PullMutationsRequest}} handler calls {{requestMissingMutations()}}, which schedules the mutation for delivery via the {{ActiveLogReconciler}}.

6. The {{ActiveLogReconciler}} looks up the mutation in the journal (it's there), reads it, and sends a {{PUSH_MUTATION_REQ}} back to node2.

7. Node2 receives the pushed mutation. The {{PushMutationRequest}} handler calls {{mutation.applyFuture()}}, which goes through {{Keyspace.applyInternalTracked()}}. {{MutationTrackingService.startWriting()}} is called, but {{CoordinatorLog.startWriting()}} checks {{witnessedOffsets}}, finds offset 1 is already witnessed from step 2, and returns false. Because {{started}} is false, {{finishWriting()}} is never called, {{invokeListeners()}} is never called, the callback registered in step 4 is never invoked, and the remaining mutations counter stays at 1 forever.

8. Node2 never completes reconciliation, so it never sends {{READ_RECONCILE_ACK}} to node3. Node3 is waiting for node2's syncAck. It never arrives. After 60 seconds, the client times out with {{OperationTimedOutException}}.
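
The interleaving above can be replayed deterministically in a single-threaded toy model. This is a sketch under stated assumptions: the class {{LostCallbackReplay}} and its methods are hypothetical stand-ins, and the race is modeled by taking the "offset 1 is missing" decision from a view captured before the write in step 2 lands.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Toy replay of the interleaving; names are illustrative, not Cassandra's classes.
class LostCallbackReplay {
    private final Set<Integer> witnessed = new HashSet<>();
    private final Map<Integer, Runnable> listeners = new HashMap<>();
    int remaining = 0;

    // Returns false when the offset was already witnessed (duplicate).
    boolean applyWrite(int offset) {
        if (witnessed.contains(offset))
            return false;                 // duplicate: listeners are NOT invoked
        witnessed.add(offset);
        Runnable cb = listeners.remove(offset);
        if (cb != null)
            cb.run();
        return true;
    }

    void registerCallback(int offset) {
        remaining++;
        listeners.put(offset, () -> remaining--);
    }

    // Replays the steps and returns the counter value left behind.
    static int replay() {
        LostCallbackReplay node2 = new LostCallbackReplay();
        node2.applyWrite(0);                                   // step 1: only offset 0 known
        boolean staleMissing = !node2.witnessed.contains(1);   // stale view used in step 4
        node2.applyWrite(1);                                   // step 2: nobody listening yet
        if (staleMissing)
            node2.registerCallback(1);                         // step 4: counter becomes 1
        node2.applyWrite(1);                                   // step 7: duplicate, no notify
        return node2.remaining;
    }
}
```

In this model {{replay()}} leaves the counter at 1, because the only notification for offset 1 happened before the callback was registered.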
h2. The fix

In {{MutationTrackingService.startWriting()}}, when the underlying {{CoordinatorLog.startWriting()}} returns false (duplicate mutation already witnessed), we now call {{incomingMutations.invokeListeners(mutation.id())}}. This fires any pending callbacks even for duplicate mutations. The callback fires, the counter decrements to 0, the summary node completes reconciliation and sends its syncAck, and the read succeeds.

This is safe because {{startWriting()}} returning false means {{finishWriting()}} already completed for this offset (it's the only code path that adds to {{witnessedOffsets}}), so the mutation data is fully written and visible.
{code:java}
  // MutationTrackingService.startWriting()
  boolean started = getOrCreateShards(mutation.getKeyspaceName()).startWriting(mutation);
  // If this is a duplicate mutation (already witnessed), notify any pending read
  // reconciliation listeners. A listener can be registered between the first write's
  // invokeListeners() call (which found no listeners) and this duplicate's arrival,
  // causing the listener to never fire and the read to hang.
  if (!started)
      incomingMutations.invokeListeners(mutation.id());
  return started;
{code}
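
To see the effect of the change in isolation, here is a minimal self-contained sketch of the same idea. The class {{DuplicateNotifySketch}} is a hypothetical toy, not the real {{MutationTrackingService}}; it only mirrors the shape of the patched method, with a late-registered listener being notified when a duplicate arrives.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Toy model of the fix; names are illustrative, not Cassandra's classes.
class DuplicateNotifySketch {
    private final Set<Integer> witnessed = new HashSet<>();
    private final Map<Integer, Runnable> listeners = new HashMap<>();

    void register(int offset, Runnable callback) {
        listeners.put(offset, callback);
    }

    private void invokeListeners(int offset) {
        Runnable cb = listeners.remove(offset);
        if (cb != null)
            cb.run();
    }

    // Mirrors the shape of the patched startWriting(): duplicates still notify.
    boolean startWriting(int offset) {
        boolean started = !witnessed.contains(offset);
        if (!started)
            invokeListeners(offset);
        return started;
    }

    // In this toy, finishWriting() is the only place that marks an offset witnessed.
    void finishWriting(int offset) {
        witnessed.add(offset);
        invokeListeners(offset);
    }
}
```

A caller that registers after the first write has finished still gets its callback once the duplicate push reaches {{startWriting()}}.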
h2. Alternatives

One alternative is a "check-after-register" in {{ReadReconciliations.pull()}} that would avoid the unnecessary pull round-trip entirely: after registering the callback, immediately check whether the mutation is already in {{witnessedOffsets}}, and if so fire the callback directly without sending a pull request. This follows the standard "register interest, then check if you missed the event" pattern. However, it would require threading a new {{isMutationWitnessed()}} method through {{MutationTrackingService}}, {{Shard}}, and {{CoordinatorLog}}, adding complexity across multiple layers. The {{startWriting()}} fix is simpler (a single check at the point where duplicates are already detected) and is consistent with how {{finishWriting()}} and the transfer activation path already call {{invokeListeners()}} under the same lock.
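
For reference, the "register interest, then check if you missed the event" pattern looks roughly like the following. This is a generic sketch, not the rejected patch: {{CheckAfterRegisterSketch}}, {{markWitnessed}}, and the {{pullsSent}} counter (a stand-in for sending a pull request) are all hypothetical.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Toy sketch of "register interest, then check"; all names are hypothetical.
class CheckAfterRegisterSketch {
    private final Set<Integer> witnessed = ConcurrentHashMap.newKeySet();
    private final Map<Integer, Runnable> listeners = new ConcurrentHashMap<>();
    final AtomicInteger pullsSent = new AtomicInteger();

    // Writer side: mark the offset witnessed first, then notify.
    void markWitnessed(int offset) {
        witnessed.add(offset);
        fire(offset);
    }

    // remove() is atomic, so a callback runs at most once even if the writer
    // and the registering thread both try to fire it.
    private void fire(int offset) {
        Runnable cb = listeners.remove(offset);
        if (cb != null)
            cb.run();
    }

    void pull(int offset, Runnable callback) {
        listeners.put(offset, callback);     // 1. register interest
        if (witnessed.contains(offset))      // 2. then check for a missed event
            fire(offset);
        else
            pullsSent.incrementAndGet();     // stand-in for sending a pull request
    }
}
```

Because the writer publishes to {{witnessed}} before firing and the reader registers before checking, at least one of the two sides sees the other's update, so the callback is not lost in either ordering.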

It might also work to leverage the locking to make the processing of 
{{collectLocallyMissingMutations()}} atomic with registration and notification 
of listeners, but that wasn't as straightforward a fix.
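
As a rough illustration of that locking idea, the missing-offset check and the listener registration can share one critical section with the code that marks an offset as witnessed. The sketch below is an assumption about what such a design could look like, with hypothetical names ({{AtomicCollectAndRegisterSketch}}, {{registerIfMissing}}); it does not reflect how {{CoordinatorLog}} actually structures its locks.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.ReentrantLock;

// Toy sketch of an atomic check-and-register; names are hypothetical.
class AtomicCollectAndRegisterSketch {
    private final ReentrantLock lock = new ReentrantLock();
    private final Set<Integer> witnessed = new HashSet<>();
    private final Map<Integer, Runnable> listeners = new HashMap<>();

    // Writer: marking witnessed and claiming the listener happen under the lock.
    void finishWriting(int offset) {
        Runnable cb;
        lock.lock();
        try {
            witnessed.add(offset);
            cb = listeners.remove(offset);
        } finally {
            lock.unlock();
        }
        if (cb != null)
            cb.run();   // run outside the lock to keep the critical section short
    }

    // Reader: the missing check and the registration are one critical section.
    // Returns true if a callback was registered (the offset is really missing).
    boolean registerIfMissing(int offset, Runnable callback) {
        lock.lock();
        try {
            if (witnessed.contains(offset))
                return false;
            listeners.put(offset, callback);
            return true;
        } finally {
            lock.unlock();
        }
    }
}
```

With this shape, a listener is only ever registered for an offset whose write has not yet finished, so the later {{finishWriting()}} is guaranteed to find it.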

  was:
MultiNodeTableWalkWithWitnessesTest fails with read timeouts when 
reconciliation hangs.

ShortPaxosTrackingSimulationTest fails with:

{code:java}
  Caused by: java.lang.RuntimeException: Missing mutation ShortMutationId{2, 1, 0}
      at org.apache.cassandra.service.reads.tracked.PartialTrackedRead.augment(PartialTrackedRead.java:261)
      at java.base/java.lang.Iterable.forEach(Iterable.java:75)
      at org.apache.cassandra.service.reads.tracked.PartialTrackedRead.augment(PartialTrackedRead.java:247)
      at org.apache.cassandra.service.reads.tracked.TrackedLocalReads$Coordinator.lambda$acknowledgeReconcile$0(TrackedLocalReads.java:256)
      at org.apache.cassandra.concurrent.FutureTask$2.call(FutureTask.java:124)
      at org.apache.cassandra.concurrent.FutureTask.call(FutureTask.java:61)
      at org.apache.cassandra.concurrent.FutureTask.run(FutureTask.java:71)
      at org.apache.cassandra.simulator.systems.InterceptingExecutor$InterceptingPooledExecutor$WaitingThread.lambda$new$1(InterceptingExecutor.java:284)
{code}


> Fix ShortPaxosTrackingSimulationTest and MultiNodeTableWalkWithWitnessesTest
> ----------------------------------------------------------------------------
>
>                 Key: CASSANDRA-21181
>                 URL: https://issues.apache.org/jira/browse/CASSANDRA-21181
>             Project: Apache Cassandra
>          Issue Type: Sub-task
>          Components: Consistency/Coordination
>            Reporter: Ariel Weisberg
>            Assignee: Ariel Weisberg
>            Priority: Normal


