This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.0 by this push:
     new 5105e6a  KAFKA-7194; Fix buffer underflow if onJoinComplete is retried 
after failure (#5417)
5105e6a is described below

commit 5105e6a886ca5ab953dbe505c4ce611562a06f7a
Author: Jason Gustafson <[email protected]>
AuthorDate: Tue Jul 24 01:25:18 2018 -0700

    KAFKA-7194; Fix buffer underflow if onJoinComplete is retried after failure 
(#5417)
    
    An untimely wakeup can cause ConsumerCoordinator.onJoinComplete to throw a 
WakeupException before completion. On the next poll(), it will be retried, but 
this leads to an underflow error because the buffer containing the assignment 
data will already have been advanced. The solution is to duplicate the buffer 
passed to onJoinComplete.
    
    Reviewers: Konstantine Karantasis <[email protected]>, Rajini 
Sivaram <[email protected]>
---
 .../consumer/internals/AbstractCoordinator.java    |  9 ++--
 .../consumer/internals/ConsumerCoordinator.java    |  6 +--
 .../internals/ConsumerCoordinatorTest.java         | 55 +++++++++++++++++++++-
 3 files changed, 61 insertions(+), 9 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index b5c7a66..53834fb 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -200,9 +200,8 @@ public abstract class AbstractCoordinator implements 
Closeable {
                                                                  Map<String, 
ByteBuffer> allMemberMetadata);
 
     /**
-     * Invoked when a group member has successfully joined a group. If this 
call is woken up (i.e.
-     * if the invocation raises {@link 
org.apache.kafka.common.errors.WakeupException}), then it
-     * will be retried on the next call to {@link #ensureActiveGroup()}.
+     * Invoked when a group member has successfully joined a group. If this 
call fails with an exception,
+     * then it will be retried using the same assignment state on the next 
call to {@link #ensureActiveGroup()}.
      *
      * @param generation The generation that was joined
      * @param memberId The identifier for the local member in the group
@@ -418,7 +417,9 @@ public abstract class AbstractCoordinator implements 
Closeable {
             }
 
             if (future.succeeded()) {
-                onJoinComplete(generation.generationId, generation.memberId, 
generation.protocol, future.value());
+                // Duplicate the buffer in case `onJoinComplete` does not 
complete and needs to be retried.
+                ByteBuffer memberAssignment = future.value().duplicate();
+                onJoinComplete(generation.generationId, generation.memberId, 
generation.protocol, memberAssignment);
 
                 // We reset the join group future only after the completion 
callback returns. This ensures
                 // that if the callback is woken up, we will retry it on the 
next joinGroupIfNeeded.
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 23fd88d..e04cdeb 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -264,10 +264,10 @@ public final class ConsumerCoordinator extends 
AbstractCoordinator {
             this.joinedSubscription = newJoinedSubscription;
         }
 
-        // update the metadata and enforce a refresh to make sure the fetcher 
can start
-        // fetching data in the next iteration
+        // Update the metadata to include the full group subscription. The 
leader will trigger a rebalance
+        // if there are any metadata changes affecting any of the consumed 
partitions (whether or not this
+        // instance is subscribed to the topics).
         this.metadata.setTopics(subscriptions.groupSubscription());
-        if (!client.ensureFreshMetadata(Long.MAX_VALUE)) throw new 
TimeoutException();
 
         // give the assignor a chance to update internal state based on the 
received assignment
         assignor.onAssignment(assignment);
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index 7c2638c..ba392c6 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -875,6 +875,57 @@ public class ConsumerCoordinatorTest {
     }
 
     @Test
+    public void testWakeupFromAssignmentCallback() {
+        ConsumerCoordinator coordinator = buildCoordinator(new Metrics(), 
assignors,
+                ConsumerConfig.DEFAULT_EXCLUDE_INTERNAL_TOPICS, false, true);
+
+        final String topic = "topic1";
+        TopicPartition partition = new TopicPartition(topic, 0);
+        final String consumerId = "follower";
+        Set<String> topics = Collections.singleton(topic);
+        MockRebalanceListener rebalanceListener = new MockRebalanceListener() {
+            @Override
+            public void onPartitionsAssigned(Collection<TopicPartition> 
partitions) {
+                boolean raiseWakeup = this.assignedCount == 0;
+                super.onPartitionsAssigned(partitions);
+
+                if (raiseWakeup)
+                    throw new WakeupException();
+            }
+        };
+
+        subscriptions.subscribe(topics, rebalanceListener);
+        metadata.setTopics(topics);
+
+        // we only have metadata for one topic initially
+        metadata.update(TestUtils.singletonCluster(topic, 1), 
Collections.emptySet(), time.milliseconds());
+
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        coordinator.ensureCoordinatorReady(Long.MAX_VALUE);
+
+        // prepare initial rebalance
+        partitionAssignor.prepare(singletonMap(consumerId, 
Collections.singletonList(partition)));
+
+        client.prepareResponse(joinGroupFollowerResponse(1, consumerId, 
"leader", Errors.NONE));
+        
client.prepareResponse(syncGroupResponse(Collections.singletonList(partition), 
Errors.NONE));
+
+
+        // The first call to poll should raise the exception from the 
rebalance listener
+        try {
+            coordinator.poll(Long.MAX_VALUE);
+            fail("Expected exception thrown from assignment callback");
+        } catch (WakeupException e) {
+        }
+
+        // The second call should retry the assignment callback and succeed
+        coordinator.poll(Long.MAX_VALUE);
+
+        assertFalse(coordinator.rejoinNeededOrPending());
+        assertEquals(1, rebalanceListener.revokedCount);
+        assertEquals(2, rebalanceListener.assignedCount);
+    }
+
+    @Test
     public void testRebalanceAfterTopicUnavailableWithSubscribe() {
         unavailableTopicTest(false, false, Collections.<String>emptySet());
     }
@@ -1901,7 +1952,7 @@ public class ConsumerCoordinatorTest {
 
     private JoinGroupResponse joinGroupFollowerResponse(int generationId, 
String memberId, String leaderId, Errors error) {
         return new JoinGroupResponse(error, generationId, 
partitionAssignor.name(), memberId, leaderId,
-                Collections.<String, ByteBuffer>emptyMap());
+                Collections.emptyMap());
     }
 
     private SyncGroupResponse syncGroupResponse(List<TopicPartition> 
partitions, Errors error) {
@@ -1914,7 +1965,7 @@ public class ConsumerCoordinatorTest {
     }
 
     private OffsetFetchResponse offsetFetchResponse(Errors topLevelError) {
-        return new OffsetFetchResponse(topLevelError, 
Collections.<TopicPartition, OffsetFetchResponse.PartitionData>emptyMap());
+        return new OffsetFetchResponse(topLevelError, Collections.emptyMap());
     }
 
     private OffsetFetchResponse offsetFetchResponse(TopicPartition tp, Errors 
partitionLevelError, String metadata, long offset) {

Reply via email to