This is an automated email from the ASF dual-hosted git repository.
rhauch pushed a commit to branch 2.7
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.7 by this push:
new fe18043 KAFKA-12474: Handle failure to write new session keys
gracefully (#10396)
fe18043 is described below
commit fe1804370680b965a68fdd2978e2afa450daafe4
Author: Chris Egerton <[email protected]>
AuthorDate: Thu Apr 1 13:26:01 2021 -0400
KAFKA-12474: Handle failure to write new session keys gracefully (#10396)
If a distributed worker fails to write a new session key to the config topic
(or to read it back afterwards), it dies. This fix softens the blow a bit by
instead restarting the herder tick loop and forcing a read to the end of the
config topic, repeating that read until the worker is able to complete it.
Once that read succeeds, if the worker had in fact written a new session key
on its first attempt, it will have read that key back from the config topic
and will not write a new key during the next tick iteration. If it was not able
to write that key at all, it will try again to write a new key (if it is still
the leader).
Verified with new unit tests for both cases (failure to write, failure to
read back after write).
Author: Chris Egerton <[email protected]>
Reviewers: Greg Harris <[email protected]>, Randall Hauch
<[email protected]>
---
.../runtime/distributed/DistributedHerder.java | 20 ++++--
.../runtime/distributed/DistributedHerderTest.java | 82 ++++++++++++++++++++++
2 files changed, 97 insertions(+), 5 deletions(-)
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
index b6dab25..93f6141 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
@@ -364,10 +364,16 @@ public class DistributedHerder extends AbstractHerder
implements Runnable {
if (checkForKeyRotation(now)) {
log.debug("Distributing new session key");
keyExpiration = Long.MAX_VALUE;
- configBackingStore.putSessionKey(new SessionKey(
- keyGenerator.generateKey(),
- now
- ));
+ try {
+ configBackingStore.putSessionKey(new SessionKey(
+ keyGenerator.generateKey(),
+ now
+ ));
+ } catch (Exception e) {
+ log.info("Failed to write new session key to config topic;
forcing a read to the end of the config topic before possibly retrying");
+ canReadConfigs = false;
+ return;
+ }
}
// Process any external requests
@@ -1186,7 +1192,11 @@ public class DistributedHerder extends AbstractHerder
implements Runnable {
* @return true if successful, false if timed out
*/
private boolean readConfigToEnd(long timeoutMs) {
- log.info("Current config state offset {} is behind group assignment
{}, reading to end of config log", configState.offset(), assignment.offset());
+ if (configState.offset() < assignment.offset()) {
+ log.info("Current config state offset {} is behind group
assignment {}, reading to end of config log", configState.offset(),
assignment.offset());
+ } else {
+ log.info("Reading to end of config log; current config state
offset: {}", configState.offset());
+ }
try {
configBackingStore.refresh(timeoutMs, TimeUnit.MILLISECONDS);
configState = configBackingStore.snapshot();
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
index bed46a3..7ededef 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
@@ -23,12 +23,14 @@ import org.apache.kafka.connect.connector.Connector;
import
org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
import
org.apache.kafka.connect.connector.policy.NoneConnectorClientConfigOverridePolicy;
import org.apache.kafka.connect.errors.AlreadyExistsException;
+import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.NotFoundException;
import org.apache.kafka.connect.runtime.CloseableConnectorContext;
import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup;
import org.apache.kafka.connect.runtime.ConnectorConfig;
import org.apache.kafka.connect.runtime.Herder;
import org.apache.kafka.connect.runtime.MockConnectMetrics;
+import org.apache.kafka.connect.runtime.SessionKey;
import org.apache.kafka.connect.runtime.SinkConnectorConfig;
import org.apache.kafka.connect.runtime.TargetState;
import org.apache.kafka.connect.runtime.TaskConfig;
@@ -69,6 +71,7 @@ import
org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import org.powermock.reflect.Whitebox;
+import javax.crypto.SecretKey;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -86,8 +89,10 @@ import java.util.concurrent.TimeoutException;
import static java.util.Collections.singletonList;
import static javax.ws.rs.core.Response.Status.FORBIDDEN;
import static
org.apache.kafka.connect.runtime.distributed.ConnectProtocol.CONNECT_PROTOCOL_V0;
+import static
org.apache.kafka.connect.runtime.distributed.DistributedConfig.INTER_WORKER_KEY_GENERATION_ALGORITHM_DEFAULT;
import static
org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeConnectProtocol.CONNECT_PROTOCOL_V1;
import static
org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeConnectProtocol.CONNECT_PROTOCOL_V2;
+import static org.easymock.EasyMock.anyObject;
import static org.easymock.EasyMock.capture;
import static org.easymock.EasyMock.newCapture;
import static org.junit.Assert.assertEquals;
@@ -2271,6 +2276,83 @@ public class DistributedHerderTest {
}
@Test
+ public void testFailedToWriteSessionKey() throws Exception {
+ // First tick -- after joining the group, we try to write a new
+ // session key to the config topic, and fail
+ EasyMock.expect(member.memberId()).andStubReturn("leader");
+
EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V2);
+ expectRebalance(1, Collections.emptyList(), Collections.emptyList());
+ expectPostRebalanceCatchup(SNAPSHOT);
+ configBackingStore.putSessionKey(anyObject(SessionKey.class));
+ EasyMock.expectLastCall().andThrow(new ConnectException("Oh no!"));
+
+ // Second tick -- we read to the end of the config topic first,
+ // then ensure we're still active in the group
+ // then try a second time to write a new session key,
+ // then finally begin polling for group activity
+ expectPostRebalanceCatchup(SNAPSHOT);
+ member.ensureActive();
+ PowerMock.expectLastCall();
+ configBackingStore.putSessionKey(anyObject(SessionKey.class));
+ EasyMock.expectLastCall();
+ member.poll(EasyMock.anyInt());
+ PowerMock.expectLastCall();
+
+ PowerMock.replayAll();
+
+ herder.tick();
+ herder.tick();
+
+ PowerMock.verifyAll();
+ }
+
+ @Test
+ public void testFailedToReadBackNewlyWrittenSessionKey() throws Exception {
+ SecretKey secretKey = EasyMock.niceMock(SecretKey.class);
+
EasyMock.expect(secretKey.getAlgorithm()).andReturn(INTER_WORKER_KEY_GENERATION_ALGORITHM_DEFAULT);
+ EasyMock.expect(secretKey.getEncoded()).andReturn(new byte[32]);
+ SessionKey sessionKey = new SessionKey(secretKey, time.milliseconds());
+ ClusterConfigState snapshotWithSessionKey = new ClusterConfigState(1,
sessionKey, Collections.singletonMap(CONN1, 3),
+ Collections.singletonMap(CONN1, CONN1_CONFIG),
Collections.singletonMap(CONN1, TargetState.STARTED),
+ TASK_CONFIGS_MAP, Collections.emptySet());
+
+ // First tick -- after joining the group, we try to write a new
session key to
+ // the config topic, and fail (in this case, we're trying to simulate
that we've
+ // actually written the key successfully, but haven't been able to
read it back
+ // from the config topic, so to the herder it looks the same as if
it'd just failed
+ // to write the key)
+ EasyMock.expect(member.memberId()).andStubReturn("leader");
+
EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V2);
+ expectRebalance(1, Collections.emptyList(), Collections.emptyList());
+ expectPostRebalanceCatchup(SNAPSHOT);
+ configBackingStore.putSessionKey(anyObject(SessionKey.class));
+ EasyMock.expectLastCall().andThrow(new ConnectException("Oh no!"));
+
+ // Second tick -- we read to the end of the config topic first, and
pick up
+ // the session key that we were able to write the last time,
+ // then ensure we're still active in the group
+ // then finally begin polling for group activity
+ // Importantly, we do not try to write a new session key this time
around
+ configBackingStore.refresh(EasyMock.anyLong(),
EasyMock.anyObject(TimeUnit.class));
+ EasyMock.expectLastCall().andAnswer(() -> {
+ configUpdateListener.onSessionKeyUpdate(sessionKey);
+ return null;
+ });
+
EasyMock.expect(configBackingStore.snapshot()).andReturn(snapshotWithSessionKey);
+ member.ensureActive();
+ PowerMock.expectLastCall();
+ member.poll(EasyMock.anyInt());
+ PowerMock.expectLastCall();
+
+ PowerMock.replayAll(secretKey);
+
+ herder.tick();
+ herder.tick();
+
+ PowerMock.verifyAll();
+ }
+
+ @Test
public void testKeyExceptionDetection() {
assertFalse(herder.isPossibleExpiredKeyException(
time.milliseconds(),