This is an automated email from the ASF dual-hosted git repository.
rsivaram pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.0 by this push:
new 5105e6a KAFKA-7194; Fix buffer underflow if onJoinComplete is retried after failure (#5417)
5105e6a is described below
commit 5105e6a886ca5ab953dbe505c4ce611562a06f7a
Author: Jason Gustafson <[email protected]>
AuthorDate: Tue Jul 24 01:25:18 2018 -0700
KAFKA-7194; Fix buffer underflow if onJoinComplete is retried after failure (#5417)
An untimely wakeup can cause ConsumerCoordinator.onJoinComplete to throw a
WakeupException before completion. On the next poll(), the callback will be
retried, but this leads to a buffer underflow error because the position of
the buffer containing the assignment data will already have been advanced by
the first attempt. The solution is to pass a duplicate of the buffer to
onJoinComplete, so that each attempt reads the assignment data from its
original position.
Reviewers: Konstantine Karantasis <[email protected]>, Rajini Sivaram <[email protected]>
---
.../consumer/internals/AbstractCoordinator.java | 9 ++--
.../consumer/internals/ConsumerCoordinator.java | 6 +--
.../internals/ConsumerCoordinatorTest.java | 55 +++++++++++++++++++++-
3 files changed, 61 insertions(+), 9 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index b5c7a66..53834fb 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -200,9 +200,8 @@ public abstract class AbstractCoordinator implements
Closeable {
Map<String,
ByteBuffer> allMemberMetadata);
/**
- * Invoked when a group member has successfully joined a group. If this
call is woken up (i.e.
- * if the invocation raises {@link
org.apache.kafka.common.errors.WakeupException}), then it
- * will be retried on the next call to {@link #ensureActiveGroup()}.
+ * Invoked when a group member has successfully joined a group. If this
call fails with an exception,
+ * then it will be retried using the same assignment state on the next
call to {@link #ensureActiveGroup()}.
*
* @param generation The generation that was joined
* @param memberId The identifier for the local member in the group
@@ -418,7 +417,9 @@ public abstract class AbstractCoordinator implements
Closeable {
}
if (future.succeeded()) {
- onJoinComplete(generation.generationId, generation.memberId,
generation.protocol, future.value());
+ // Duplicate the buffer in case `onJoinComplete` does not
complete and needs to be retried.
+ ByteBuffer memberAssignment = future.value().duplicate();
+ onJoinComplete(generation.generationId, generation.memberId,
generation.protocol, memberAssignment);
// We reset the join group future only after the completion
callback returns. This ensures
// that if the callback is woken up, we will retry it on the
next joinGroupIfNeeded.
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 23fd88d..e04cdeb 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -264,10 +264,10 @@ public final class ConsumerCoordinator extends
AbstractCoordinator {
this.joinedSubscription = newJoinedSubscription;
}
- // update the metadata and enforce a refresh to make sure the fetcher
can start
- // fetching data in the next iteration
+ // Update the metadata to include the full group subscription. The
leader will trigger a rebalance
+ // if there are any metadata changes affecting any of the consumed
partitions (whether or not this
+ // instance is subscribed to the topics).
this.metadata.setTopics(subscriptions.groupSubscription());
- if (!client.ensureFreshMetadata(Long.MAX_VALUE)) throw new
TimeoutException();
// give the assignor a chance to update internal state based on the
received assignment
assignor.onAssignment(assignment);
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index 7c2638c..ba392c6 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -875,6 +875,57 @@ public class ConsumerCoordinatorTest {
}
@Test
+ public void testWakeupFromAssignmentCallback() {
+ ConsumerCoordinator coordinator = buildCoordinator(new Metrics(),
assignors,
+ ConsumerConfig.DEFAULT_EXCLUDE_INTERNAL_TOPICS, false, true);
+
+ final String topic = "topic1";
+ TopicPartition partition = new TopicPartition(topic, 0);
+ final String consumerId = "follower";
+ Set<String> topics = Collections.singleton(topic);
+ MockRebalanceListener rebalanceListener = new MockRebalanceListener() {
+ @Override
+ public void onPartitionsAssigned(Collection<TopicPartition>
partitions) {
+ boolean raiseWakeup = this.assignedCount == 0;
+ super.onPartitionsAssigned(partitions);
+
+ if (raiseWakeup)
+ throw new WakeupException();
+ }
+ };
+
+ subscriptions.subscribe(topics, rebalanceListener);
+ metadata.setTopics(topics);
+
+ // we only have metadata for one topic initially
+ metadata.update(TestUtils.singletonCluster(topic, 1),
Collections.emptySet(), time.milliseconds());
+
+ client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+ coordinator.ensureCoordinatorReady(Long.MAX_VALUE);
+
+ // prepare initial rebalance
+ partitionAssignor.prepare(singletonMap(consumerId,
Collections.singletonList(partition)));
+
+ client.prepareResponse(joinGroupFollowerResponse(1, consumerId,
"leader", Errors.NONE));
+
client.prepareResponse(syncGroupResponse(Collections.singletonList(partition),
Errors.NONE));
+
+
+ // The first call to poll should raise the exception from the
rebalance listener
+ try {
+ coordinator.poll(Long.MAX_VALUE);
+ fail("Expected exception thrown from assignment callback");
+ } catch (WakeupException e) {
+ }
+
+ // The second call should retry the assignment callback and succeed
+ coordinator.poll(Long.MAX_VALUE);
+
+ assertFalse(coordinator.rejoinNeededOrPending());
+ assertEquals(1, rebalanceListener.revokedCount);
+ assertEquals(2, rebalanceListener.assignedCount);
+ }
+
+ @Test
public void testRebalanceAfterTopicUnavailableWithSubscribe() {
unavailableTopicTest(false, false, Collections.<String>emptySet());
}
@@ -1901,7 +1952,7 @@ public class ConsumerCoordinatorTest {
private JoinGroupResponse joinGroupFollowerResponse(int generationId,
String memberId, String leaderId, Errors error) {
return new JoinGroupResponse(error, generationId,
partitionAssignor.name(), memberId, leaderId,
- Collections.<String, ByteBuffer>emptyMap());
+ Collections.emptyMap());
}
private SyncGroupResponse syncGroupResponse(List<TopicPartition>
partitions, Errors error) {
@@ -1914,7 +1965,7 @@ public class ConsumerCoordinatorTest {
}
private OffsetFetchResponse offsetFetchResponse(Errors topLevelError) {
- return new OffsetFetchResponse(topLevelError,
Collections.<TopicPartition, OffsetFetchResponse.PartitionData>emptyMap());
+ return new OffsetFetchResponse(topLevelError, Collections.emptyMap());
}
private OffsetFetchResponse offsetFetchResponse(TopicPartition tp, Errors
partitionLevelError, String metadata, long offset) {