This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.6 by this push:
     new c8d2983  KAFKA-13406: skip assignment validation for built-in 
cooperativeStickyAssignor (#11439)
c8d2983 is described below

commit c8d2983876ea0d41880534016fa545810f9fd7dd
Author: Luke Chen <[email protected]>
AuthorDate: Tue Nov 16 10:57:03 2021 +0800

    KAFKA-13406: skip assignment validation for built-in 
cooperativeStickyAssignor (#11439)
    
    This fix is trying to skip the assignment validation for built-in 
cooperative sticky assignor, since (a) we know the assignment is valid since we 
do essentially this same check already in the cooperative sticky assignor, and 
(b) the check is broken anyways due to potential for claimed `ownedPartitions` 
to be incorrect
    
    Reviewers: Anna Sophie Blee-Goldman <[email protected]>
---
 .../consumer/CooperativeStickyAssignor.java        |   3 +-
 .../consumer/internals/ConsumerCoordinator.java    |  11 +-
 .../internals/ConsumerCoordinatorTest.java         | 139 ++++++++++++++++++---
 3 files changed, 135 insertions(+), 18 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/CooperativeStickyAssignor.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/CooperativeStickyAssignor.java
index b2af7a0..abf664b 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/CooperativeStickyAssignor.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/CooperativeStickyAssignor.java
@@ -47,6 +47,7 @@ import org.apache.kafka.common.protocol.types.Type;
  * cooperative rebalancing. See the <a 
href="https://kafka.apache.org/documentation/#upgrade_240_notable";>upgrade 
guide</a> for details.
  */
 public class CooperativeStickyAssignor extends AbstractStickyAssignor {
+    public static final String COOPERATIVE_STICKY_ASSIGNOR_NAME = 
"cooperative-sticky";
 
     // these schemas are used for preserving useful metadata for the 
assignment, such as the last stable generation
     private static final String GENERATION_KEY_NAME = "generation";
@@ -59,7 +60,7 @@ public class CooperativeStickyAssignor extends 
AbstractStickyAssignor {
 
     @Override
     public String name() {
-        return "cooperative-sticky";
+        return COOPERATIVE_STICKY_ASSIGNOR_NAME;
     }
 
     @Override
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 89c9d5e..5035872 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -36,11 +36,11 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.FencedInstanceIdException;
 import org.apache.kafka.common.errors.GroupAuthorizationException;
 import org.apache.kafka.common.errors.InterruptException;
-import org.apache.kafka.common.errors.UnstableOffsetCommitException;
 import org.apache.kafka.common.errors.RebalanceInProgressException;
 import org.apache.kafka.common.errors.RetriableException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.errors.TopicAuthorizationException;
+import org.apache.kafka.common.errors.UnstableOffsetCommitException;
 import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.message.JoinGroupRequestData;
 import org.apache.kafka.common.message.JoinGroupResponseData;
@@ -81,6 +81,8 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 
+import static 
org.apache.kafka.clients.consumer.CooperativeStickyAssignor.COOPERATIVE_STICKY_ASSIGNOR_NAME;
+
 /**
  * This class manages the coordination process with the consumer coordinator.
  */
@@ -563,6 +565,7 @@ public final class ConsumerCoordinator extends 
AbstractCoordinator {
         ConsumerPartitionAssignor assignor = 
lookupAssignor(assignmentStrategy);
         if (assignor == null)
             throw new IllegalStateException("Coordinator selected invalid 
assignment protocol: " + assignmentStrategy);
+        String assignorName = assignor.name();
 
         Set<String> allSubscribedTopics = new HashSet<>();
         Map<String, Subscription> subscriptions = new HashMap<>();
@@ -584,11 +587,13 @@ public final class ConsumerCoordinator extends 
AbstractCoordinator {
 
         isLeader = true;
 
-        log.debug("Performing assignment using strategy {} with subscriptions 
{}", assignor.name(), subscriptions);
+        log.debug("Performing assignment using strategy {} with subscriptions 
{}", assignorName, subscriptions);
 
         Map<String, Assignment> assignments = 
assignor.assign(metadata.fetch(), new 
GroupSubscription(subscriptions)).groupAssignment();
 
-        if (protocol == RebalanceProtocol.COOPERATIVE) {
+        // skip the validation for built-in cooperative sticky assignor since 
we've considered
+        // the "generation" of ownedPartition inside the assignor
+        if (protocol == RebalanceProtocol.COOPERATIVE && 
!assignorName.equals(COOPERATIVE_STICKY_ASSIGNOR_NAME)) {
             validateCooperativeAssignment(ownedPartitions, assignments);
         }
 
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index ae2ab8a..189de7a 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -56,6 +56,10 @@ import org.apache.kafka.common.metrics.KafkaMetric;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.protocol.types.Type;
 import org.apache.kafka.common.record.RecordBatch;
 import org.apache.kafka.common.requests.FindCoordinatorResponse;
 import org.apache.kafka.common.requests.HeartbeatResponse;
@@ -78,6 +82,7 @@ import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
+import org.mockito.Mockito;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -101,11 +106,13 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.regex.Pattern;
 
+import static java.util.Collections.emptyList;
 import static java.util.Collections.singleton;
 import static java.util.Collections.singletonList;
 import static java.util.Collections.singletonMap;
 import static 
org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.RebalanceProtocol.COOPERATIVE;
 import static 
org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.RebalanceProtocol.EAGER;
+import static 
org.apache.kafka.clients.consumer.CooperativeStickyAssignor.COOPERATIVE_STICKY_ASSIGNOR_NAME;
 import static org.apache.kafka.common.utils.Utils.mkEntry;
 import static org.apache.kafka.common.utils.Utils.mkMap;
 import static org.apache.kafka.test.TestUtils.toSet;
@@ -141,6 +148,7 @@ public class ConsumerCoordinatorTest {
     private final List<ConsumerPartitionAssignor> assignors;
     private final Map<String, MockPartitionAssignor> assignorMap;
     private final String consumerId = "consumer";
+    private final String consumerId2 = "consumer2";
 
     private MockClient client;
     private MetadataResponse metadataResponse = 
TestUtils.metadataUpdateWith(1, new HashMap<String, Integer>() {
@@ -268,6 +276,101 @@ public class ConsumerCoordinatorTest {
         return metrics.metrics().get(metrics.metricName(name, consumerId + 
groupId + "-coordinator-metrics"));
     }
 
+    public ByteBuffer subscriptionUserData(int generation) {
+        final String generationKeyName = "generation";
+        final Schema cooperativeStickyAssignorUserDataV0 = new Schema(
+            new Field(generationKeyName, Type.INT32));
+        Struct struct = new Struct(cooperativeStickyAssignorUserDataV0);
+
+        struct.set(generationKeyName, generation);
+        ByteBuffer buffer = 
ByteBuffer.allocate(cooperativeStickyAssignorUserDataV0.sizeOf(struct));
+        cooperativeStickyAssignorUserDataV0.write(buffer, struct);
+        buffer.flip();
+        return buffer;
+    }
+
+    private List<JoinGroupResponseData.JoinGroupResponseMember> 
validateCooperativeAssignmentTestSetup() {
+        // consumer1 and consumer2 subscribed to "topic1" with 2 partitions: 
t1p, t2p
+        Map<String, List<String>> memberSubscriptions = new HashMap<>();
+        List<String> subscribedTopics = singletonList(topic1);
+        memberSubscriptions.put(consumerId, subscribedTopics);
+        memberSubscriptions.put(consumerId2, subscribedTopics);
+
+        // the ownedPartition for consumer1 is t1p, t2p
+        ConsumerPartitionAssignor.Subscription subscriptionConsumer1 = new 
ConsumerPartitionAssignor.Subscription(
+            subscribedTopics, subscriptionUserData(1), Arrays.asList(t1p, 
t2p));
+
+        // the ownedPartition for consumer2 is empty
+        ConsumerPartitionAssignor.Subscription subscriptionConsumer2 = new 
ConsumerPartitionAssignor.Subscription(
+            subscribedTopics, subscriptionUserData(1), emptyList());
+
+        List<JoinGroupResponseData.JoinGroupResponseMember> metadata = new 
ArrayList<>();
+        for (Map.Entry<String, List<String>> subscriptionEntry : 
memberSubscriptions.entrySet()) {
+            ByteBuffer buf = null;
+            if (subscriptionEntry.getKey().equals(consumerId)) {
+                buf = 
ConsumerProtocol.serializeSubscription(subscriptionConsumer1);
+            } else {
+                buf = 
ConsumerProtocol.serializeSubscription(subscriptionConsumer2);
+            }
+
+            metadata.add(new JoinGroupResponseData.JoinGroupResponseMember()
+                .setMemberId(subscriptionEntry.getKey())
+                .setMetadata(buf.array()));
+        }
+
+        return metadata;
+    }
+
+    @Test
+    public void testPerformAssignmentShouldValidateCooperativeAssignment() {
+        SubscriptionState mockSubscriptionState = 
Mockito.mock(SubscriptionState.class);
+        List<JoinGroupResponseData.JoinGroupResponseMember> metadata = 
validateCooperativeAssignmentTestSetup();
+
+        // simulate the custom cooperative assignor didn't revoke the 
partition first before assign to other consumer
+        Map<String, List<TopicPartition>> assignment = new HashMap<>();
+        assignment.put(consumerId, Arrays.asList(t1p));
+        assignment.put(consumerId2, Arrays.asList(t2p));
+        partitionAssignor.prepare(assignment);
+
+        try (ConsumerCoordinator coordinator = 
buildCoordinator(rebalanceConfig, new Metrics(), assignors, false, 
mockSubscriptionState)) {
+            if (protocol == COOPERATIVE) {
+                // in cooperative protocol, we should throw exception when 
validating cooperative assignment
+                Exception e = assertThrows(IllegalStateException.class,
+                    () -> coordinator.performAssignment("1", 
partitionAssignor.name(), metadata));
+                assertTrue(e.getMessage().contains("Assignor supporting the 
COOPERATIVE protocol violates its requirements"));
+            } else {
+                // in eager protocol, we should not validate assignment
+                coordinator.performAssignment("1", partitionAssignor.name(), 
metadata);
+            }
+        }
+    }
+
+    @Test
+    public void 
testPerformAssignmentShouldSkipValidateCooperativeAssignmentForBuiltInCooperativeStickyAssignor()
 {
+        SubscriptionState mockSubscriptionState = 
Mockito.mock(SubscriptionState.class);
+        List<JoinGroupResponseData.JoinGroupResponseMember> metadata = 
validateCooperativeAssignmentTestSetup();
+
+        List<ConsumerPartitionAssignor> assignorsWithCooperativeStickyAssignor 
= new ArrayList<>(assignors);
+        // create a mockPartitionAssignor with the same name as cooperative 
sticky assignor
+        MockPartitionAssignor mockCooperativeStickyAssignor = new 
MockPartitionAssignor(Collections.singletonList(protocol)) {
+            @Override
+            public String name() {
+                return COOPERATIVE_STICKY_ASSIGNOR_NAME;
+            }
+        };
+        
assignorsWithCooperativeStickyAssignor.add(mockCooperativeStickyAssignor);
+
+        // simulate the cooperative sticky assignor do the assignment with 
out-of-date ownedPartition
+        Map<String, List<TopicPartition>> assignment = new HashMap<>();
+        assignment.put(consumerId, Arrays.asList(t1p));
+        assignment.put(consumerId2, Arrays.asList(t2p));
+        mockCooperativeStickyAssignor.prepare(assignment);
+
+        try (ConsumerCoordinator coordinator = 
buildCoordinator(rebalanceConfig, new Metrics(), 
assignorsWithCooperativeStickyAssignor, false, mockSubscriptionState)) {
+            // should not validate assignment for built-in cooperative sticky 
assignor
+            coordinator.performAssignment("1", 
mockCooperativeStickyAssignor.name(), metadata);
+        }
+    }
     @Test
     public void testSelectRebalanceProtcol() {
         List<ConsumerPartitionAssignor> assignors = new ArrayList<>();
@@ -2993,21 +3096,29 @@ public class ConsumerCoordinatorTest {
     private ConsumerCoordinator buildCoordinator(final GroupRebalanceConfig 
rebalanceConfig,
                                                  final Metrics metrics,
                                                  final 
List<ConsumerPartitionAssignor> assignors,
-                                                 final boolean 
autoCommitEnabled) {
+                                                 final boolean 
autoCommitEnabled,
+                                                 final SubscriptionState 
subscriptions) {
         return new ConsumerCoordinator(
-                rebalanceConfig,
-                new LogContext(),
-                consumerClient,
-                assignors,
-                metadata,
-                subscriptions,
-                metrics,
-                consumerId + groupId,
-                time,
-                autoCommitEnabled,
-                autoCommitIntervalMs,
-                null,
-                false);
+            rebalanceConfig,
+            new LogContext(),
+            consumerClient,
+            assignors,
+            metadata,
+            subscriptions,
+            metrics,
+            consumerId + groupId,
+            time,
+            autoCommitEnabled,
+            autoCommitIntervalMs,
+            null,
+            false);
+    }
+
+    private ConsumerCoordinator buildCoordinator(final GroupRebalanceConfig 
rebalanceConfig,
+                                                 final Metrics metrics,
+                                                 final 
List<ConsumerPartitionAssignor> assignors,
+                                                 final boolean 
autoCommitEnabled) {
+        return buildCoordinator(rebalanceConfig, metrics, assignors, 
autoCommitEnabled, subscriptions);
     }
 
     private Collection<TopicPartition> getRevoked(final List<TopicPartition> 
owned,

Reply via email to