This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.6 by this push:
new 74e6fe7 KAFKA-10134 Follow-up: Set the re-join flag in heartbeat failure (#9354)
74e6fe7 is described below
commit 74e6fe715e35439f0c82f34eccdc45d2cd7e8227
Author: Guozhang Wang <[email protected]>
AuthorDate: Thu Oct 1 17:57:00 2020 -0700
KAFKA-10134 Follow-up: Set the re-join flag in heartbeat failure (#9354)
Reviewers: A. Sophie Blee-Goldman <[email protected]>, Boyang Chen <[email protected]>
---
.../consumer/internals/AbstractCoordinator.java | 2 +-
.../kafka/clients/consumer/KafkaConsumerTest.java | 36 +++++++++++++---------
2 files changed, 23 insertions(+), 15 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index a565c21..db35500 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -942,7 +942,7 @@ public abstract class AbstractCoordinator implements Closeable {
synchronized void resetGenerationOnResponseError(ApiKeys api, Errors
error) {
log.debug("Resetting generation after encountering {} from {} response
and requesting re-join", error, api);
- resetState();
+ resetStateAndRejoin();
}
synchronized void resetGenerationOnLeaveGroup() {
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 8cfbfd6..c5a5128 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -116,6 +116,7 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern;
@@ -1828,7 +1829,7 @@ public class KafkaConsumerTest {
}
@Test
- public void testReturnRecordsDuringRebalance() {
+ public void testReturnRecordsDuringRebalance() throws InterruptedException
{
Time time = new MockTime(1L);
SubscriptionState subscription = new SubscriptionState(new
LogContext(), OffsetResetStrategy.EARLIEST);
ConsumerMetadata metadata = createMetadata(subscription);
@@ -1843,15 +1844,13 @@ public class KafkaConsumerTest {
Node node = metadata.fetch().nodes().get(0);
Node coordinator = prepareRebalance(client, node, assignor,
Arrays.asList(tp0, t2p0), null);
- // a first poll with zero millisecond would not complete the rebalance
- consumer.poll(Duration.ZERO);
+ // a poll with non-zero milliseconds would complete three round-trips
(discover, join, sync)
+ TestUtils.waitForCondition(() -> {
+ consumer.poll(Duration.ofMillis(100L));
+ return consumer.assignment().equals(Utils.mkSet(tp0, t2p0));
+ }, "Does not complete rebalance in time");
assertEquals(Utils.mkSet(topic, topic2), consumer.subscription());
- assertEquals(Collections.emptySet(), consumer.assignment());
-
- // a second poll with non-zero milliseconds would complete three
round-trips (discover, join, sync)
- consumer.poll(Duration.ofMillis(100L));
-
assertEquals(Utils.mkSet(tp0, t2p0), consumer.assignment());
// prepare a response of the outstanding fetch so that we have data
available on the next poll
@@ -1904,7 +1903,6 @@ public class KafkaConsumerTest {
// mock rebalance responses
client.respondFrom(joinGroupFollowerResponse(assignor, 2, "memberId",
"leaderId", Errors.NONE), coordinator);
- client.prepareResponseFrom(syncGroupResponse(Arrays.asList(tp0, t3p0),
Errors.NONE), coordinator);
// we need to poll 1) for getting the join response, and then send the
sync request;
// 2) for getting the sync response
@@ -1920,12 +1918,19 @@ public class KafkaConsumerTest {
fetches1.put(tp0, new FetchInfo(3, 1));
client.respondFrom(fetchResponse(fetches1), node);
- records = consumer.poll(Duration.ZERO);
+ // now complete the rebalance
+ client.respondFrom(syncGroupResponse(Arrays.asList(tp0, t3p0),
Errors.NONE), coordinator);
+
+ AtomicInteger count = new AtomicInteger(0);
+ TestUtils.waitForCondition(() -> {
+ ConsumerRecords<String, String> recs =
consumer.poll(Duration.ofMillis(100L));
+ return consumer.assignment().equals(Utils.mkSet(tp0, t3p0)) &&
count.addAndGet(recs.count()) == 1;
+
+ }, "Does not complete rebalance in time");
// should have t3 but not sent yet the t3 records
assertEquals(Utils.mkSet(topic, topic3), consumer.subscription());
assertEquals(Utils.mkSet(tp0, t3p0), consumer.assignment());
- assertEquals(1, records.count());
assertEquals(4L, consumer.position(tp0));
assertEquals(0L, consumer.position(t3p0));
@@ -1934,10 +1939,13 @@ public class KafkaConsumerTest {
fetches1.put(t3p0, new FetchInfo(0, 100));
client.respondFrom(fetchResponse(fetches1), node);
- records = consumer.poll(Duration.ZERO);
+ count.set(0);
+ TestUtils.waitForCondition(() -> {
+ ConsumerRecords<String, String> recs =
consumer.poll(Duration.ofMillis(100L));
+ return count.addAndGet(recs.count()) == 101;
+
+ }, "Does not complete rebalance in time");
- // should have t3 but not sent yet the t3 records
- assertEquals(101, records.count());
assertEquals(5L, consumer.position(tp0));
assertEquals(100L, consumer.position(t3p0));