This is an automated email from the ASF dual-hosted git repository.
rhauch pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.4 by this push:
new 52ab0ad KAFKA-10218: Stop reading config topic in every subsequent
tick if catchup fails once (#8973)
52ab0ad is described below
commit 52ab0ad6cf9d2a0a4ff3b59b97950dc226e95d8a
Author: Chris Egerton <[email protected]>
AuthorDate: Mon Sep 28 19:30:01 2020 -0400
KAFKA-10218: Stop reading config topic in every subsequent tick if catchup
fails once (#8973)
Add logic to reset the existing `canReadConfigs` flag in `DistributedHerder`
once the herder is again able to read the configs successfully. Added a unit test
to verify this behavior.
Author: Chris Egerton <[email protected]>
Reviewers: Nigel Liang <[email protected]>, Randall Hauch
<[email protected]>
---
.../connect/runtime/distributed/DistributedHerder.java | 13 ++++++++++---
.../connect/runtime/distributed/DistributedHerderTest.java | 9 +++++++++
2 files changed, 19 insertions(+), 3 deletions(-)
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
index 91139c2..44e98f2 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
@@ -306,8 +306,13 @@ public class DistributedHerder extends AbstractHerder
implements Runnable {
try {
// if we failed to read to end of log before, we need to make sure
the issue was resolved before joining group
// Joining and immediately leaving for failure to read configs is
exceedingly impolite
- if (!canReadConfigs && !readConfigToEnd(workerSyncTimeoutMs))
- return; // Safe to return and tick immediately because
readConfigToEnd will do the backoff for us
+ if (!canReadConfigs) {
+ if (readConfigToEnd(workerSyncTimeoutMs)) {
+ canReadConfigs = true;
+ } else {
+ return; // Safe to return and tick immediately because
readConfigToEnd will do the backoff for us
+ }
+ }
member.ensureActive();
// Ensure we're in a good state in our group. If not restart and
everything should be setup to rejoin
@@ -1033,7 +1038,9 @@ public class DistributedHerder extends AbstractHerder
implements Runnable {
// we timed out. This should only happen if we failed to read
configuration for long enough,
// in which case giving back control to the main loop will prevent
hanging around indefinitely after getting kicked out of the group.
// We also indicate to the main loop that we failed to readConfigs
so it will check that the issue was resolved before trying to join the group
- if (!readConfigToEnd(workerSyncTimeoutMs)) {
+ if (readConfigToEnd(workerSyncTimeoutMs)) {
+ canReadConfigs = true;
+ } else {
canReadConfigs = false;
needsRejoin = true;
}
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
index 81208d3..a21788a 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
@@ -1471,6 +1471,11 @@ public class DistributedHerderTest {
member.poll(EasyMock.anyInt());
PowerMock.expectLastCall();
+ // one more tick, to make sure we don't keep trying to read to the
config topic unnecessarily
+ expectRebalance(1, Collections.emptyList(), Collections.emptyList());
+ member.poll(EasyMock.anyInt());
+ PowerMock.expectLastCall();
+
PowerMock.replayAll();
long before = time.milliseconds();
@@ -1488,6 +1493,10 @@ public class DistributedHerderTest {
time.sleep(2000L);
assertStatistics("leaderUrl", false, 3, 1, 100, 2000L);
+ // tick once more to ensure that the successful read to the end of the
config topic was
+ // tracked and no further unnecessary attempts were made
+ herder.tick();
+
PowerMock.verifyAll();
}