This is an automated email from the ASF dual-hosted git repository.

rhauch pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git

The following commit(s) were added to refs/heads/2.6 by this push:
     new 3767ea6  KAFKA-10218: Stop reading config topic in every subsequent tick if catchup fails once (#8973)
3767ea6 is described below

commit 3767ea6eed948acffb4f8693c7cf0a192ba44720
Author: Chris Egerton <chr...@confluent.io>
AuthorDate: Mon Sep 28 19:30:01 2020 -0400

    KAFKA-10218: Stop reading config topic in every subsequent tick if catchup fails once (#8973)

    Add logic to reset the existing `canReadConfigs` in `DistributedHerder` once the herder is able to successfully read the configs again. Added unit test to verify the functionality.

    Author: Chris Egerton <chr...@confluent.io>
    Reviewer: Nigel Liang <ni...@nigelliang.com>, Randall Hauch <rha...@gmail.com>
---
 .../connect/runtime/distributed/DistributedHerder.java     | 13 ++++++++++---
 .../connect/runtime/distributed/DistributedHerderTest.java |  9 +++++++++
 2 files changed, 19 insertions(+), 3 deletions(-)

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
index 86e9e81..e59025d 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
@@ -316,8 +316,13 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
         try {
             // if we failed to read to end of log before, we need to make sure the issue was resolved before joining group
             // Joining and immediately leaving for failure to read configs is exceedingly impolite
-            if (!canReadConfigs && !readConfigToEnd(workerSyncTimeoutMs))
-                return; // Safe to return and tick immediately because readConfigToEnd will do the backoff for us
+            if (!canReadConfigs) {
+                if (readConfigToEnd(workerSyncTimeoutMs)) {
+                    canReadConfigs = true;
+                } else {
+                    return; // Safe to return and tick immediately because readConfigToEnd will do the backoff for us
+                }
+            }
 
             log.debug("Ensuring group membership is still active");
             member.ensureActive();
@@ -1105,7 +1110,9 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
             // we timed out. This should only happen if we failed to read configuration for long enough,
             // in which case giving back control to the main loop will prevent hanging around indefinitely after getting kicked out of the group.
             // We also indicate to the main loop that we failed to readConfigs so it will check that the issue was resolved before trying to join the group
-            if (!readConfigToEnd(workerSyncTimeoutMs)) {
+            if (readConfigToEnd(workerSyncTimeoutMs)) {
+                canReadConfigs = true;
+            } else {
                 canReadConfigs = false;
                 needsRejoin = true;
             }
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
index 570f398..8cf485a 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
@@ -1668,6 +1668,11 @@ public class DistributedHerderTest {
         member.poll(EasyMock.anyInt());
         PowerMock.expectLastCall();
 
+        // one more tick, to make sure we don't keep trying to read to the config topic unnecessarily
+        expectRebalance(1, Collections.emptyList(), Collections.emptyList());
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
         PowerMock.replayAll();
 
         long before = time.milliseconds();
@@ -1685,6 +1690,10 @@ public class DistributedHerderTest {
         time.sleep(2000L);
         assertStatistics("leaderUrl", false, 3, 1, 100, 2000L);
 
+        // tick once more to ensure that the successful read to the end of the config topic was
+        // tracked and no further unnecessary attempts were made
+        herder.tick();
+
         PowerMock.verifyAll();
     }