krishvishal commented on code in PR #2493:
URL: https://github.com/apache/iggy/pull/2493#discussion_r2633833904
##########
core/metadata/src/impls/metadata.rs:
##########
@@ -170,9 +214,62 @@ where
header.op <= self.consensus.commit() ||
self.journal.has_prepare(header)
}
- fn replicate(&self, _prepare: Message<PrepareHeader>) {
- // TODO Forward prepare to next replica in chain.
- todo!()
+ /// Replicate a prepare message to the next replica in the chain.
+ ///
+ /// Chain replication pattern:
+ /// - Primary sends to first backup
+ /// - Each backup forwards to the next
+ /// - Stops when we would forward back to primary
+ ///
+ /// Direction alternates based on op number:
+ /// - Even ops: clockwise (+1)
+ /// - Odd ops: counter-clockwise (-1)
+ async fn replicate(&self, message: Message<PrepareHeader>) {
+ let header = message.header();
+
+ assert_eq!(header.command, Command2::Prepare);
+
+ // Preconditions - caller must ensure these
+ assert!(
+ !self.journal.has_prepare(header),
+ "replicate: must not already have prepare"
+ );
+
+ let direction = self.consensus.replicate_direction(header.op);
+
+ // Calculate next replica in the ring
+ let next: u8 = if direction > 0 {
+ // Clockwise
+ (self.consensus.replica() + 1) % self.consensus.replica_count()
+ } else {
+ // Counter-clockwise
+ if self.consensus.replica() == 0 {
+ self.consensus.replica_count() - 1
+ } else {
+ self.consensus.replica() - 1
+ }
+ };
+
+ let primary = self.consensus.primary_index(header.view);
+ if next == primary {
+ debug!(
+ replica = self.consensus.replica(),
+ op = header.op,
+ "replicate: not replicating (ring complete)"
+ );
+ return;
+ }
+
+ // TODO: are we doing standbys?
Review Comment:
Done.
##########
core/metadata/src/impls/metadata.rs:
##########
@@ -170,9 +214,62 @@ where
header.op <= self.consensus.commit() ||
self.journal.has_prepare(header)
}
- fn replicate(&self, _prepare: Message<PrepareHeader>) {
- // TODO Forward prepare to next replica in chain.
- todo!()
+ /// Replicate a prepare message to the next replica in the chain.
+ ///
+ /// Chain replication pattern:
+ /// - Primary sends to first backup
+ /// - Each backup forwards to the next
+ /// - Stops when we would forward back to primary
+ ///
+ /// Direction alternates based on op number:
+ /// - Even ops: clockwise (+1)
+ /// - Odd ops: counter-clockwise (-1)
+ async fn replicate(&self, message: Message<PrepareHeader>) {
Review Comment:
Done.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]