[ https://issues.apache.org/jira/browse/CASSANDRA-16909?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17412879#comment-17412879 ]
David Capwell commented on CASSANDRA-16909:
-------------------------------------------

Preview and IR updated

> ☂ Medium Term Repair Improvements
> ---------------------------------
>
>                 Key: CASSANDRA-16909
>                 URL: https://issues.apache.org/jira/browse/CASSANDRA-16909
>             Project: Cassandra
>          Issue Type: Epic
>          Components: Consistency/Repair
>            Reporter: David Capwell
>            Assignee: David Capwell
>            Priority: Normal
>
> This is to track the repair improvement work defined on the dev list.
> Email was: [DISCUSS] Repair Improvement Proposal
> This JIRA will track the related tasks and be used for documentation
> {code}
> Repair Coordinator State
> 1. ActiveRepairService.recordRepairStatus
>    1. Maps a JMX command (int) to (status, msg)
> 2. Get/validate ColumnFamilies
>    1. Skip if no ColumnFamilies to repair
> 3. Get replicas for ranges
>    1. Ignore ranges not covered by this instance IFF --ignore-unreplicated-keyspaces is set, else Fail
>    2. Skip if no ranges to repair
>    3. If --force, filter out non-alive (FailureDetector.isAlive) participants
> 4. [Not PREVIEW] Update SystemDistributedKeyspace's parent_repair_history
> 5. ActiveRepairService.prepareForRepair
>    1. TODO: should this be under PREPARE or still part of validation?
>       1. If CompactionsPendingThreshold is triggered, Fail
>       2. registerParentRepairSession - update the map of UUID -> ParentRepairSession
>    2. Send PREPARE_MSG awaiting responses [possible failures: timeout waiting, participant failure]
>       1. [improvement] PREPARE_MSG should be idempotent and, if no reply within X, retried Y times
>       2. Known failures; all are retryable at this stage
>          1. Timeout
>          2. Participant goes from alive to down
>          3. CompactionsPendingThreshold triggered
>             1. Not included in org.apache.cassandra.exceptions.RequestFailureReason#forException, so it looks the same as an Unexpected error
>                1. If updated and in mixed-mode, the update falls back to UNKNOWN, which then matches the unexpected-error behavior
>          4. Unexpected error (this removes the session)
> 6. Run RepairTask (normal, IR, preview); see the coordinator state for each type
> 7. On Success
>    1. Update SystemDistributedKeyspace.parent_repair_history to show the successful ranges
>    2. If any sub-session failed, fail the job
>    3. ActiveRepairService.cleanUp - send CLEANUP_MSG to all participants to clean up
>       1. TODO: why is this only on success and not on failures as well?
>       2. [improvement] does not wait for the ACK (though it is sent); we log it asynchronously if we get it (else timeout)
> 8. On Exception
>    1. Fail
>
> Normal/Preview Repair Coordinator State
> 1. For each common range
>    1. ActiveRepairService.submitRepairSession
>       1. Creates/runs a RepairSession for each CommonRange
> 2. Once all RepairSessions are done
>    1. [not consistent across each type] handle session errors
>    2. [Preview Repair] check SyncStatSummary for differences and send a human-readable notification
>
> Incremental Repair Coordinator State
> 1. org.apache.cassandra.repair.consistent.CoordinatorSessions#registerSession to create a CoordinatorSession
> 2. CoordinatorSession.execute
>    1. Send PREPARE_CONSISTENT_REQ and wait for PREPARE_CONSISTENT_RSP
>    2. Await all success
>    3. Trigger Normal Repair (see "Normal/Preview Repair Coordinator State")
>    4. Send FINALIZE_PROPOSE_MSG and wait for FINALIZE_PROMISE_MSG
>    5. Await all success
>    6. Send FINALIZE_COMMIT_MSG
>       1. [bug][improvement] No ACK is done, so if this message is dropped then some participants will think the IR is still running (which can fail preview repairs)
>
> RepairSession
> 1. [Preview Repair - kind=REPAIRED] register with LocalSessions for IR state changes
> 2. RepairSession.start
>    1. [Not Preview Repair] Register the session into SystemDistributedKeyspace's repair_history table
>    2. If endpoints is empty
>       1. [UNHANDLED - downstream logic does not handle this case] Set the future with empty state (which is later seen as Failed... but not a failed future)
>       2. [Not Preview Repair] Mark the session failed in repair_history
>    3. Check all endpoints; if any is down and hasSkippedReplicas=false, Fail the session
>    4. For each table
>       1. Create a RepairJob
>       2. Execute the job in RepairTask's executor
>       3. Await all jobs
>          1. If all succeed
>             1. Set the session result to include the job results
>          2. If any fail
>             1. Fail the session future
>             2. [Question] why does this NOT update repair_history like other failures?
>
> RepairJob
> 1. [parallelism in [SEQUENTIAL, DATACENTER_AWARE] and not IR] send SNAPSHOT_MSG to all participants
>    1. [improvement] SNAPSHOT_MSG should be idempotent so the coordinator can retry if no ACK arrives. This calls org.apache.cassandra.repair.TableRepairManager#snapshot with the RepairRunnable ID; this makes sure to snapshot once unless force=true (RepairOptions !(dataCenters.isEmpty and hosts.isEmpty()))
>    2. [improvement] This task is short-lived, so rather than running in a cached pool (which may allocate a thread), inline it or add a notion of cooperative sharing if retries are implemented
> 2. Await snapshot success
> 3. Send VALIDATION_REQ to all participants (all at once, or batched based off parallelism)
>    1. [bug] A MerkleTree may be large, even if off-heap; this can cause the coordinator to OOM (heap or direct); there is no bound on the number of MerkleTrees which may be in-flight
>    2. [improvement] VALIDATION_REQ could be made idempotent. Right now we create a Validator and submit it to ValidationManager, but we could dedupe based off the session id
> 4. Await validation success
> 5. Stream any/all conflicting ranges (2 modes: optimiseStreams && not pullRepair = optimisedSyncing, else standardSyncing)
>    1. Create a SyncTask (Local, Asymmetric, Symmetric) for each conflicting range
>       1. Local: create a stream plan and use streaming
>       2. Asymmetric/Symmetric: send SYNC_REQ
>          1. [bug][improvement] No ACK is done, so if this message is dropped streaming does not start on the participant
>          2. [improvement] AsymmetricRemoteSyncTask and SymmetricRemoteSyncTask are effectively the same class; they are copy/paste clones of each other; the only difference is that AsymmetricRemoteSyncTask creates the SyncRequest with asymmetric=true
>          3. [improvement] Can be made idempotent (when remote); currently it just starts streaming right away, so it would need to dedup on the session
> 6. Await streaming complete
> 7. onSuccess
>    1. [not preview repair] update repair_history, marking the session a success
>    2. Set the future as success
> 8. onFailure
>    1. Abort validation tasks
>    2. [not preview repair] update repair_history, marking the session failed
>    3. Set the future to a failure
>
> Repair Participant State
> 1. Receive PREPARE_MSG
>    1. [improvement] PREPARE_MSG should be idempotent
>       1. [current state] If parentRepairSessions contains the session, it ignores the request and no-ops, but does NOT validate that the sessions match
>       2. [current state] mostly idempotent, assuming CompactionsPendingThreshold does not trigger
> 2. [IR] Receive PREPARE_CONSISTENT_REQ
>    1. Create a LocalSession in-memory
>    2. Persist the LocalSession to system.repairs
>    3. Create a new Executor; 1 thread per table
>       1. [improvement] the thread waits for org.apache.cassandra.db.repair.PendingAntiCompaction.AcquisitionCallable#acquireTuple to get called and rerun SSTables from the Tracker
>       2. [improvement] causes the active compactions (not Validations) to get interrupted, so if one has been running for a long time and generated a ton of garbage... that work was wasted
>       3. [improvement] org.apache.cassandra.db.ColumnFamilyStore#runWithCompactionsDisabled operates on a single table but could be extended to multiple tables; this would allow blocking on only 1 thread, as acquireTuple is just mutating org.apache.cassandra.db.lifecycle.Tracker
>    4. Run PendingAntiCompaction
>       1. Block/interrupt compactions to collect the SSTables to anticompact
>       2. Trigger org.apache.cassandra.db.compaction.CompactionManager#submitPendingAntiCompaction for each table
>       3. Calls org.apache.cassandra.db.compaction.CompactionManager#performAnticompaction
>          1. Sets UNREPAIRED_SSTABLE for each SSTable
>          2. Groups SSTables
>          3. For each group, rewrite the SSTable into 3 SSTables: full (repaired), transient (repaired), other (unrepaired; out of range)
>       4. On Success
>          1. Update system.repairs
>          2. Notify listeners (aka RepairSession)
>          3. Send PREPARE_CONSISTENT_RSP with body=true
>       5. On Failure
>          1. Send PREPARE_CONSISTENT_RSP with body=false
>          2. Update system.repairs
> 3. [parallelism in [SEQUENTIAL, DATACENTER_AWARE] and not IR] Receive SNAPSHOT_MSG
>    1. Create a table snapshot using the RepairRunnable ID. If !RepairOption.isGlobal, then override the snapshot if present
> 4. Receive VALIDATION_REQ
>    1. Creates a Validator and submits it to CompactionManager's validationExecutor
>    2. Core logic: org.apache.cassandra.repair.ValidationManager#doValidation
>    3. Iterate over each partition/row, updating a MerkleTree
>    4. When done, switch to the ANTI_ENTROPY stage
>    5. If the coordinator is remote
>       1. Send a VALIDATION_RSP back with the MerkleTree (or null if failed)
>    6. Else
>       1. Switch to ANTI_ENTROPY again
>       2. Attempt to move the MerkleTree off-heap
>       3. Forward the message to ActiveRepairService.handleMessage
> 5. [SyncTask in [AsymmetricRemoteSyncTask, SymmetricRemoteSyncTask]] Receive SYNC_REQ
>    1. Creates a stream plan and uses streaming (StreamingRepairTask)
> 6. [IR] Receive FINALIZE_PROPOSE_MSG
>    1. Update system.repairs
>    2. Force flush the table
> 7. [IR] Receive FINALIZE_COMMIT_MSG
>    1. Update system.repairs
> 8. Receive CLEANUP_MSG
>    1. Removes in-memory state and disk snapshots
>
> LocalSessions (IR)
> * On start, load state from system.repairs; mark active repairs as failed and send FAILED_SESSION_MSG to the coordinator
> * [every 10m; controlled by -Dcassandra.repair_cleanup_interval_seconds] for each LocalSession
>    * If not updated within 1D (controlled by -Dcassandra.repair_fail_timeout_seconds), mark the repair failed
>    * Else if not updated within 1D (controlled by -Dcassandra.repair_delete_timeout_seconds)
>       * Delete the in-memory state
>       * Delete the system.repairs row
>    * Else if not updated within 1H (controlled by -Dcassandra.repair_status_check_timeout_seconds), request a status update from all participants (get ConsistentSession.State)
>       * Update system.repairs if any participants have state FINALIZED or FAILED
>          * [bug][improvement] if Validation or Streaming is still running, this just updates the state but does not terminate the active tasks
>       * [improvement] if all participants are still running, we will check again in 10m
>
> Streaming
> * Streaming has docs on how it works, so this section just shows some of the special casing
> * [Preview Repair] org.apache.cassandra.db.streaming.CassandraStreamManager#createOutgoingStreams filters matches based off org.apache.cassandra.streaming.PreviewKind#predicate
> * [Preview Repair] skip streaming the files, only stream the metadata
> {code}

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
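Several [improvement] items in the walkthrough above ask for idempotent control messages (PREPARE_MSG, SNAPSHOT_MSG, VALIDATION_REQ). A minimal sketch of the dedupe idea, assuming hypothetical class and method names (this is not Cassandra's actual handler API): keying on the parent session UUID makes a retried PREPARE_MSG a no-op acknowledgement instead of a second registration.

```java
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical sketch (not Cassandra's real handler): a participant that
// registers a repair session at most once, keyed by the parent session id,
// so a retried PREPARE_MSG can be ACKed without re-registering state.
public class IdempotentPrepareHandler
{
    private final ConcurrentMap<UUID, String> sessions = new ConcurrentHashMap<>();

    /** Returns true if this call registered the session; false if it was a retry. */
    public boolean handlePrepare(UUID parentSession, String description)
    {
        // putIfAbsent makes registration idempotent: only the first delivery
        // mutates state; any retry observes the existing entry and no-ops.
        return sessions.putIfAbsent(parentSession, description) == null;
    }

    public boolean isRegistered(UUID parentSession)
    {
        return sessions.containsKey(parentSession);
    }
}
```

The same putIfAbsent pattern would cover the SNAPSHOT_MSG and VALIDATION_REQ dedupe suggestions, keyed by the same session id; note the outline's caveat that the current code path does not validate that a duplicate session actually matches the registered one.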
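The no-ACK bugs flagged above (FINALIZE_COMMIT_MSG, SYNC_REQ) and the "retry Y times if no reply within X" suggestion for PREPARE_MSG could share one retry policy. A sketch only, assuming a caller-supplied send-and-await-ACK callback; none of these names exist in Cassandra:

```java
import java.util.function.BooleanSupplier;

// Hypothetical sketch of the suggested retry policy for control messages that
// currently get no ACK: resend with exponential backoff until an ACK arrives
// or the attempt budget is spent. Only safe for messages made idempotent.
public class AckRetry
{
    /**
     * sendAndAwaitAck is assumed to send the message and return true once an
     * ACK arrives within its own timeout. Returns true on the first ACK,
     * false after maxAttempts failures (or if interrupted while backing off).
     */
    public static boolean sendWithRetry(BooleanSupplier sendAndAwaitAck, int maxAttempts, long baseBackoffMillis)
    {
        for (int attempt = 0; attempt < maxAttempts; attempt++)
        {
            if (sendAndAwaitAck.getAsBoolean())
                return true; // ACK received
            try
            {
                Thread.sleep(baseBackoffMillis << attempt); // exponential backoff
            }
            catch (InterruptedException e)
            {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return false; // budget exhausted; caller surfaces the failure
    }
}
```

This is exactly why the idempotency items matter: retrying FINALIZE_COMMIT_MSG or SYNC_REQ is only safe once a duplicate delivery is a no-op on the participant.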
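For the [bug] about unbounded in-flight MerkleTrees OOMing the coordinator, one option is a fixed permit pool so at most N validation responses are held at once. The names below are hypothetical; this is a sketch of the back-pressure idea, not the project's implementation:

```java
import java.util.concurrent.Semaphore;
import java.util.function.Supplier;

// Hypothetical sketch for bounding coordinator memory during validation:
// a fixed permit pool means at most maxTrees Merkle trees from VALIDATION_RSP
// are held (heap or direct) at once; further work blocks until a slot frees.
public class BoundedValidationSlots
{
    private final Semaphore inFlight;

    public BoundedValidationSlots(int maxTrees)
    {
        inFlight = new Semaphore(maxTrees);
    }

    /** Runs the tree-consuming work while holding a slot, then frees it. */
    public <T> T withSlot(Supplier<T> work)
    {
        inFlight.acquireUninterruptibly(); // blocks while maxTrees are unprocessed
        try
        {
            return work.get();
        }
        finally
        {
            inFlight.release(); // slot freed once the tree has been consumed
        }
    }

    public int availableSlots()
    {
        return inFlight.availablePermits();
    }
}
```

Bounding by tree count is the simplest form; a real fix would likely bound by estimated tree size instead, since per-range MerkleTree sizes vary widely.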