cmccabe commented on code in PR #12513:
URL: https://github.com/apache/kafka/pull/12513#discussion_r952882843
##########
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##########
@@ -1118,6 +1059,65 @@ private void updateWriteOffset(long offset) {
}
}
+ private void claim(int epoch) {
+ try {
+ if (curClaimEpoch != -1) {
+ throw new RuntimeException("Cannot claim leadership because we are already the " +
+ "active controller.");
+ }
+ curClaimEpoch = epoch;
+ controllerMetrics.setActive(true);
+ updateWriteOffset(lastCommittedOffset);
+ clusterControl.activate();
+
+ // Before switching to active, create an in-memory snapshot at the last committed
+ // offset. This is required because the active controller assumes that there is always
+ // an in-memory snapshot at the last committed offset.
+ snapshotRegistry.getOrCreateSnapshot(lastCommittedOffset);
+
+ // Prepend the activate event. It is important that this event go at the beginning
+ // of the queue rather than the end (hence prepend rather than append). It's also
+ // important not to use prepend for anything else, to preserve the ordering here.
+ queue.prepend(new ControllerWriteEvent<>("completeActivation[" + epoch + "]",
+ new CompleteActivationEvent()));
+ } catch (Throwable e) {
+ fatalFaultHandler.handleFault("exception while claiming leadership", e);
+ }
+ }
+
+ class CompleteActivationEvent implements ControllerWriteOperation<Void> {
Review Comment:
> How about BecomeActiveControllerEvent?
One issue is that this event doesn't do everything related to activation. Most
of the work happens in `claim` itself; the event just completes activation by
finishing up the remaining steps.
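
For context on the prepend comment in the hunk above, here is a minimal sketch of the ordering difference. It uses a plain `ArrayDeque` as a hypothetical stand-in for the controller's event queue, and the event names `pendingWrite1` and `pendingWrite2` are made up for illustration; it assumes some events are already queued when leadership is claimed.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class PrependOrderingSketch {
    // Hypothetical stand-in for the controller event queue (not Kafka's real queue class).
    static Deque<String> queueWithPendingEvents() {
        Deque<String> queue = new ArrayDeque<>();
        queue.addLast("pendingWrite1"); // assumed to be queued before claim() runs
        queue.addLast("pendingWrite2");
        return queue;
    }

    // Pops events from the head and returns them in the order they would run.
    static List<String> drain(Deque<String> queue) {
        List<String> order = new ArrayList<>();
        while (!queue.isEmpty()) {
            order.add(queue.pollFirst());
        }
        return order;
    }

    // Prepending puts the activation event ahead of everything already queued.
    static List<String> claimWithPrepend() {
        Deque<String> queue = queueWithPendingEvents();
        queue.addFirst("completeActivation");
        return drain(queue);
    }

    // Appending would let the already-queued events run before activation completes.
    static List<String> claimWithAppend() {
        Deque<String> queue = queueWithPendingEvents();
        queue.addLast("completeActivation");
        return drain(queue);
    }

    public static void main(String[] args) {
        System.out.println(claimWithPrepend()); // [completeActivation, pendingWrite1, pendingWrite2]
        System.out.println(claimWithAppend());  // [pendingWrite1, pendingWrite2, completeActivation]
    }
}
```

With prepend, the completion step runs before any write that was queued earlier, which is the ordering the code comment asks to preserve.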