This is an automated email from the ASF dual-hosted git repository.
rhauch pushed a commit to branch 2.8
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.8 by this push:
new de64b6c KAFKA-12252 and KAFKA-12262: Fix session key rotation when leadership changes (#10014)
de64b6c is described below
commit de64b6ce576ef5b0b339d5199d8fb13ac1f21e2b
Author: Chris Egerton <[email protected]>
AuthorDate: Wed May 5 17:11:15 2021 -0400
KAFKA-12252 and KAFKA-12262: Fix session key rotation when leadership changes (#10014)
Author: Chris Egerton <[email protected]>
Reviewers: Greg Harris <[email protected]>, Randall Hauch <[email protected]>
---
checkstyle/suppressions.xml | 3 +
.../runtime/distributed/DistributedHerder.java | 9 +-
.../runtime/distributed/DistributedHerderTest.java | 105 ++++++++++++++++++++-
3 files changed, 110 insertions(+), 7 deletions(-)
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 13dc59d..db760cb 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -150,6 +150,9 @@
<suppress checks="MethodLength"
files="(RequestResponse|WorkerSinkTask)Test.java"/>
+ <suppress checks="JavaNCSS"
+ files="DistributedHerderTest.java"/>
+
<!-- Streams -->
<suppress checks="ClassFanOutComplexity"
files="(KafkaStreams|KStreamImpl|KTableImpl|StreamsPartitionAssignor).java"/>
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
index c312e05..cb2c4da 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
@@ -408,7 +408,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
log.debug("Scheduled rebalance at: {} (now: {}
nextRequestTimeoutMs: {}) ",
scheduledRebalance, now, nextRequestTimeoutMs);
}
- if (internalRequestValidationEnabled() && keyExpiration <
Long.MAX_VALUE) {
+ if (isLeader() && internalRequestValidationEnabled() && keyExpiration
< Long.MAX_VALUE) {
nextRequestTimeoutMs = Math.min(nextRequestTimeoutMs,
Math.max(keyExpiration - now, 0));
log.debug("Scheduled next key rotation at: {} (now: {}
nextRequestTimeoutMs: {}) ",
keyExpiration, now, nextRequestTimeoutMs);
@@ -1583,10 +1583,11 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
synchronized (DistributedHerder.this) {
DistributedHerder.this.sessionKey = sessionKey.key();
- // Track the expiration of the key if and only if this worker
is the leader
+ // Track the expiration of the key.
// Followers will receive rotated keys from the leader and
won't be responsible for
- // tracking expiration and distributing new keys themselves
- if (isLeader() && keyRotationIntervalMs > 0) {
+ // tracking expiration and distributing new keys themselves,
but may become leaders
+ // later on and will need to know when to update the key.
+ if (keyRotationIntervalMs > 0) {
DistributedHerder.this.keyExpiration =
sessionKey.creationTimestamp() + keyRotationIntervalMs;
}
}
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
index 65ec89c..1843868 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
@@ -2076,6 +2076,84 @@ public class DistributedHerderTest {
PowerMock.verifyAll();
}
+ @Test
+ public void testKeyRotationWhenWorkerBecomesLeader() throws Exception {
+ EasyMock.expect(member.memberId()).andStubReturn("member");
+
EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V2);
+
+ expectRebalance(1, Collections.emptyList(), Collections.emptyList());
+ expectPostRebalanceCatchup(SNAPSHOT);
+ // First rebalance: poll indefinitely as no key has been read yet, so
expiration doesn't come into play
+ member.poll(Long.MAX_VALUE);
+ EasyMock.expectLastCall();
+
+ expectRebalance(2, Collections.emptyList(), Collections.emptyList());
+ SessionKey initialKey = new SessionKey(EasyMock.mock(SecretKey.class),
0);
+ ClusterConfigState snapshotWithKey = new ClusterConfigState(2,
initialKey, Collections.singletonMap(CONN1, 3),
+ Collections.singletonMap(CONN1, CONN1_CONFIG),
Collections.singletonMap(CONN1, TargetState.STARTED),
+ TASK_CONFIGS_MAP, Collections.<String>emptySet());
+ expectPostRebalanceCatchup(snapshotWithKey);
+ // Second rebalance: poll indefinitely as worker is follower, so
expiration still doesn't come into play
+ member.poll(Long.MAX_VALUE);
+ EasyMock.expectLastCall();
+
+ expectRebalance(2, Collections.emptyList(), Collections.emptyList(),
"member", MEMBER_URL);
+ Capture<SessionKey> updatedKey = EasyMock.newCapture();
+ configBackingStore.putSessionKey(EasyMock.capture(updatedKey));
+ EasyMock.expectLastCall().andAnswer(() -> {
+ configUpdateListener.onSessionKeyUpdate(updatedKey.getValue());
+ return null;
+ });
+ // Third rebalance: poll for a limited time as worker has become
leader and must wake up for key expiration
+ Capture<Long> pollTimeout = EasyMock.newCapture();
+ member.poll(EasyMock.captureLong(pollTimeout));
+ EasyMock.expectLastCall();
+
+ PowerMock.replayAll();
+
+ herder.tick();
+ configUpdateListener.onSessionKeyUpdate(initialKey);
+ herder.tick();
+ herder.tick();
+
+ assertTrue(pollTimeout.getValue() <=
DistributedConfig.INTER_WORKER_KEY_TTL_MS_MS_DEFAULT);
+
+ PowerMock.verifyAll();
+ }
+
+ @Test
+ public void testKeyRotationDisabledWhenWorkerBecomesFollower() throws
Exception {
+ EasyMock.expect(member.memberId()).andStubReturn("member");
+
EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V2);
+
+ expectRebalance(1, Collections.emptyList(), Collections.emptyList(),
"member", MEMBER_URL);
+ SecretKey initialSecretKey = EasyMock.mock(SecretKey.class);
+
EasyMock.expect(initialSecretKey.getAlgorithm()).andReturn(DistributedConfig.INTER_WORKER_KEY_GENERATION_ALGORITHM_DEFAULT).anyTimes();
+ EasyMock.expect(initialSecretKey.getEncoded()).andReturn(new
byte[32]).anyTimes();
+ SessionKey initialKey = new SessionKey(initialSecretKey,
time.milliseconds());
+ ClusterConfigState snapshotWithKey = new ClusterConfigState(1,
initialKey, Collections.singletonMap(CONN1, 3),
+ Collections.singletonMap(CONN1, CONN1_CONFIG),
Collections.singletonMap(CONN1, TargetState.STARTED),
+ TASK_CONFIGS_MAP, Collections.<String>emptySet());
+ expectPostRebalanceCatchup(snapshotWithKey);
+ // First rebalance: poll for a limited time as worker is leader and
must wake up for key expiration
+ Capture<Long> firstPollTimeout = EasyMock.newCapture();
+ member.poll(EasyMock.captureLong(firstPollTimeout));
+ EasyMock.expectLastCall();
+
+ expectRebalance(1, Collections.emptyList(), Collections.emptyList());
+ // Second rebalance: poll indefinitely as worker is no longer leader,
so key expiration doesn't come into play
+ member.poll(Long.MAX_VALUE);
+ EasyMock.expectLastCall();
+
+ PowerMock.replayAll(initialSecretKey);
+
+ configUpdateListener.onSessionKeyUpdate(initialKey);
+ herder.tick();
+ assertTrue(firstPollTimeout.getValue() <=
DistributedConfig.INTER_WORKER_KEY_TTL_MS_MS_DEFAULT);
+ herder.tick();
+
+ PowerMock.verifyAll();
+ }
@Test
public void testPutTaskConfigsSignatureNotRequiredV0() {
@@ -2311,6 +2389,14 @@ public class DistributedHerderTest {
ConnectProtocol.Assignment.NO_ERROR, offset,
assignedConnectors, assignedTasks, 0);
}
+ private void expectRebalance(final long offset,
+ final List<String> assignedConnectors,
+ final List<ConnectorTaskId> assignedTasks,
+ String leader, String leaderUrl) {
+ expectRebalance(Collections.emptyList(), Collections.emptyList(),
+ ConnectProtocol.Assignment.NO_ERROR, offset, leader,
leaderUrl, assignedConnectors, assignedTasks, 0);
+ }
+
// Handles common initial part of rebalance callback. Does not handle
instantiation of connectors and tasks.
private void expectRebalance(final Collection<String> revokedConnectors,
final List<ConnectorTaskId> revokedTasks,
@@ -2329,21 +2415,34 @@ public class DistributedHerderTest {
final List<String> assignedConnectors,
final List<ConnectorTaskId> assignedTasks,
int delay) {
+ expectRebalance(revokedConnectors, revokedTasks, error, offset,
"leader", "leaderUrl", assignedConnectors, assignedTasks, delay);
+ }
+
+ // Handles common initial part of rebalance callback. Does not handle
instantiation of connectors and tasks.
+ private void expectRebalance(final Collection<String> revokedConnectors,
+ final List<ConnectorTaskId> revokedTasks,
+ final short error,
+ final long offset,
+ String leader,
+ String leaderUrl,
+ final List<String> assignedConnectors,
+ final List<ConnectorTaskId> assignedTasks,
+ int delay) {
member.ensureActive();
PowerMock.expectLastCall().andAnswer(() -> {
ExtendedAssignment assignment;
if (!revokedConnectors.isEmpty() || !revokedTasks.isEmpty()) {
- rebalanceListener.onRevoked("leader", revokedConnectors,
revokedTasks);
+ rebalanceListener.onRevoked(leader, revokedConnectors,
revokedTasks);
}
if (connectProtocolVersion == CONNECT_PROTOCOL_V0) {
assignment = new ExtendedAssignment(
- connectProtocolVersion, error, "leader", "leaderUrl",
offset,
+ connectProtocolVersion, error, leader, leaderUrl,
offset,
assignedConnectors, assignedTasks,
Collections.emptyList(), Collections.emptyList(), 0);
} else {
assignment = new ExtendedAssignment(
- connectProtocolVersion, error, "leader", "leaderUrl",
offset,
+ connectProtocolVersion, error, leader, leaderUrl,
offset,
assignedConnectors, assignedTasks,
new ArrayList<>(revokedConnectors), new
ArrayList<>(revokedTasks), delay);
}