zentol commented on code in PR #22844:
URL: https://github.com/apache/flink/pull/22844#discussion_r1250941228
##########
flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.java:
##########
@@ -189,66 +179,62 @@ protected void register(String contenderID,
LeaderContender contender) throws Ex
checkNotNull(contender, "Contender must not be null.");
synchronized (lock) {
- Preconditions.checkState(
- leaderContender == null,
- "Only one LeaderContender is allowed to be registered to
this service.");
- Preconditions.checkState(
- this.contenderID == null, "The contenderID is only allowed
to be set once.");
Preconditions.checkState(
running,
"The DefaultLeaderElectionService should have established
a connection to the backend before it's started.");
- leaderContender = contender;
- this.contenderID = contenderID;
+ leaderContenderRegistry.put(contenderID, contender);
LOG.info(
"LeaderContender {} has been registered for {}.",
- contender.getDescription(),
+ contenderID,
leaderElectionDriver);
if (issuedLeaderSessionID != null) {
// notifying the LeaderContender shouldn't happen in the
contender's main thread
runInLeaderEventThread(
- () ->
notifyLeaderContenderOfLeadership(issuedLeaderSessionID));
+ () ->
+ notifyLeaderContenderOfLeadership(
+ contenderID, issuedLeaderSessionID));
}
}
}
@Override
protected final void remove(String contenderID) {
synchronized (lock) {
- if (this.contenderID == null) {
+ if (!leaderContenderRegistry.containsKey(contenderID)) {
LOG.debug(
"The stop procedure was called on an already stopped
DefaultLeaderElectionService instance. No action necessary.");
Review Comment:
update the log message to include the contender ID and clarify that the LES may still be
running?
##########
flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.java:
##########
@@ -189,66 +179,62 @@ protected void register(String contenderID,
LeaderContender contender) throws Ex
checkNotNull(contender, "Contender must not be null.");
synchronized (lock) {
- Preconditions.checkState(
- leaderContender == null,
- "Only one LeaderContender is allowed to be registered to
this service.");
- Preconditions.checkState(
- this.contenderID == null, "The contenderID is only allowed
to be set once.");
Preconditions.checkState(
running,
"The DefaultLeaderElectionService should have established
a connection to the backend before it's started.");
- leaderContender = contender;
- this.contenderID = contenderID;
+ leaderContenderRegistry.put(contenderID, contender);
Review Comment:
check that the previous value was null?
##########
flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionServiceTest.java:
##########
@@ -932,38 +1064,71 @@ void runTestWithManuallyTriggeredEvents(
void runTest(RunnableWithException testMethod, ExecutorService
leaderEventOperationExecutor)
throws Exception {
- try {
- leaderElectionService =
- new DefaultLeaderElectionService(
- driverFactory,
-
DefaultLeaderElectionServiceTest.this.fatalErrorHandlerExtension
- .getTestingFatalErrorHandler(),
- leaderEventOperationExecutor);
+ try (final DefaultLeaderElectionService localLeaderElectionService
=
+ new DefaultLeaderElectionService(
+ driverFactory,
+
DefaultLeaderElectionServiceTest.this.fatalErrorHandlerExtension
+ .getTestingFatalErrorHandler(),
+ leaderEventOperationExecutor)) {
+ leaderElectionService = localLeaderElectionService;
leaderElectionService.startLeaderElectionBackend();
testingLeaderElectionDriver =
driverFactory.assertAndGetOnlyCreatedDriver();
- leaderElection =
leaderElectionService.createLeaderElection(contenderID);
- testingContender = new TestingContender(TEST_URL,
leaderElection);
- testingContender.startLeaderElection();
+ try (final ContenderContext localContenderContext0 =
+ ContenderContext.create(0,
leaderElectionService);
+ final ContenderContext localContenderContext1 =
+ ContenderContext.create(1,
leaderElectionService)) {
+ this.contenderContext0 = localContenderContext0;
+ this.contenderContext1 = localContenderContext1;
Review Comment:
Why are we creating 2 contenders now? Purely to have all tests also apply to
cases with multiple contenders?
##########
flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderInformationRegisterTest.java:
##########
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.leaderelection;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+class LeaderInformationRegisterTest {
+
+ @Test
+ void testOfWithKnownLeaderInformation() {
+ final String contenderID = "contender-id";
+ final LeaderInformation leaderInformation =
+ LeaderInformation.known(UUID.randomUUID(), "address");
+
+ final LeaderInformationRegister testInstance =
+ LeaderInformationRegister.of(contenderID, leaderInformation);
+
assertThat(testInstance.getRegisteredContenderIDs()).containsExactly(contenderID);
+
assertThat(testInstance.forContenderID(contenderID)).hasValue(leaderInformation);
+ }
+
+ @Test
+ void testOfWithEmptyLeaderInformation() {
+ final String contenderID = "contender-id";
+ final LeaderInformationRegister testInstance =
+ LeaderInformationRegister.of(contenderID,
LeaderInformation.empty());
+
+ assertThat(testInstance.getRegisteredContenderIDs()).isEmpty();
+ assertThat(testInstance.forContenderID(contenderID)).isNotPresent();
+ }
+
+ @Test
+ void testMerge() {
+ final String contenderID = "contender-id";
+ final LeaderInformation leaderInformation =
+ LeaderInformation.known(UUID.randomUUID(), "address");
+
+ final String newContenderID = "new-contender-id";
+ final LeaderInformation newLeaderInformation =
+ LeaderInformation.known(UUID.randomUUID(), "new-address");
+
+ final LeaderInformationRegister initialRegister =
+ LeaderInformationRegister.of(contenderID, leaderInformation);
+
+ final LeaderInformationRegister newRegister =
+ LeaderInformationRegister.merge(
+ initialRegister, newContenderID, newLeaderInformation);
+
+ assertThat(newRegister).isNotSameAs(initialRegister);
+ assertThat(newRegister.getRegisteredContenderIDs())
+ .containsExactlyInAnyOrder(contenderID, newContenderID);
+
assertThat(newRegister.forContenderID(contenderID)).hasValue(leaderInformation);
+
assertThat(newRegister.forContenderID(newContenderID)).hasValue(newLeaderInformation);
+ }
+
+ @Test
+ void testMergeEmptyLeaderInformation() {
+ final String contenderID = "contender-id";
+ final LeaderInformation leaderInformation =
+ LeaderInformation.known(UUID.randomUUID(), "address");
+
+ final String newContenderID = "new-contender-id";
+
+ final LeaderInformationRegister initialRegister =
+ LeaderInformationRegister.of(contenderID, leaderInformation);
+
+ final LeaderInformationRegister newRegister =
+ LeaderInformationRegister.merge(
+ initialRegister, newContenderID,
LeaderInformation.empty());
+
+ assertThat(newRegister).isNotSameAs(initialRegister);
+
assertThat(newRegister.getRegisteredContenderIDs()).containsExactly(contenderID);
+ assertThat(newRegister.forContenderID(newContenderID)).isNotPresent();
+ }
+
+ @Test
+ void testMergeEmptyLeaderInformationForExistingContenderID() {
Review Comment:
for completeness a test where we overwrite existing leader information with
new (non-empty) leader information would be nice.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.java:
##########
@@ -503,43 +516,53 @@ private void runInLeaderEventThread(Runnable callback) {
private void forwardErrorToLeaderContender(Throwable t) {
synchronized (lock) {
- if (leaderContender == null) {
+ if (leaderContenderRegistry.isEmpty()) {
fallbackErrorHandler.onFatalError(t);
return;
}
- if (t instanceof LeaderElectionException) {
- leaderContender.handleError((LeaderElectionException) t);
- } else {
- leaderContender.handleError(new LeaderElectionException(t));
- }
+ leaderContenderRegistry
+ .values()
+ .forEach(
+ leaderContender -> {
+ if (leaderContender == null) {
Review Comment:
:thinking: When can the value of the registry map be null? Even if that were
possible (which I don't think it is), we'd then be logging this message once per
contender.
##########
flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionServiceTest.java:
##########
@@ -349,22 +380,42 @@ void
testProperCleanupOnLeaderElectionCloseWhenHoldingTheLeadership() throws Exc
final UUID leaderSessionID = UUID.randomUUID();
grantLeadership(leaderSessionID);
-
assertThat(testingContender.getLeaderSessionID()).isNotNull();
-
assertThat(leaderElectionService.getLeaderSessionID(contenderID))
- .isEqualTo(leaderSessionID);
-
assertThat(storedLeaderInformation.get().forContenderID(contenderID))
-
.hasValue(LeaderInformation.known(leaderSessionID, TEST_URL));
+ applyToBothContenderContexts(
+ ctx -> {
+
assertThat(ctx.contender.getLeaderSessionID())
+ .isEqualTo(leaderSessionID);
+ assertThat(
+
leaderElectionService.getLeaderSessionID(
+
ctx.contenderID))
+ .isEqualTo(leaderSessionID);
+
+ assertThat(
+
leaderElectionService.getLeaderSessionID(
+
ctx.contenderID))
+ .isEqualTo(leaderSessionID);
+
+ assertThat(
+ storedLeaderInformation
+ .get()
+
.forContenderID(ctx.contenderID))
+ .hasValue(
+
LeaderInformation.known(
+
leaderSessionID, ctx.address));
Review Comment:
look at that magnificent formatting magic, going from 5 lines to 18.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.java:
##########
@@ -425,59 +425,72 @@ private void onRevokeLeadershipInternal() {
}
@GuardedBy("lock")
- private void notifyLeaderContenderOfLeadershipLoss() {
+ private void notifyLeaderContenderOfLeadershipLoss(
+ String contenderID, LeaderContender leaderContender) {
Preconditions.checkState(
leaderContender != null,
"The LeaderContender should be always set when calling this
method.");
- if (confirmedLeaderInformation.isEmpty()) {
+ if (!confirmedLeaderInformation.hasLeaderInformation(contenderID)) {
LOG.debug(
"Revoking leadership to contender {} while a previous
leadership grant wasn't confirmed, yet.",
- leaderContender.getDescription());
+ contenderID);
} else {
LOG.debug(
"Revoking leadership to contender {} for {}.",
- leaderContender.getDescription(),
-
LeaderElectionUtils.convertToString(confirmedLeaderInformation));
+ contenderID,
+ LeaderElectionUtils.convertToString(
+
confirmedLeaderInformation.forContenderIDOrEmpty(contenderID)));
}
- confirmedLeaderInformation = LeaderInformation.empty();
+ confirmedLeaderInformation =
+ LeaderInformationRegister.clear(confirmedLeaderInformation,
contenderID);
leaderContender.revokeLeadership();
}
@Override
public void onLeaderInformationChange(LeaderInformation leaderInformation)
{
- runInLeaderEventThread(() ->
onLeaderInformationChangeInternal(leaderInformation));
+ leaderContenderRegistry
+ .keySet()
+ .forEach(
+ contenderID ->
+ notifyLeaderInformationChange(contenderID,
leaderInformation));
}
@GuardedBy("lock")
- private void onLeaderInformationChangeInternal(LeaderInformation
leaderInformation) {
- if (leaderContender != null) {
- LOG.trace(
- "Leader node changed while {} is the leader with {}. New
leader information {}.",
- leaderContender.getDescription(),
-
LeaderElectionUtils.convertToString(confirmedLeaderInformation),
- LeaderElectionUtils.convertToString(leaderInformation));
- if (!confirmedLeaderInformation.isEmpty()) {
- final LeaderInformation confirmedLeaderInfo =
this.confirmedLeaderInformation;
- if (leaderInformation.isEmpty()) {
- LOG.debug(
- "Writing leader information by {} since the
external storage is empty.",
- leaderContender.getDescription());
- leaderElectionDriver.publishLeaderInformation(contenderID,
confirmedLeaderInfo);
- } else if (!leaderInformation.equals(confirmedLeaderInfo)) {
- // the data field does not correspond to the expected
leader information
- LOG.debug(
- "Correcting leader information by {}.",
- leaderContender.getDescription());
- leaderElectionDriver.publishLeaderInformation(contenderID,
confirmedLeaderInfo);
- }
- }
- } else {
- LOG.debug(
- "Ignoring change notification since the {} has already
been stopped.",
- leaderElectionDriver);
- }
+ private void onLeaderInformationChangeInternal(LeaderInformationRegister
leaderInformation) {
+ leaderContenderRegistry.forEach(
+ (contenderID, leaderContender) -> {
+ final LeaderInformation newLeaderInformationForContender =
+ leaderInformation
+ .forContenderID(contenderID)
+ .orElse(LeaderInformation.empty());
+ final LeaderInformation
confirmedLeaderInformationForContender =
+
confirmedLeaderInformation.forContenderIDOrEmpty(contenderID);
+
+ if (confirmedLeaderInformationForContender.isEmpty()) {
+ LOG.trace(
+ "Leader information changed while there's no
confirmation available by the contender for contender ID '{}' for this session.
Changed leader information {} will be ignored.",
+ contenderID,
+ LeaderElectionUtils.convertToString(
+ newLeaderInformationForContender));
Review Comment:
I have trouble understanding this situation. A contender got leadership, but
hasn't confirmed it yet? Shouldn't we then put the leadership loss on the wire
anyway? When is the contender informed about the leadership loss otherwise?
##########
flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.java:
##########
@@ -349,15 +342,17 @@ protected boolean hasLeadership(String contenderID, UUID
leaderSessionId) {
@Nullable
public UUID getLeaderSessionID(String contenderID) {
synchronized (lock) {
- return contenderID.equals(this.contenderID)
- ? confirmedLeaderInformation.getLeaderSessionID()
+ return leaderContenderRegistry.containsKey(contenderID)
+ ? confirmedLeaderInformation
+ .forContenderIDOrEmpty(contenderID)
Review Comment:
This method name looks weird; maybe rename it to `forContenderIdOrEmpty`?
##########
flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionServiceTest.java:
##########
@@ -597,14 +676,30 @@ void
testOnLeaderInformationChangeIsIgnoredAfterLeaderElectionBeingStop() throws
() -> {
grantLeadership();
-
assertThat(storedLeaderInformation.get().forContenderID(contenderID))
- .isPresent();
+ applyToBothContenderContexts(
+ ctx ->
+ assertThat(
+
storedLeaderInformation
+ .get()
+
.forContenderID(
+
ctx.contenderID))
+ .isPresent());
+
+ contenderContext0.leaderElection.close();
Review Comment:
why isn't this done within `applyToBothContenderContexts`? it runs the same
logic for both contenders, no?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]