cmccabe commented on code in PR #12513:
URL: https://github.com/apache/kafka/pull/12513#discussion_r950388163
##########
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##########
@@ -1118,6 +1059,65 @@ private void updateWriteOffset(long offset) {
}
}
+ private void claim(int epoch) {
+ try {
+ if (curClaimEpoch != -1) {
+ throw new RuntimeException("Cannot claim leadership because we are already the " +
+ "active controller.");
+ }
+ curClaimEpoch = epoch;
+ controllerMetrics.setActive(true);
+ updateWriteOffset(lastCommittedOffset);
+ clusterControl.activate();
+
+ // Before switching to active, create an in-memory snapshot at the last committed
+ // offset. This is required because the active controller assumes that there is always
+ // an in-memory snapshot at the last committed offset.
+ snapshotRegistry.getOrCreateSnapshot(lastCommittedOffset);
+
+ // Prepend the activate event. It is important that this event go at the beginning
+ // of the queue rather than the end (hence prepend rather than append). It's also
+ // important not to use prepend for anything else, to preserve the ordering here.
+ queue.prepend(new ControllerWriteEvent<>("completeActivation[" + epoch + "]",
+ new CompleteActivationEvent()));
+ } catch (Throwable e) {
+ fatalFaultHandler.handleFault("exception while claiming leadership", e);
+ }
+ }
+
+ class CompleteActivationEvent implements ControllerWriteOperation<Void> {
Review Comment:
Hmm... we do use the "active / activation" terminology in a lot of other places:
```
controllerMetrics.setActive
isActiveController
clusterControl.activate
```
It's even in a metric...
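
For context on the prepend comment in the hunk above, here is a minimal sketch of why prepending puts the activation event ahead of work that is already queued. `SketchEventQueue` and its methods are hypothetical stand-ins built on `java.util.ArrayDeque`. They are not Kafka's actual event queue API, and the real queue may behave differently in ways this sketch ignores.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical stand-in for the controller's event queue (assumption, not Kafka code).
class SketchEventQueue {
    private final Deque<String> events = new ArrayDeque<>();

    // Normal events go to the tail and run in arrival order.
    void append(String name) {
        events.addLast(name);
    }

    // A prepended event goes to the head, so it runs before anything already queued.
    void prepend(String name) {
        events.addFirst(name);
    }

    // Removes and returns all events in the order they would be processed.
    List<String> drain() {
        List<String> order = new ArrayList<>();
        while (!events.isEmpty()) {
            order.add(events.pollFirst());
        }
        return order;
    }
}
```

With two events appended and one then prepended, the prepended event is drained first. If a second event were prepended later it would jump ahead of the first, which is presumably why the code comment warns against using prepend for anything else.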