[ 
https://issues.apache.org/jira/browse/CASSANDRA-21181?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ariel Weisberg updated CASSANDRA-21181:
---------------------------------------
    Description: ShortPaxosTrackingSimulationTest fails with  (was: Still 
validating these bugs and the fixes so I can't vouch for the accuracy of this 
description yet. It's taking a minute to learn how reconciliation works in both 
the foreground and background and how they interact at this level of detail.

  ---

  h2. Summary

  Four bugs in the read reconciliation protocol of the mutation tracking system 
cause failures during tracked reads. Bugs 1 and 2 cause read timeouts (observed 
in {{MultiNodeTableWalkWithWitnessesTest}}). Bugs 3 and 4 cause {{Missing 
mutation}} crashes (observed in {{ShortPaxosTrackingSimulationTest}}).          
   

  h2. Background: Read Reconciliation Protocol

  When a tracked read executes, the data node and all summary nodes exchange 
mutation summaries. The data node's {{ReadReconciliations.Coordinator}} 
maintains a packed atomic counter with three components: remaining mutations, 
remaining summaries, and remaining sync-acks. The read can only complete 
(calling
  {{augmentingOffsets()}} and {{acknowledgeReconcile}}) when all three reach 
zero.
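
  The packed counter can be pictured with the minimal sketch below. It is not
the actual {{ReadReconciliations.Coordinator}} code: the class name
{{PackedRemaining}}, the 21-bit field width, and the method names are
assumptions made for illustration. Only the argument order (mutations,
summaries, sync-acks) follows the {{updateRemainingAndMaybeComplete()}} call
quoted later in this description.

  {code:java}
  import java.util.concurrent.atomic.AtomicLong;

  // Hypothetical sketch: three counters packed into one long so they can be
  // updated and tested for "all zero" in a single atomic step.
  final class PackedRemaining
  {
      private static final int BITS = 21;                  // assumed field width
      private static final long MASK = (1L << BITS) - 1;

      private final AtomicLong state;

      PackedRemaining(int mutations, int summaries, int syncAcks)
      {
          state = new AtomicLong(pack(mutations, summaries, syncAcks));
      }

      static long pack(long mutations, long summaries, long syncAcks)
      {
          return (mutations << (2 * BITS)) | (summaries << BITS) | syncAcks;
      }

      static int mutations(long packed) { return (int) ((packed >>> (2 * BITS)) & MASK); }
      static int summaries(long packed) { return (int) ((packed >>> BITS) & MASK); }
      static int syncAcks(long packed)  { return (int) (packed & MASK); }

      // Applies the three deltas atomically and returns true when all three
      // components are zero after the update.
      boolean updateAndCheckComplete(int mutationsDelta, int summariesDelta, int syncAcksDelta)
      {
          while (true)
          {
              long prev = state.get();
              int m = mutations(prev) + mutationsDelta;
              int s = summaries(prev) + summariesDelta;
              int a = syncAcks(prev) + syncAcksDelta;
              if (m < 0 || s < 0 || a < 0 || m > MASK || s > MASK || a > MASK)
                  throw new IllegalStateException("counter out of range");
              long next = pack(m, s, a);
              if (state.compareAndSet(prev, next))
                  return next == 0;
          }
      }
  }
  {code}

  In this sketch, a remote summary that needs one pull is recorded as
{{updateAndCheckComplete(1, -1, 0)}}. An over-counted or under-counted first
argument is what produces the hangs and premature completions described in the
bugs below.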

  For each remote summary received via {{acceptRemoteSummary()}}, the protocol:

  1. Calls {{collectLocallyMissingMutations()}} to identify mutations in the
  remote summary that the local node doesn't have.
  2. Deduplicates against previously requested mutations via the per-read
  {{requested}} set.
  3. Registers a callback for each missing mutation via
  {{registerMutationCallback()}} (which calls {{IncomingMutations.subscribe()}}).
  4. Sends {{PULL_MUTATIONS_REQ}} to fetch the missing mutations.
  5. Increments the remaining-mutations counter by the number of missing
  mutations.

  When a pulled mutation arrives, it's applied via 
{{Keyspace.applyInternalTracked()}}, which journals it, applies it to 
memtables, and calls {{finishWriting()}} which invokes {{invokeListeners()}} to 
fire the callback. The callback calls {{acceptMutation()}}, which decrements 
the remaining-mutations counter by 1.

  After completion, {{augmentingOffsets()}} recomputes the difference between 
remote summaries and the local summary. It reads each of these mutations from 
the local journal to augment the read result.

  h2. Bug 1: Duplicate mutation silently drops listener notification

  h3. Symptom

  Tracked reads time out because the remaining-mutations counter never reaches 
zero.

  h3. The race

  {noformat}
  Timeline on the data node:

  1. collectLocallyMissingMutations() identifies offset X as missing.
  (offset X is NOT in witnessedOffsets[localNode])
  2. The mutation carrying offset X arrives via normal replication
  (unrelated to the pull — e.g. a delayed original write).
  In Keyspace.applyInternalTracked():
    - trackedWriteHandler.beginWrite() → journals the mutation
    - startWriting() → returns true (first time seeing this offset)
    - Mutation applied to memtables
    - finishWriting() → adds to witnessedOffsets, calls invokeListeners(X)
    - But no listener is registered yet — invokeListeners() is a no-op
  3. registerMutationCallback(X, callback) → returns true (first listener).
  The callback IS registered, but nobody will ever invoke it because
  invokeListeners() already ran in step 2 and found no listeners.
  4. PULL_MUTATIONS_REQ is sent. The pull response arrives carrying offset X.
  In Keyspace.applyInternalTracked():
    - trackedWriteHandler.beginWrite() → journals (duplicate write to journal)
    - startWriting() → returns FALSE (already in witnessedOffsets from step 2)
    - The if(started) block is skipped entirely
    - finishWriting() is NOT called
    - invokeListeners() is NOT called
    - The callback from step 3 is never invoked
    - The remaining-mutations counter is never decremented
  5. The read hangs until timeout.
  {noformat}

  h3. The fix

  In {{Keyspace.applyInternalTracked()}}, when {{startWriting()}} returns false 
(duplicate mutation), call 
{{MutationTrackingService.instance.notifyIncomingMutationListeners(mutation.id())}}.
 This invokes any pending callbacks even for duplicate mutations, closing the 
race window where a callback is registered
  between the first write's {{invokeListeners()}} and the duplicate's arrival.

  {code:java}
  // Keyspace.java - applyInternalTracked()
  if (started)
      MutationTrackingService.instance.finishWriting(mutation);
  else
      // Duplicate mutation - still notify listeners in case a read reconciliation
      // registered a callback after the first write completed but before this
      // duplicate arrived
      MutationTrackingService.instance.notifyIncomingMutationListeners(mutation.id());
  {code}

  New method added to {{MutationTrackingService}}:
  {code:java}
  public void notifyIncomingMutationListeners(ShortMutationId mutationId)
  {
      incomingMutations.invokeListeners(mutationId);
  }
  {code}
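
  The race and the effect of the fix can be replayed deterministically with a
toy model. This is a single-threaded sketch and not the Cassandra
implementation: {{ToyNode}}, its fields, and {{replayRace()}} are hypothetical
names, and the listener map is reduced to a plain {{HashMap}}.

  {code:java}
  import java.util.ArrayList;
  import java.util.HashMap;
  import java.util.HashSet;
  import java.util.List;
  import java.util.Map;
  import java.util.Set;

  // Hypothetical toy model of one node's witnessed offsets and listener map.
  final class ToyNode
  {
      private final Set<Long> witnessed = new HashSet<>();
      private final Map<Long, List<Runnable>> listeners = new HashMap<>();
      private final boolean notifyOnDuplicate;   // true models the fix

      ToyNode(boolean notifyOnDuplicate)
      {
          this.notifyOnDuplicate = notifyOnDuplicate;
      }

      // Adds the callback; returns true if it is the first listener for the offset.
      boolean subscribe(long offset, Runnable callback)
      {
          List<Runnable> existing = listeners.get(offset);
          boolean first = existing == null;
          if (first)
          {
              existing = new ArrayList<>();
              listeners.put(offset, existing);
          }
          existing.add(callback);
          return first;
      }

      // Removes and runs every listener registered for the offset, if any.
      void invokeListeners(long offset)
      {
          List<Runnable> callbacks = listeners.remove(offset);
          if (callbacks != null)
              callbacks.forEach(Runnable::run);
      }

      // Models applyInternalTracked(): only the first arrival "starts" a write.
      void apply(long offset)
      {
          boolean started = witnessed.add(offset);
          if (started || notifyOnDuplicate)
              invokeListeners(offset);
      }

      // Replays steps 1-4 of the timeline and returns the remaining-mutations count.
      static int replayRace(boolean notifyOnDuplicate)
      {
          ToyNode node = new ToyNode(notifyOnDuplicate);
          int[] remaining = { 1 };                     // step 1: offset 7 judged missing
          node.apply(7L);                              // step 2: arrives via replication
          node.subscribe(7L, () -> remaining[0]--);    // step 3: callback registered late
          node.apply(7L);                              // step 4: pull response (duplicate)
          return remaining[0];
      }
  }
  {code}

  In this model {{ToyNode.replayRace(false)}} leaves the counter at 1 (the read
would hang), while {{ToyNode.replayRace(true)}} brings it to 0.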

  h2. Bug 2: Overcounting remaining mutations

  h3. Symptom

  Tracked reads time out because the remaining-mutations counter never reaches 
zero.

  h3. The problem

  In {{acceptRemoteSummary()}}, the remaining-mutations counter was incremented 
by {{missingMutations.idCount()}} — the total number of mutations identified as 
missing. But {{registerMutationCallback()}} can fail to register for some of 
them because the mutation arrives via normal replication in the window between
  {{collectLocallyMissingMutations()}} and {{registerMutationCallback()}}. When 
this happens:

  1. {{collectLocallyMissingMutations()}} identifies the offset as missing (not
  yet in {{witnessedOffsets}}).
  2. The mutation arrives via normal replication and {{finishWriting()}} calls
  {{invokeListeners()}}, but no listener is registered yet, so it's a no-op.
  3. {{registerMutationCallback()}} is called and returns true (first listener,
  since {{invokeListeners()}} in step 2 found no entry to remove).
  4. The callback is registered, but nobody will invoke it (the mutation already
  arrived and {{invokeListeners()}} already ran).
  5. The counter was incremented for this mutation, but neither the Bug 1 path
  (duplicate arrival) nor the normal path will fire the callback.

  Note: this bug is closely related to Bug 1. Bug 1 is the more fundamental fix 
— it ensures duplicate mutations still fire callbacks. Bug 2 is a 
belt-and-suspenders fix for the related race where the mutation arrives before 
the callback is even registered, so there's no callback to fire at all.

  h3. The fix

  Changed {{acceptRemoteSummary()}} to increment the counter by the actual 
number of callbacks registered via {{pull()}}, rather than by 
{{missingMutations.idCount()}}. The {{pull()}} method now returns the count of 
mutations it registered callbacks for.

  {code:java}
  // ReadReconciliations.Coordinator.acceptRemoteSummary()
  int actuallyRequested = 0;
  for (Offsets offsets : missingMutations.offsets())
      actuallyRequested += pull(remoteNode, offsets, callback);

  summaries.add(Pair.create(remoteNode, summary));
  return updateRemainingAndMaybeComplete(actuallyRequested, -1, 0);
  {code}

  h2. Bug 3: collectLocallyMissingMutations only checks unreconciled offsets

  h3. Symptom

  Tracked reads crash with {{Missing mutation}} because {{augmentingOffsets()}} 
includes a mutation that was never pulled or journaled on the data node.

  h3. The problem

  {{collectLocallyMissingMutations()}} iterates 
{{remoteSummary.onlyUnreconciled()}}, which yields only the unreconciled 
offsets from the remote summary. But {{augmentingOffsets()}} includes BOTH 
reconciled and unreconciled offsets from remote summaries minus the local 
summary. This is intentional in
  {{augmentingOffsets()}} to prevent read monotonicity violations (documented 
in the method's javadoc).

  A mutation being "reconciled" on a remote coordinator means the coordinator 
received acknowledgments from enough replicas — it does NOT mean the local 
(data) node has the mutation. When a mutation is reconciled on the remote 
coordinator but the data node never received it (e.g., the write to this node 
was dropped
  by the network), the pull logic skips it (it's not in 
{{onlyUnreconciled()}}), but {{augmentingOffsets()}} includes it.

  {noformat}
  1. Node 3 coordinates a write with offset 0. All replicas acknowledge.
     Offset 0 moves to reconciledOffsets on node 3's coordinator log.
  2. Node 1 (data node) never received the write (network drop).
  3. A tracked read starts on node 1 with node 3 as a summary node.
  4. Node 3's summary: CoordinatorLogId{3, 2} reconciled={[0,0]}, unreconciled={}
  5. Node 1's local summary: CoordinatorLogId{3, 2} not present
     (never received offset 0)
  6. acceptRemoteSummary() on node 1 calls collectLocallyMissingMutations(),
     which iterates onlyUnreconciled() → node 3's unreconciled is empty →
     nothing identified as missing → no pull request sent
  7. All counters reach zero (no mutations were requested). complete() is called.
  8. augmentingOffsets() = union(remote summaries) - local summary
                         = {reconciled [0,0]} - {} = {[0,0]}
     Offset 0 IS included.
  9. augment(ShortMutationId{3, 2, 0}) → MutationJournal.read() → null
     Crash: "Missing mutation ShortMutationId{3, 2, 0}"
  {noformat}
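
  The mismatch in the trace above can be reproduced with plain sets. The sketch
below is a simplification under stated assumptions: {{ToySummary}} and its
methods are hypothetical, a summary covers a single coordinator log, and "what
the local node has" is modelled as the union of its own reconciled and
unreconciled offsets.

  {code:java}
  import java.util.Set;
  import java.util.TreeSet;

  // Hypothetical single-log summary: offsets are plain longs.
  final class ToySummary
  {
      final Set<Long> reconciled;
      final Set<Long> unreconciled;

      ToySummary(Set<Long> reconciled, Set<Long> unreconciled)
      {
          this.reconciled = reconciled;
          this.unreconciled = unreconciled;
      }

      // Union of reconciled and unreconciled offsets.
      Set<Long> allOffsets()
      {
          Set<Long> all = new TreeSet<>(reconciled);
          all.addAll(unreconciled);
          return all;
      }

      // Set difference a - b, returned as a new set.
      static Set<Long> minus(Set<Long> a, Set<Long> b)
      {
          Set<Long> result = new TreeSet<>(a);
          result.removeAll(b);
          return result;
      }

      // Behaviour described in "The problem": only unreconciled offsets are pulled.
      static Set<Long> missingOld(ToySummary remote, ToySummary local)
      {
          return minus(remote.unreconciled, local.allOffsets());
      }

      // Behaviour described in "The fix": every remote offset is considered.
      static Set<Long> missingFixed(ToySummary remote, ToySummary local)
      {
          return minus(remote.allOffsets(), local.allOffsets());
      }

      // What the read later tries to load from the local journal.
      static Set<Long> augmenting(ToySummary remote, ToySummary local)
      {
          return minus(remote.allOffsets(), local.allOffsets());
      }
  }
  {code}

  With a remote summary of reconciled={0}, unreconciled={} and an empty local
summary, {{missingOld()}} is empty while {{augmenting()}} contains offset 0, so
the read asks the journal for a mutation that was never pulled.
{{missingFixed()}} matches {{augmenting()}} by construction.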

  h3. The fix

  Added {{MutationSummary.allOffsets()}} which returns an iterator over the 
union of reconciled and unreconciled offsets for each coordinator log. Changed 
{{collectLocallyMissingMutations()}} to use {{allOffsets()}} instead of 
{{onlyUnreconciled()}}.

  {code:java}
  // MutationSummary.java
  public Iterator<Offsets> allOffsets()
  {
      return new AbstractIterator<>()
      {
          int i = 0;

          @Override
          protected Offsets computeNext()
          {
              // Skip coordinator logs whose combined offsets are empty
              while (i < summaries.size())
              {
                  CoordinatorSummary summary = summaries.get(i++);
                  Offsets combined = Offsets.Immutable.union(summary.reconciled, summary.unreconciled);
                  if (combined != null && !combined.isEmpty())
                      return combined;
              }
              return endOfData();
          }
      };
  }
  {code}

  {code:java}
  // MutationTrackingService.collectLocallyMissingMutations()
  // Use allOffsets() instead of onlyUnreconciled() because a mutation being
  // reconciled on the remote coordinator doesn't mean the local node has
  // received it.
  Iterator<Offsets> iterator = remoteSummary.allOffsets();
  {code}

  h2. Bug 4: Cross-read callback sharing causes undercounting of remaining 
mutations

  h3. Symptom

  Tracked reads crash with {{Missing mutation}} because the remaining-mutations 
counter reaches zero prematurely — before a mutation that appears in 
{{augmentingOffsets()}} has been pulled and journaled.

  h3. The problem

  The {{IncomingMutations}} listener map is global per node, not per-read. When 
two concurrent reads both identify the same mutation as missing, they both call 
{{registerMutationCallback()}} which calls {{IncomingMutations.subscribe()}}. 
The first call returns true (first listener) and the second returns false
  (listener already exists). Both callbacks are added to the listener list — 
when the mutation arrives, {{invokeListeners()}} fires both.

  The per-read {{requested}} set deduplication does not help here because it is 
per-{{Coordinator}} (per-read). Two different reads have independent 
{{requested}} sets.

  However, {{pull()}} only counted mutations where 
{{registerMutationCallback()}} returned true. When it returned false (another 
read already registered a listener), the mutation was not counted toward the 
remaining-mutations counter. But the callback WAS registered and WILL fire when 
the mutation arrives
  (decrementing the counter). This creates an asymmetry: the callback fires but 
the counter was never incremented for it.

  {noformat}
  Timeline on the data node with two concurrent tracked reads:

  1. Read A's acceptRemoteSummary() identifies mutation X as missing.
     registerMutationCallback(X, callbackA) → returns true (first listener).
     pull() sends PULL_MUTATIONS_REQ and counts 1.
     Read A's remaining-mutations counter incremented by 1.
  2. Read B's acceptRemoteSummary() identifies mutation X as missing.
     registerMutationCallback(X, callbackB) → returns false (Read A's listener
     exists).
     pull() does NOT count this mutation (only first-listener registrations
     were counted).
     Read B's remaining-mutations counter incremented by 0.
  3. Read B's remaining counters all reach zero (the mutations component was
     never incremented).
     complete() is called on Read B.
     augmentingOffsets() includes mutation X (it's in remote summaries but not
     local).
     augment() tries to read X from journal → NOT_FOUND (never pulled for Read B).
     Crash: "Missing mutation"
  {noformat}
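
  The undercount in the timeline above can be replayed with a toy shared
listener map. This is an illustrative sketch only: {{ToySharedListeners}}, the
{{countEveryCallback}} flag, and {{readBRemainingBeforeArrival()}} are
hypothetical, and no pull request is actually sent.

  {code:java}
  import java.util.ArrayList;
  import java.util.HashMap;
  import java.util.List;
  import java.util.Map;

  // Hypothetical node-wide listener map shared by all reads on the node.
  final class ToySharedListeners
  {
      private final Map<Long, List<Runnable>> listeners = new HashMap<>();

      // Always adds the callback; returns true only for the first listener.
      boolean subscribe(long id, Runnable callback)
      {
          boolean first = !listeners.containsKey(id);
          listeners.computeIfAbsent(id, k -> new ArrayList<>()).add(callback);
          return first;
      }

      // Returns how much the caller adds to its remaining-mutations counter.
      // countEveryCallback = false models the old rule (first listener only),
      // countEveryCallback = true models counting every registered callback.
      int pull(long[] ids, Runnable callback, boolean countEveryCallback)
      {
          int counted = 0;
          for (long id : ids)
          {
              boolean first = subscribe(id, callback);
              if (first || countEveryCallback)
                  counted++;
          }
          return counted;
      }

      // Replays steps 1-2 and returns Read B's counter before mutation 42 arrives.
      static int readBRemainingBeforeArrival(boolean countEveryCallback)
      {
          ToySharedListeners node = new ToySharedListeners();
          int[] a = { 0 }, b = { 0 };
          a[0] += node.pull(new long[]{ 42L }, () -> a[0]--, countEveryCallback);
          b[0] += node.pull(new long[]{ 42L }, () -> b[0]--, countEveryCallback);
          return b[0];
      }
  }
  {code}

  Under the old rule Read B's counter is 0 before the mutation arrives, so the
read can complete early. Counting every callback leaves it at 1 until the
shared listeners fire.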

  h3. The fix

  {{pull()}} now counts ALL mutations for which callbacks are registered, not 
just those where {{registerMutationCallback()}} returned true. The pull request 
is still only sent for first-listener registrations (since another read already 
sent one), but the remaining counter is incremented for every callback that
  will eventually fire.

  {code:java}
  private static int pull(int node, Offsets offsets, IncomingMutations.Callback callback)
  {
      InetAddressAndPort logCoordinator = host(offsets.logId().hostId());
      InetAddressAndPort pullFrom = FailureDetector.instance.isAlive(logCoordinator)
                                    ? logCoordinator
                                    : host(node);

      Offsets.Mutable toPull = new Offsets.Mutable(offsets.logId());
      int callbackCount = 0;
      for (ShortMutationId id : offsets)
      {
          // registerMutationCallback always adds the callback to the listener list.
          // It returns true if this is the first listener (meaning we need to send
          // a pull request).
          // Even when it returns false (another read already registered a listener),
          // our callback is still added and will fire when the mutation arrives via
          // invokeListeners.
          boolean firstListener = MutationTrackingService.instance.registerMutationCallback(id, callback);
          callbackCount++;
          if (firstListener)
              toPull.add(id.offset());
      }

      if (!toPull.isEmpty())
      {
          PullMutationsRequest pull = new PullMutationsRequest(Offsets.Immutable.copy(toPull));
          MessagingService.instance().send(Message.out(Verb.PULL_MUTATIONS_REQ, pull), pullFrom);
      }

      return callbackCount;
  }
  {code})

> Fix ShortPaxosTrackingSimulationTest and MultiNodeTableWalkWithWitnessesTest
> ----------------------------------------------------------------------------
>
>                 Key: CASSANDRA-21181
>                 URL: https://issues.apache.org/jira/browse/CASSANDRA-21181
>             Project: Apache Cassandra
>          Issue Type: Sub-task
>          Components: Consistency/Coordination
>            Reporter: Ariel Weisberg
>            Assignee: Ariel Weisberg
>            Priority: Normal
>
> ShortPaxosTrackingSimulationTest fails with



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
