This is an automated email from the ASF dual-hosted git repository.
rhauch pushed a commit to branch 2.7
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.7 by this push:
new 91d41e9 KAFKA-12252 and KAFKA-12262: Fix session key rotation when leadership changes (#10014)
91d41e9 is described below
commit 91d41e948fc1d5317d056960b99570718b3ea2c6
Author: Chris Egerton <[email protected]>
AuthorDate: Wed May 5 17:11:15 2021 -0400
KAFKA-12252 and KAFKA-12262: Fix session key rotation when leadership changes (#10014)
Author: Chris Egerton <[email protected]>
Reviewers: Greg Harris <[email protected]>, Randall Hauch <[email protected]>
---
checkstyle/suppressions.xml | 3 +
.../runtime/distributed/DistributedHerder.java | 9 +-
.../runtime/distributed/DistributedHerderTest.java | 140 +++++++++++++++++----
3 files changed, 126 insertions(+), 26 deletions(-)
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 97a96c8..1665414 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -142,6 +142,9 @@
<suppress checks="MethodLength"
files="(RequestResponse|WorkerSinkTask)Test.java"/>
+ <suppress checks="JavaNCSS"
+ files="DistributedHerderTest.java"/>
+
<!-- Streams -->
<suppress checks="ClassFanOutComplexity"
files="(KafkaStreams|KStreamImpl|KTableImpl|StreamsPartitionAssignor).java"/>
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
index 93f6141..df90c45 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
@@ -409,7 +409,7 @@ public class DistributedHerder extends AbstractHerder
implements Runnable {
log.debug("Scheduled rebalance at: {} (now: {}
nextRequestTimeoutMs: {}) ",
scheduledRebalance, now, nextRequestTimeoutMs);
}
- if (internalRequestValidationEnabled() && keyExpiration <
Long.MAX_VALUE) {
+ if (isLeader() && internalRequestValidationEnabled() && keyExpiration
< Long.MAX_VALUE) {
nextRequestTimeoutMs = Math.min(nextRequestTimeoutMs,
Math.max(keyExpiration - now, 0));
log.debug("Scheduled next key rotation at: {} (now: {}
nextRequestTimeoutMs: {}) ",
keyExpiration, now, nextRequestTimeoutMs);
@@ -1620,10 +1620,11 @@ public class DistributedHerder extends AbstractHerder
implements Runnable {
synchronized (DistributedHerder.this) {
DistributedHerder.this.sessionKey = sessionKey.key();
- // Track the expiration of the key if and only if this worker
is the leader
+ // Track the expiration of the key.
// Followers will receive rotated keys from the leader and
won't be responsible for
- // tracking expiration and distributing new keys themselves
- if (isLeader() && keyRotationIntervalMs > 0) {
+ // tracking expiration and distributing new keys themselves,
but may become leaders
+ // later on and will need to know when to update the key.
+ if (keyRotationIntervalMs > 0) {
DistributedHerder.this.keyExpiration =
sessionKey.creationTimestamp() + keyRotationIntervalMs;
}
}
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
index 7ededef..519cda8 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
@@ -2170,6 +2170,84 @@ public class DistributedHerderTest {
PowerMock.verifyAll();
}
+ @Test
+ public void testKeyRotationWhenWorkerBecomesLeader() throws Exception {
+ EasyMock.expect(member.memberId()).andStubReturn("member");
+
EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V2);
+
+ expectRebalance(1, Collections.emptyList(), Collections.emptyList());
+ expectPostRebalanceCatchup(SNAPSHOT);
+ // First rebalance: poll indefinitely as no key has been read yet, so
expiration doesn't come into play
+ member.poll(Long.MAX_VALUE);
+ EasyMock.expectLastCall();
+
+ expectRebalance(2, Collections.emptyList(), Collections.emptyList());
+ SessionKey initialKey = new SessionKey(EasyMock.mock(SecretKey.class),
0);
+ ClusterConfigState snapshotWithKey = new ClusterConfigState(2,
initialKey, Collections.singletonMap(CONN1, 3),
+ Collections.singletonMap(CONN1, CONN1_CONFIG),
Collections.singletonMap(CONN1, TargetState.STARTED),
+ TASK_CONFIGS_MAP, Collections.<String>emptySet());
+ expectPostRebalanceCatchup(snapshotWithKey);
+ // Second rebalance: poll indefinitely as worker is follower, so
expiration still doesn't come into play
+ member.poll(Long.MAX_VALUE);
+ EasyMock.expectLastCall();
+
+ expectRebalance(2, Collections.emptyList(), Collections.emptyList(),
"member", MEMBER_URL);
+ Capture<SessionKey> updatedKey = EasyMock.newCapture();
+ configBackingStore.putSessionKey(EasyMock.capture(updatedKey));
+ EasyMock.expectLastCall().andAnswer(() -> {
+ configUpdateListener.onSessionKeyUpdate(updatedKey.getValue());
+ return null;
+ });
+ // Third rebalance: poll for a limited time as worker has become
leader and must wake up for key expiration
+ Capture<Long> pollTimeout = EasyMock.newCapture();
+ member.poll(EasyMock.captureLong(pollTimeout));
+ EasyMock.expectLastCall();
+
+ PowerMock.replayAll();
+
+ herder.tick();
+ configUpdateListener.onSessionKeyUpdate(initialKey);
+ herder.tick();
+ herder.tick();
+
+ assertTrue(pollTimeout.getValue() <=
DistributedConfig.INTER_WORKER_KEY_TTL_MS_MS_DEFAULT);
+
+ PowerMock.verifyAll();
+ }
+
+ @Test
+ public void testKeyRotationDisabledWhenWorkerBecomesFollower() throws
Exception {
+ EasyMock.expect(member.memberId()).andStubReturn("member");
+
EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V2);
+
+ expectRebalance(1, Collections.emptyList(), Collections.emptyList(),
"member", MEMBER_URL);
+ SecretKey initialSecretKey = EasyMock.mock(SecretKey.class);
+
EasyMock.expect(initialSecretKey.getAlgorithm()).andReturn(DistributedConfig.INTER_WORKER_KEY_GENERATION_ALGORITHM_DEFAULT).anyTimes();
+ EasyMock.expect(initialSecretKey.getEncoded()).andReturn(new
byte[32]).anyTimes();
+ SessionKey initialKey = new SessionKey(initialSecretKey,
time.milliseconds());
+ ClusterConfigState snapshotWithKey = new ClusterConfigState(1,
initialKey, Collections.singletonMap(CONN1, 3),
+ Collections.singletonMap(CONN1, CONN1_CONFIG),
Collections.singletonMap(CONN1, TargetState.STARTED),
+ TASK_CONFIGS_MAP, Collections.<String>emptySet());
+ expectPostRebalanceCatchup(snapshotWithKey);
+ // First rebalance: poll for a limited time as worker is leader and
must wake up for key expiration
+ Capture<Long> firstPollTimeout = EasyMock.newCapture();
+ member.poll(EasyMock.captureLong(firstPollTimeout));
+ EasyMock.expectLastCall();
+
+ expectRebalance(1, Collections.emptyList(), Collections.emptyList());
+ // Second rebalance: poll indefinitely as worker is no longer leader,
so key expiration doesn't come into play
+ member.poll(Long.MAX_VALUE);
+ EasyMock.expectLastCall();
+
+ PowerMock.replayAll(initialSecretKey);
+
+ configUpdateListener.onSessionKeyUpdate(initialKey);
+ herder.tick();
+ assertTrue(firstPollTimeout.getValue() <=
DistributedConfig.INTER_WORKER_KEY_TTL_MS_MS_DEFAULT);
+ herder.tick();
+
+ PowerMock.verifyAll();
+ }
@Test
public void testPutTaskConfigsSignatureNotRequiredV0() {
@@ -2405,6 +2483,14 @@ public class DistributedHerderTest {
ConnectProtocol.Assignment.NO_ERROR, offset,
assignedConnectors, assignedTasks, 0);
}
+ private void expectRebalance(final long offset,
+ final List<String> assignedConnectors,
+ final List<ConnectorTaskId> assignedTasks,
+ String leader, String leaderUrl) {
+ expectRebalance(Collections.emptyList(), Collections.emptyList(),
+ ConnectProtocol.Assignment.NO_ERROR, offset, leader,
leaderUrl, assignedConnectors, assignedTasks, 0);
+ }
+
// Handles common initial part of rebalance callback. Does not handle
instantiation of connectors and tasks.
private void expectRebalance(final Collection<String> revokedConnectors,
final List<ConnectorTaskId> revokedTasks,
@@ -2423,30 +2509,40 @@ public class DistributedHerderTest {
final List<String> assignedConnectors,
final List<ConnectorTaskId> assignedTasks,
int delay) {
+ expectRebalance(revokedConnectors, revokedTasks, error, offset,
"leader", "leaderUrl", assignedConnectors, assignedTasks, delay);
+ }
+
+ // Handles common initial part of rebalance callback. Does not handle
instantiation of connectors and tasks.
+ private void expectRebalance(final Collection<String> revokedConnectors,
+ final List<ConnectorTaskId> revokedTasks,
+ final short error,
+ final long offset,
+ String leader,
+ String leaderUrl,
+ final List<String> assignedConnectors,
+ final List<ConnectorTaskId> assignedTasks,
+ int delay) {
member.ensureActive();
- PowerMock.expectLastCall().andAnswer(new IAnswer<Object>() {
- @Override
- public Object answer() throws Throwable {
- ExtendedAssignment assignment;
- if (!revokedConnectors.isEmpty() || !revokedTasks.isEmpty()) {
- rebalanceListener.onRevoked("leader", revokedConnectors,
revokedTasks);
- }
-
- if (connectProtocolVersion == CONNECT_PROTOCOL_V0) {
- assignment = new ExtendedAssignment(
- connectProtocolVersion, error, "leader",
"leaderUrl", offset,
- assignedConnectors, assignedTasks,
- Collections.emptyList(), Collections.emptyList(),
0);
- } else {
- assignment = new ExtendedAssignment(
- connectProtocolVersion, error, "leader",
"leaderUrl", offset,
- new ArrayList<>(assignedConnectors), new
ArrayList<>(assignedTasks),
- new ArrayList<>(revokedConnectors), new
ArrayList<>(revokedTasks), delay);
- }
- rebalanceListener.onAssigned(assignment, 3);
- time.sleep(100L);
- return null;
+ PowerMock.expectLastCall().andAnswer(() -> {
+ ExtendedAssignment assignment;
+ if (!revokedConnectors.isEmpty() || !revokedTasks.isEmpty()) {
+ rebalanceListener.onRevoked(leader, revokedConnectors,
revokedTasks);
+ }
+
+ if (connectProtocolVersion == CONNECT_PROTOCOL_V0) {
+ assignment = new ExtendedAssignment(
+ connectProtocolVersion, error, leader, leaderUrl,
offset,
+ assignedConnectors, assignedTasks,
+ Collections.emptyList(), Collections.emptyList(), 0);
+ } else {
+ assignment = new ExtendedAssignment(
+ connectProtocolVersion, error, leader, leaderUrl,
offset,
+ new ArrayList<>(assignedConnectors), new
ArrayList<>(assignedTasks),
+ new ArrayList<>(revokedConnectors), new
ArrayList<>(revokedTasks), delay);
}
+ rebalanceListener.onAssigned(assignment, 3);
+ time.sleep(100L);
+ return null;
});
if (!revokedConnectors.isEmpty()) {