This is an automated email from the ASF dual-hosted git repository.

rhauch pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.6 by this push:
     new 3767ea6  KAFKA-10218: Stop reading config topic in every subsequent tick if catchup fails once (#8973)
3767ea6 is described below

commit 3767ea6eed948acffb4f8693c7cf0a192ba44720
Author: Chris Egerton <chr...@confluent.io>
AuthorDate: Mon Sep 28 19:30:01 2020 -0400

    KAFKA-10218: Stop reading config topic in every subsequent tick if catchup fails once (#8973)
    
    Add logic to reset the existing `canReadConfigs` flag in `DistributedHerder`
    to `true` once the herder is able to successfully read the configs again.
    Add a unit test to verify the functionality.
    
    Author: Chris Egerton <chr...@confluent.io>
    Reviewer: Nigel Liang <ni...@nigelliang.com>, Randall Hauch <rha...@gmail.com>
---
 .../connect/runtime/distributed/DistributedHerder.java      | 13 ++++++++++---
 .../connect/runtime/distributed/DistributedHerderTest.java  |  9 +++++++++
 2 files changed, 19 insertions(+), 3 deletions(-)

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
index 86e9e81..e59025d 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
@@ -316,8 +316,13 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
         try {
             // if we failed to read to end of log before, we need to make sure 
the issue was resolved before joining group
             // Joining and immediately leaving for failure to read configs is 
exceedingly impolite
-            if (!canReadConfigs && !readConfigToEnd(workerSyncTimeoutMs))
-                return; // Safe to return and tick immediately because 
readConfigToEnd will do the backoff for us
+            if (!canReadConfigs) {
+                if (readConfigToEnd(workerSyncTimeoutMs)) {
+                    canReadConfigs = true;
+                } else {
+                    return; // Safe to return and tick immediately because 
readConfigToEnd will do the backoff for us
+                }
+            }
 
             log.debug("Ensuring group membership is still active");
             member.ensureActive();
@@ -1105,7 +1110,9 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
             // we timed out. This should only happen if we failed to read 
configuration for long enough,
             // in which case giving back control to the main loop will prevent 
hanging around indefinitely after getting kicked out of the group.
             // We also indicate to the main loop that we failed to readConfigs 
so it will check that the issue was resolved before trying to join the group
-            if (!readConfigToEnd(workerSyncTimeoutMs)) {
+            if (readConfigToEnd(workerSyncTimeoutMs)) {
+                canReadConfigs = true;
+            } else {
                 canReadConfigs = false;
                 needsRejoin = true;
             }
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
index 570f398..8cf485a 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
@@ -1668,6 +1668,11 @@ public class DistributedHerderTest {
         member.poll(EasyMock.anyInt());
         PowerMock.expectLastCall();
 
+        // one more tick, to make sure we don't keep trying to read to the 
config topic unnecessarily
+        expectRebalance(1, Collections.emptyList(), Collections.emptyList());
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
         PowerMock.replayAll();
 
         long before = time.milliseconds();
@@ -1685,6 +1690,10 @@ public class DistributedHerderTest {
         time.sleep(2000L);
         assertStatistics("leaderUrl", false, 3, 1, 100, 2000L);
 
+        // tick once more to ensure that the successful read to the end of the 
config topic was 
+        // tracked and no further unnecessary attempts were made
+        herder.tick();
+
         PowerMock.verifyAll();
     }
 

Reply via email to