Ariel Weisberg created CASSANDRA-21181:
------------------------------------------
Summary: Fix ShortPaxosTrackingSimulationTest and
MultiNodeTableWalkWithWitnessesTest
Key: CASSANDRA-21181
URL: https://issues.apache.org/jira/browse/CASSANDRA-21181
Project: Apache Cassandra
Issue Type: Sub-task
Components: Consistency/Coordination
Reporter: Ariel Weisberg
Assignee: Ariel Weisberg
Still validating these bugs and the fixes so I can't vouch for the accuracy of
this description yet. It's taking a minute to learn how reconciliation works in
both the foreground and background and how they interact at this level of
detail.
---
h2. Summary
Four bugs in the read reconciliation protocol of the mutation tracking system
cause failures during tracked reads. Bugs 1 and 2 cause read timeouts (observed
in {{MultiNodeTableWalkWithWitnessesTest}}). Bugs 3 and 4 cause {{Missing
mutation}} crashes (observed in {{ShortPaxosTrackingSimulationTest}}).
h2. Background: Read Reconciliation Protocol
When a tracked read executes, the data node and all summary nodes exchange
mutation summaries. The data node's {{ReadReconciliations.Coordinator}}
maintains a packed atomic counter with three components: remaining mutations,
remaining summaries, and remaining sync-acks. The read can only complete
(calling {{augmentingOffsets()}} and {{acknowledgeReconcile}}) when all three
reach zero.
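The packed counter can be pictured with the following minimal sketch. It is not the actual {{ReadReconciliations.Coordinator}} code: the class name, the 21-bit field width, and the method name are assumptions made for illustration, and the argument order (mutations, summaries, sync-acks) is only inferred from the {{updateRemainingAndMaybeComplete(actuallyRequested, -1, 0)}} call quoted later in this description.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical illustration only; names and bit widths are assumptions.
final class PackedRemainingCounter
{
    private static final int BITS = 21;                 // assumed width per component
    private static final long MASK = (1L << BITS) - 1;

    private final AtomicLong state;

    PackedRemainingCounter(int mutations, int summaries, int syncAcks)
    {
        state = new AtomicLong(pack(mutations, summaries, syncAcks));
    }

    static long pack(long mutations, long summaries, long syncAcks)
    {
        return (mutations << (2 * BITS)) | (summaries << BITS) | syncAcks;
    }

    static int mutations(long packed) { return (int) ((packed >>> (2 * BITS)) & MASK); }
    static int summaries(long packed) { return (int) ((packed >>> BITS) & MASK); }
    static int syncAcks(long packed)  { return (int) (packed & MASK); }

    // Applies all three deltas in a single compare-and-set and reports
    // whether the resulting state has every component at zero.
    boolean updateAndCheckComplete(int mutationsDelta, int summariesDelta, int syncAcksDelta)
    {
        while (true)
        {
            long prev = state.get();
            long m = (long) mutations(prev) + mutationsDelta;
            long s = (long) summaries(prev) + summariesDelta;
            long a = (long) syncAcks(prev) + syncAcksDelta;
            if (m < 0 || s < 0 || a < 0 || m > MASK || s > MASK || a > MASK)
                throw new IllegalStateException("counter component out of range");
            long next = pack(m, s, a);
            if (state.compareAndSet(prev, next))
                return next == 0;
        }
    }
}
```

Because all three components change in one atomic step, a summary that adds mutations while removing itself from the remaining-summaries count can never be observed half-applied. The bugs below are all cases where the mutations component is incremented and decremented a different number of times.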
For each remote summary received via {{acceptRemoteSummary()}}, the protocol:
# Calls {{collectLocallyMissingMutations()}} to identify mutations in the
remote summary that the local node doesn't have
# Deduplicates against previously requested mutations via the per-read
{{requested}} set
# Registers a callback for each missing mutation via
{{registerMutationCallback()}} (which calls {{IncomingMutations.subscribe()}})
# Sends {{PULL_MUTATIONS_REQ}} to fetch the missing mutations
# Increments the remaining-mutations counter by the number of missing mutations
When a pulled mutation arrives, it's applied via
{{Keyspace.applyInternalTracked()}}, which journals it, applies it to
memtables, and calls {{finishWriting()}} which invokes {{invokeListeners()}} to
fire the callback. The callback calls {{acceptMutation()}}, which decrements
the remaining-mutations counter by 1.
After completion, {{augmentingOffsets()}} recomputes the difference between
remote summaries and the local summary. It reads each of these mutations from
the local journal to augment the read result.
h2. Bug 1: Duplicate mutation silently drops listener notification
h3. Symptom
Tracked reads time out because the remaining-mutations counter never reaches
zero.
h3. The race
{noformat}
Timeline on the data node:
1. collectLocallyMissingMutations() identifies offset X as missing.
   (offset X is NOT in witnessedOffsets[localNode])
2. The mutation carrying offset X arrives via normal replication
   (unrelated to the pull, e.g. a delayed original write).
   In Keyspace.applyInternalTracked():
   - trackedWriteHandler.beginWrite() → journals the mutation
   - startWriting() → returns true (first time seeing this offset)
   - Mutation applied to memtables
   - finishWriting() → adds to witnessedOffsets, calls invokeListeners(X)
   - But no listener is registered yet, so invokeListeners() is a no-op
3. registerMutationCallback(X, callback) → returns true (first listener).
   The callback IS registered, but nobody will ever invoke it because
   invokeListeners() already ran in step 2 and found no listeners.
4. PULL_MUTATIONS_REQ is sent. The pull response arrives carrying offset X.
   In Keyspace.applyInternalTracked():
   - trackedWriteHandler.beginWrite() → journals (duplicate write to journal)
   - startWriting() → returns FALSE (already in witnessedOffsets from step 2)
   - The if(started) block is skipped entirely
   - finishWriting() is NOT called
   - invokeListeners() is NOT called
   - The callback from step 3 is never invoked
   - The remaining-mutations counter is never decremented
5. The read hangs until timeout.
{noformat}
h3. The fix
In {{Keyspace.applyInternalTracked()}}, when {{startWriting()}} returns false
(duplicate mutation), call
{{MutationTrackingService.instance.notifyIncomingMutationListeners(mutation.id())}}.
This invokes any pending callbacks even for duplicate mutations, closing the
race window where a callback is registered between the first write's
{{invokeListeners()}} and the duplicate's arrival.
{code:java}
// Keyspace.java - applyInternalTracked()
if (started)
    MutationTrackingService.instance.finishWriting(mutation);
else
    // Duplicate mutation: still notify listeners in case a read reconciliation
    // registered a callback after the first write completed but before this
    // duplicate arrived.
    MutationTrackingService.instance.notifyIncomingMutationListeners(mutation.id());
{code}
New method added to {{MutationTrackingService}}:
{code:java}
public void notifyIncomingMutationListeners(ShortMutationId mutationId)
{
    incomingMutations.invokeListeners(mutationId);
}
{code}
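The race and the effect of the fix can be replayed deterministically with a toy model. The sketch below is single-threaded and is not Cassandra code: {{DuplicateNotifySketch}}, its fields, and the {{notifyOnDuplicate}} flag are hypothetical names, a plain {{long}} stands in for a mutation id, and the {{witnessed}} set stands in for {{witnessedOffsets}}. It only assumes the listener behavior described above: {{subscribe()}} returns true for the first listener, and {{invokeListeners()}} removes and fires whatever is registered.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical single-threaded model of the Bug 1 race; not the real classes.
final class DuplicateNotifySketch
{
    private final Map<Long, List<Runnable>> listeners = new HashMap<>();
    private final Set<Long> witnessed = new HashSet<>();
    private final boolean notifyOnDuplicate;

    DuplicateNotifySketch(boolean notifyOnDuplicate)
    {
        this.notifyOnDuplicate = notifyOnDuplicate;
    }

    // Returns true when this is the first listener for the id.
    boolean subscribe(long id, Runnable callback)
    {
        List<Runnable> existing = listeners.get(id);
        boolean first = existing == null;
        if (first)
        {
            existing = new ArrayList<>();
            listeners.put(id, existing);
        }
        existing.add(callback);
        return first;
    }

    // Removes and fires every listener registered for the id, if any.
    void invokeListeners(long id)
    {
        List<Runnable> registered = listeners.remove(id);
        if (registered != null)
            registered.forEach(Runnable::run);
    }

    // Stand-in for applyInternalTracked(): the first arrival always notifies,
    // a duplicate notifies only when the fix is enabled.
    void apply(long id)
    {
        boolean started = witnessed.add(id);
        if (started || notifyOnDuplicate)
            invokeListeners(id);
    }

    // Replays steps 2 to 4 of the timeline and returns the remaining count.
    static int remainingAfterRace(boolean notifyOnDuplicate)
    {
        DuplicateNotifySketch node = new DuplicateNotifySketch(notifyOnDuplicate);
        int[] remaining = { 1 };                      // one mutation counted as missing
        node.apply(7L);                               // step 2: arrives via replication
        node.subscribe(7L, () -> remaining[0]--);     // step 3: callback registered late
        node.apply(7L);                               // step 4: pulled duplicate arrives
        return remaining[0];
    }
}
```

In this model {{remainingAfterRace(false)}} leaves the count stuck at 1, which corresponds to the read timing out, while {{remainingAfterRace(true)}} brings it back to 0.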
h2. Bug 2: Overcounting remaining mutations
h3. Symptom
Tracked reads time out because the remaining-mutations counter never reaches
zero.
h3. The problem
In {{acceptRemoteSummary()}}, the remaining-mutations counter was incremented
by {{missingMutations.idCount()}} — the total number of mutations identified as
missing. But {{registerMutationCallback()}} can fail to register for some of
them because the mutation arrives via normal replication in the window between
{{collectLocallyMissingMutations()}} and {{registerMutationCallback()}}. When
this happens:
# {{collectLocallyMissingMutations()}} identifies the offset as missing (not
yet in {{witnessedOffsets}})
# The mutation arrives via normal replication, {{finishWriting()}} calls
{{invokeListeners()}}, but no listener is registered yet, so it's a no-op
# {{registerMutationCallback()}} is called and returns true (first listener,
since {{invokeListeners()}} in step 2 found no entry to remove)
# The callback is registered, but nobody will invoke it (the mutation already
arrived and {{invokeListeners}} already ran)
# The counter was incremented for this mutation, but neither the Bug 1 path
(duplicate arrival) nor the normal path will fire the callback
Note: this bug is closely related to Bug 1. Bug 1 is the more fundamental fix
— it ensures duplicate mutations still fire callbacks. Bug 2 is a
belt-and-suspenders fix for the related race where the mutation arrives before
the callback is even registered, so there's no callback to fire at all.
h3. The fix
Changed {{acceptRemoteSummary()}} to increment the counter by the actual
number of callbacks registered via {{pull()}}, rather than by
{{missingMutations.idCount()}}. The {{pull()}} method now returns the count of
mutations it registered callbacks for.
{code:java}
// ReadReconciliations.Coordinator.acceptRemoteSummary()
int actuallyRequested = 0;
for (Offsets offsets : missingMutations.offsets())
    actuallyRequested += pull(remoteNode, offsets, callback);
summaries.add(Pair.create(remoteNode, summary));
return updateRemainingAndMaybeComplete(actuallyRequested, -1, 0);
{code}
h2. Bug 3: collectLocallyMissingMutations only checks unreconciled offsets
h3. Symptom
Tracked reads crash with {{Missing mutation}} because {{augmentingOffsets()}}
includes a mutation that was never pulled or journaled on the data node.
h3. The problem
{{collectLocallyMissingMutations()}} iterates
{{remoteSummary.onlyUnreconciled()}}, which yields only the unreconciled
offsets from the remote summary. But {{augmentingOffsets()}} includes BOTH
reconciled and unreconciled offsets from remote summaries minus the local
summary. This is intentional in {{augmentingOffsets()}} to prevent read
monotonicity violations (documented in the method's javadoc).
A mutation being "reconciled" on a remote coordinator means the coordinator
received acknowledgments from enough replicas — it does NOT mean the local
(data) node has the mutation. When a mutation is reconciled on the remote
coordinator but the data node never received it (e.g., the write to this node
was dropped by the network), the pull logic skips it (it's not in
{{onlyUnreconciled()}}), but {{augmentingOffsets()}} includes it.
{noformat}
1. Node 3 coordinates a write with offset 0. All replicas acknowledge.
   Offset 0 moves to reconciledOffsets on node 3's coordinator log.
2. Node 1 (data node) never received the write (network drop).
3. A tracked read starts on node 1 with node 3 as a summary node.
4. Node 3's summary: CoordinatorLogId{3, 2} reconciled={[0,0]}, unreconciled={}
5. Node 1's local summary: CoordinatorLogId{3, 2} not present
   (never received offset 0)
6. acceptRemoteSummary() on node 1 calls collectLocallyMissingMutations()
   which iterates onlyUnreconciled() → node 3's unreconciled is empty →
   nothing identified as missing → no pull request sent
7. All counters reach zero (no mutations were requested). complete() is called.
8. augmentingOffsets() = union(remote summaries) - local summary
                       = {reconciled [0,0]} - {} = {[0,0]}
   Offset 0 IS included.
9. augment(ShortMutationId{3, 2, 0}) → MutationJournal.read() → null
   Crash: "Missing mutation ShortMutationId{3, 2, 0}"
{noformat}
h3. The fix
Added {{MutationSummary.allOffsets()}} which returns an iterator over the
union of reconciled and unreconciled offsets for each coordinator log. Changed
{{collectLocallyMissingMutations()}} to use {{allOffsets()}} instead of
{{onlyUnreconciled()}}.
{code:java}
// MutationSummary.java
public Iterator<Offsets> allOffsets()
{
    return new AbstractIterator<>()
    {
        int i = 0;

        @Override
        protected Offsets computeNext()
        {
            // Skip coordinator logs whose combined offsets are empty.
            while (i < summaries.size())
            {
                CoordinatorSummary summary = summaries.get(i++);
                Offsets combined = Offsets.Immutable.union(summary.reconciled, summary.unreconciled);
                if (combined != null && !combined.isEmpty())
                    return combined;
            }
            return endOfData();
        }
    };
}
{code}
{code:java}
// MutationTrackingService.collectLocallyMissingMutations()
// Use allOffsets() instead of onlyUnreconciled() because a mutation being reconciled
// on the remote coordinator doesn't mean the local node has received it.
Iterator<Offsets> iterator = remoteSummary.allOffsets();
{code}
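The mismatch between what gets pulled and what gets augmented comes down to two set computations that must agree. The sketch below reduces offsets to plain integer sets to show this. {{MissingOffsetsSketch}} and both of its methods are hypothetical names, and the real {{Offsets}} and {{MutationSummary}} types are not modeled.

```java
import java.util.Set;
import java.util.TreeSet;

// Hypothetical set-based model of Bug 3; offsets are plain integers here.
final class MissingOffsetsSketch
{
    // What the pull logic treats as missing. With includeReconciled = false
    // this mirrors iterating only the unreconciled offsets (the old behavior).
    static Set<Integer> locallyMissing(Set<Integer> remoteReconciled,
                                       Set<Integer> remoteUnreconciled,
                                       Set<Integer> local,
                                       boolean includeReconciled)
    {
        Set<Integer> candidates = new TreeSet<>(remoteUnreconciled);
        if (includeReconciled)
            candidates.addAll(remoteReconciled);
        candidates.removeAll(local);
        return candidates;
    }

    // What the read later tries to load from the journal:
    // union(remote reconciled, remote unreconciled) minus local.
    static Set<Integer> augmenting(Set<Integer> remoteReconciled,
                                   Set<Integer> remoteUnreconciled,
                                   Set<Integer> local)
    {
        return locallyMissing(remoteReconciled, remoteUnreconciled, local, true);
    }
}
```

For the scenario in the timeline (remote reconciled = {0}, remote unreconciled and local both empty), the old computation pulls nothing while the augmenting set contains offset 0. Including reconciled offsets makes the pulled set equal to the augmenting set.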
h2. Bug 4: Cross-read callback sharing causes undercounting of remaining mutations
h3. Symptom
Tracked reads crash with {{Missing mutation}} because the remaining-mutations
counter reaches zero prematurely — before a mutation that appears in
{{augmentingOffsets()}} has been pulled and journaled.
h3. The problem
The {{IncomingMutations}} listener map is global per node, not per-read. When
two concurrent reads both identify the same mutation as missing, they both call
{{registerMutationCallback()}} which calls {{IncomingMutations.subscribe()}}.
The first call returns true (first listener) and the second returns false
(listener already exists). Both callbacks are added to the listener list —
when the mutation arrives, {{invokeListeners()}} fires both.
The per-read {{requested}} set deduplication does not help here because it is
per-{{Coordinator}} (per-read). Two different reads have independent
{{requested}} sets.
However, {{pull()}} only counted mutations where
{{registerMutationCallback()}} returned true. When it returned false (another
read already registered a listener), the mutation was not counted toward the
remaining-mutations counter. But the callback WAS registered and WILL fire when
the mutation arrives (decrementing the counter). This creates an asymmetry: the
callback fires but the counter was never incremented for it.
{noformat}
Timeline on the data node with two concurrent tracked reads:
1. Read A's acceptRemoteSummary() identifies mutation X as missing.
   registerMutationCallback(X, callbackA) → returns true (first listener).
   pull() sends PULL_MUTATIONS_REQ and counts 1.
   Read A's remaining-mutations counter incremented by 1.
2. Read B's acceptRemoteSummary() identifies mutation X as missing.
   registerMutationCallback(X, callbackB) → returns false (Read A's listener exists).
   pull() does NOT count this mutation (only counted first-listener registrations).
   Read B's remaining-mutations counter incremented by 0.
3. Read B's remaining counters all reach zero (mutations was never incremented).
   complete() is called on Read B.
   augmentingOffsets() includes mutation X (it's in remote summaries but not local).
   augment() tries to read X from journal → NOT_FOUND (never pulled for Read B).
   Crash: "Missing mutation"
{noformat}
h3. The fix
{{pull()}} now counts ALL mutations for which callbacks are registered, not
just those where {{registerMutationCallback()}} returned true. The pull request
is still only sent for first-listener registrations (since another read already
sent one), but the remaining counter is incremented for every callback that
will eventually fire.
{code:java}
private static int pull(int node, Offsets offsets, IncomingMutations.Callback callback)
{
    InetAddressAndPort logCoordinator = host(offsets.logId().hostId());
    InetAddressAndPort pullFrom = FailureDetector.instance.isAlive(logCoordinator)
                                ? logCoordinator
                                : host(node);
    Offsets.Mutable toPull = new Offsets.Mutable(offsets.logId());
    int callbackCount = 0;
    for (ShortMutationId id : offsets)
    {
        // registerMutationCallback always adds the callback to the listener list.
        // It returns true if this is the first listener (meaning we need to send
        // a pull request). Even when it returns false (another read already
        // registered a listener), our callback is still added and will fire when
        // the mutation arrives via invokeListeners.
        boolean firstListener = MutationTrackingService.instance.registerMutationCallback(id, callback);
        callbackCount++;
        if (firstListener)
            toPull.add(id.offset());
    }
    if (!toPull.isEmpty())
    {
        PullMutationsRequest pull = new PullMutationsRequest(Offsets.Immutable.copy(toPull));
        MessagingService.instance().send(Message.out(Verb.PULL_MUTATIONS_REQ, pull), pullFrom);
    }
    return callbackCount;
}
{code}
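The counting asymmetry between two reads can be shown with a small model of the shared listener map. This is a sketch under stated assumptions, not the real implementation: {{SharedListenerCountSketch}}, the {{countEveryCallback}} flag, and the single-id {{pull()}} signature are hypothetical, and each read's remaining-mutations counter is modeled as a one-element {{int}} array.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of Bug 4: one node-wide listener map shared by two reads.
final class SharedListenerCountSketch
{
    private final Map<Long, List<Runnable>> listeners = new HashMap<>();
    final List<Long> pullRequestsSent = new ArrayList<>();

    // Always registers the callback; returns true only for the first listener.
    boolean subscribe(long id, Runnable callback)
    {
        List<Runnable> existing = listeners.get(id);
        if (existing != null)
        {
            existing.add(callback);
            return false;
        }
        List<Runnable> created = new ArrayList<>();
        created.add(callback);
        listeners.put(id, created);
        return true;
    }

    // Removes and fires every listener registered for the id, if any.
    void invokeListeners(long id)
    {
        List<Runnable> registered = listeners.remove(id);
        if (registered != null)
            registered.forEach(Runnable::run);
    }

    // Returns how much the calling read adds to its remaining-mutations count.
    // countEveryCallback = false models the old behavior, true models the fix.
    int pull(long id, Runnable callback, boolean countEveryCallback)
    {
        boolean firstListener = subscribe(id, callback);
        if (firstListener)
            pullRequestsSent.add(id);   // only the first listener triggers a pull
        return (firstListener || countEveryCallback) ? 1 : 0;
    }

    // Read B's remaining-mutations count just before mutation 5 arrives.
    static int readBRemainingBeforeArrival(boolean countEveryCallback)
    {
        SharedListenerCountSketch node = new SharedListenerCountSketch();
        int[] remainingA = { 0 };
        int[] remainingB = { 0 };
        remainingA[0] += node.pull(5L, () -> remainingA[0]--, countEveryCallback);
        remainingB[0] += node.pull(5L, () -> remainingB[0]--, countEveryCallback);
        return remainingB[0];
    }
}
```

With the old counting, read B sits at 0 remaining mutations before the mutation has arrived, so it can complete early. With the fix it sits at 1 until the shared pull response fires both callbacks, and only one pull request is sent in either case.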