[
https://issues.apache.org/jira/browse/CASSANDRA-16909?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17412879#comment-17412879
]
David Capwell commented on CASSANDRA-16909:
-------------------------------------------
Preview and IR updated
> ☂ Medium Term Repair Improvements
> ---------------------------------
>
> Key: CASSANDRA-16909
> URL: https://issues.apache.org/jira/browse/CASSANDRA-16909
> Project: Cassandra
> Issue Type: Epic
> Components: Consistency/Repair
> Reporter: David Capwell
> Assignee: David Capwell
> Priority: Normal
>
> This is to track the repair improvement works defined on the dev list.
> Email was: [DISCUSS] Repair Improvement Proposal
> This JIRA will track the related tasks and be used for documentaiton
> {code}
> Repair Coordinator State
> 1. ActiveRepairService.recordRepairStatus
> 1. Maps JMX command (int) to (status, msg)
> 2. Get/validate ColumnFamilies
> 1. Skip if no ColumnFamilies to repair
> 3. Get replicas for ranges
> 1. Ignore ranges not covered by this instance IFF
> --ignore-unreplicated-keyspaces, else Fail
> 2. Skip if no ranges to repair
> 3. If --force filter out non-alive (FailureDetector.isAlive) participants
> 4. [Not PREVIEW]Update SystemDistributedKeyspace's parent_repair_history
> 5. ActiveRepairService.prepareForRepair
> 1. TODO Should this be under PREPARE or still part of validation?
> 1. If CompactionsPendingThreshold triggered, Fail
> 2. registerParentRepairSession - update map of UUID ->
> ParentRepairSession
> 2. Send PREPARE_MSG awaiting responses [possible failures: timeout
> waiting, participate failure]
> 1. [improvement] PREPARE_MSG should be idempotent and if no reply
> within X, retry Y times
> 2. Known Failures; all are retryable at this stage
> 1. Timeout
> 2. Participate goes from alive to down
> 3. CompactionsPendingThreshold triggered
> 1. Not included in
> org.apache.cassandra.exceptions.RequestFailureReason#forException, so looks
> the same as Unexpected error
> 1. If updated and in mixed-mode, the update falls back to
> UNKNOWN, which then matches the unexpected error behavior
> 4. Unexpected error (this removes the session)
> 6. Run RepairTask (normal, IR, preview); see coordinator state for each type
> 7. On Success
> 1. Update SystemDistributedKeyspace.parent_repair_history to show the
> successful ranges
> 2. If any sub-session failed, fail the job
> 3. ActiveRepairService.cleanUp - send CLEANUP_MSG to all participates to
> clean up
> 1. TODO: why is this only on success and not failures as well?
> 2. [improvement] - does not wait for ACK (though it is sent), we log
> it async if we get it (else timeout)
> 8. On Exception
> 1. fail
> Normal/Preview Repair Coordinator State
> 1. For each common range
> 1. ActiveRepairService.submitRepairSession
> 1. Creates/run a RepairSession for each CommonRange
> 2. Once all RepairSessions done
> 1. [not consistent cross each type] handle session errors
> 2. [Preview Repair] check SyncStatSummary for difference and send a
> human readable notification
> Incremental Repair Coordinator State
> 1. org.apache.cassandra.repair.consistent.CoordinatorSessions#registerSession
> to create CoordinatorSession
> 2. CoordinatorSession.execute
> 1. Send PREPARE_CONSISTENT_REQ and wait for PREPARE_CONSISTENT_RSP
> 2. Await all success
> 3. Trigger Normal Repair (see "Normal/Preview Repair Coordinator State")
> 4. Send FINALIZE_PROPOSE_MSG and wait for FINALIZE_PROMISE_MSG
> 5. Await all success
> 6. Send FINALIZE_COMMIT_MSG
> 1. [bug][improvement] No ACK is done, so if this message is dropped
> then some participates will think the IR is still running (which can fail
> preview repairs)
> RepairSession
> 1. [Preview Repair - kind=REPAIRED] register with LocalSessions for IR state
> changes
> 2. RepairSession.start
> 1. [Not Preview Repair] Registering the session into
> SystemDistributedKeyspace's table repair_history
> 2. If endpoints is empty
> 1. [UNHANDLED - downstream logic does not handle this case] Set
> future with empty state (which is later seen as Failed... but not a failed
> future)
> 2. [Not Preview Repair] Mark session failed in repair_history
> 3. Check all endpoints, if any is down and hasSkippedReplicas=false, Fail
> the session
> 4. For each table
> 1. Create a RepairJob
> 2. Execute job in RepairTask's executor
> 3. await all jobs
> 1. If all success
> 1. Set session result to include the job results
> 2. If any fail
> 1. Fail the session future
> 2. [Question] why does this NOT update repair_history like
> other failures?
> RepairJob
> 1. [parallelism in [SEQUENTIAL, DATACENTER_AWARE] and not IR] send
> SNAPSHOT_MSG to all participates
> 1. [improvement] SNAPSHOT_MSG should be idempotent so coordinator can
> retry if no ACK. This calls
> org.apache.cassandra.repair.TableRepairManager#snapshot with the
> RepairRunnable ID; this makes sure to snapshot once unless force=true
> (RepairOptions !(dataCenters.isEmpty and hosts.isEmpty()))
> 2. [improvement] This task is short lived, so rather than running in a
> cached pool (which may allocate a thread), inline or add a notation of
> cooperative sharing if retries are implemented
> 2. Await snapshot success
> 3. Send VALIDATION_REQ to all participates (all at once, or batched based off
> parallelism)
> 1. [bug] MerkleTree may be large, even if off-heap; this can cause the
> coordinator to OOM (heap or direct); there is no bounds to the number of
> MerkleTree which may be in-flight
> 2. [improvement] VALIDATION_REQ could be made idempotent. Right now we
> create a Validator and submit to ValidationManager, but could dedupe based
> off session id
> 4. Await validation success
> 5. Stream any-all conflicting ranges (2 modes: optimiseStreams && not
> pullRepair = optimisedSyncing, else standardSyncing)
> 1. Create a SyncTask (Local, Asymmetric, Symmetric) for each conflicting
> range
> 1. Local: create stream plan and use streaming
> 2. Asymmetric/Symmetric: send SYNC_REQ
> 1. [bug][improvement] No ACK is done, so if this message is
> dropped streaming does not start on the participate
> 2. [improvement] Both AsymmetricRemoteSyncTask, and
> SymmetricRemoteSyncTask are the same class; they are copy/paste clones of
> each other; the only difference is AsymmetricRemoteSyncTask creates the
> SyncRequest with asymmetric=true
> 3. [improvement] Can be idempotent (when remote); currently just
> starts streaming right away, would need to dedup on the session
> 6. Await streaming complete
> 7. onSuccess
> 1. [not preview repair] update repair_history, marking the session success
> 2. Set the future as success
> 8. onFailure
> 1. Abort validation tasks
> 2. [not preview repair] update repair_history, marking the session failed
> 3. Set the future to a failure
> Repair Participate State
> 1. Receive PREPARE_MSG
> 1. [improvement] PREPARE_MSG should be idempotent
> 1. [current state] If parentRepairSessions contains the session, it
> ignores the request and noops; but does NOT validate that the sessions match
> 2. [current state] mostly idempotent, assuming
> CompactionsPendingThreshold does not trigger
> 2. [IR] Receive PREPARE_CONSISTENT_REQ
> 1. Create LocalSession in-memory
> 2. Persist LocalSession to system.repairs
> 3. Create new Executor; 1 thread per table
> 1. [improvement] the thread waits for
> org.apache.cassandra.db.repair.PendingAntiCompaction.AcquisitionCallable#acquireTuple
> to get called and rerun SSTables from Tracker
> 2. [improvement] causes the active compactions (not Validations) to
> get interrupted, so if its running for a long time and generated a ton of
> garbage... it was done in waste
> 3. [improvement]
> org.apache.cassandra.db.ColumnFamilyStore#runWithCompactionsDisabled is on a
> single table level but could be extended for multiple tables, this would
> allow only blocking on 1 thread as acquireTuple is just mutating
> org.apache.cassandra.db.lifecycle.Tracker
> 4. Run PendingAntiCompaction
> 1. Block/Interrupt compactions, to collect SSTables to AntiCompact
> 2. Trigger
> org.apache.cassandra.db.compaction.CompactionManager#submitPendingAntiCompaction
> for each table
> 3. Calls
> org.apache.cassandra.db.compaction.CompactionManager#performAnticompaction
> 1. Sets UNREPAIRED_SSTABLE for each SSTable
> 2. Group SStables
> 3. For each group, rewrite SSTable into 3 SSTables: full
> (repaired), transient (repaired), other (unprepared; out of range)
> 4. On Success
> 1. Update system.repairs
> 2. Notify listeners (aka RepairSession)
> 3. Send PREPARE_CONSISTENT_RSP with body=true
> 5. On Failure
> 1. Send PREPARE_CONSISTENT_RSP with body=false
> 2. Update system.repairs
> 3. [parallelism in [SEQUENTIAL, DATACENTER_AWARE] and not IR] Receive
> SNAPSHOT_MSG
> 1. Create a table snapshot using the RepairRunnable ID. If
> !RepairOption.isGlobal, then override the snapshot if present
> 4. Receive VALIDATION_REQ
> 1. Creates a Validator and submits to CompactionManager's
> validationExecutor
> 2. Core logic: org.apache.cassandra.repair.ValidationManager#doValidation
> 3. Iterate over each partition/row, updating a MerkleTree
> 4. When done, switch to the ANTI_ENTROPY stage
> 5. If coordinator is remote
> 1. Send a VALIDATION_RSP back with the MerkleTree (or null if failed)
> 6. Else
> 1. Switch to ANTI_ENTROPY again
> 2. Attempt to move MerkleTree off-heap
> 3. Forward message to ActiveRepairService.handleMessage
> 5. [SyncTask in [AsymmetricRemoteSyncTask, SymmetricRemoteSyncTask]] Receive
> SYNC_REQ
> 1. Creates a stream plan and use streaming (StreamingRepairTask)
> 6. [IR] Receive FINALIZE_PROPOSE_MSG
> 1. Update system.repairs
> 2. Force flush table
> 7. [IR] Receive FINALIZE_COMMIT_MSG
> 1. Update system.repairs
> 8. Receive CLEANUP_MSG
> 1. Removes in-memory state and disk snapshots
> LocalSessions (IR)
> * On start, load state from system.repairs; mark active repairs as failed and
> sent FAILED_SESSION_MSG to coordinato
> * [every 10m; controlled by -Dcassandra.repair_cleanup_interval_seconds] for
> each LocalSession
> * If not updated within 1D (controlled by
> -Dcassandra.repair_fail_timeout_seconds), mark repair failed
> * Else if not updated within 1D (controlled by
> -Dcassandra.repair_delete_timeout_seconds)
> * Delete in-memory state
> * Delete system.repairs
> * Else if not updated within 1H (controlled by
> -Dcassandra.repair_status_check_timeout_seconds), request status update from
> all participants (get ConsistentSession.State)
> * Update system.repairs if any participants have state FINALIZED or
> FAILED
> * [bug][improvement] if Validation or Streaming is still running,
> this just updates the state but does not terminate active tasks
> * [improvement] if all participates are still running, we will check
> again in 10m
> Streaming
> * Streaming has docs on how it works, so this section just shows some of the
> special casing
> * [Preview Repair]
> org.apache.cassandra.db.streaming.CassandraStreamManager#createOutgoingStreams
> filters matches based off
> org.apache.cassandra.streaming.PreviewKind#predicate
> * [Preview Repair] skip streaming the files, only stream the metadata
> {code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]