This is an automated email from the ASF dual-hosted git repository.

junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 1656591d0b3 KAFKA-14950: implement assign() and assignment() (#13797)
1656591d0b3 is described below

commit 1656591d0b339c385d0ba1f938fc94b52e29965d
Author: Philip Nee <[email protected]>
AuthorDate: Fri Jul 21 13:59:00 2023 -0700

    KAFKA-14950: implement assign() and assignment() (#13797)
    
    We will explicitly send an assignment change event to the background thread 
to invoke auto-commit if the group.id is configured. After updating the 
subscription state, a NewTopicsMetadataUpdateRequestEvent will also be sent to 
the background thread to update the metadata.
    
    Co-authored-by: Kirk True <[email protected]>
    Reviewers: Jun Rao <[email protected]>
---
 .../consumer/internals/CommitRequestManager.java   |  7 +-
 .../internals/DefaultBackgroundThread.java         | 24 ++++--
 .../consumer/internals/DefaultEventHandler.java    |  3 +
 .../consumer/internals/PrototypeAsyncConsumer.java | 33 +++++++-
 .../internals/events/ApplicationEvent.java         |  2 +-
 .../events/ApplicationEventProcessor.java          | 30 ++++++-
 ....java => AssignmentChangeApplicationEvent.java} | 30 +++----
 ...va => NewTopicsMetadataUpdateRequestEvent.java} | 23 +-----
 .../internals/CommitRequestManagerTest.java        |  2 +-
 .../internals/DefaultBackgroundThreadTest.java     | 93 ++++++++++++++++++++--
 .../internals/PrototypeAsyncConsumerTest.java      | 43 +++++++++-
 11 files changed, 226 insertions(+), 64 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java
index 82959d26ce3..b441196a08f 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java
@@ -92,7 +92,7 @@ public class CommitRequestManager implements RequestManager {
      */
     @Override
     public NetworkClientDelegate.PollResult poll(final long currentTimeMs) {
-        maybeAutoCommit();
+        maybeAutoCommit(this.subscriptionState.allConsumed());
         if (!pendingRequests.hasUnsentRequests()) {
             return new NetworkClientDelegate.PollResult(Long.MAX_VALUE, 
Collections.emptyList());
         }
@@ -101,7 +101,7 @@ public class CommitRequestManager implements RequestManager 
{
                 
Collections.unmodifiableList(pendingRequests.drain(currentTimeMs)));
     }
 
-    private void maybeAutoCommit() {
+    public void maybeAutoCommit(final Map<TopicPartition, OffsetAndMetadata> 
offsets) {
         if (!autoCommitState.isPresent()) {
             return;
         }
@@ -111,8 +111,7 @@ public class CommitRequestManager implements RequestManager 
{
             return;
         }
 
-        Map<TopicPartition, OffsetAndMetadata> allConsumedOffsets = 
subscriptionState.allConsumed();
-        sendAutoCommit(allConsumedOffsets);
+        sendAutoCommit(offsets);
         autocommit.resetTimer();
         autocommit.setInflightCommitStatus(true);
     }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThread.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThread.java
index ba50d0f5354..2b2bd29ed26 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThread.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThread.java
@@ -34,11 +34,12 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.Map;
-import java.util.Objects;
 import java.util.Optional;
 import java.util.Queue;
 import java.util.concurrent.BlockingQueue;
 
+import static java.util.Objects.requireNonNull;
+
 /**
  * Background thread runnable that consumes {@code ApplicationEvent} and
  * produces {@code BackgroundEvent}. It uses an event loop to consume and
@@ -61,6 +62,7 @@ public class DefaultBackgroundThread extends KafkaThread {
     private final NetworkClientDelegate networkClientDelegate;
     private final ErrorEventHandler errorEventHandler;
     private final GroupState groupState;
+    private final SubscriptionState subscriptionState;
     private boolean running;
 
     private final Map<RequestManager.Type, Optional<RequestManager>> 
requestManagerRegistry;
@@ -71,6 +73,7 @@ public class DefaultBackgroundThread extends KafkaThread {
                             final LogContext logContext,
                             final BlockingQueue<ApplicationEvent> 
applicationEventQueue,
                             final BlockingQueue<BackgroundEvent> 
backgroundEventQueue,
+                            final SubscriptionState subscriptionState,
                             final ErrorEventHandler errorEventHandler,
                             final ApplicationEventProcessor processor,
                             final ConsumerMetadata metadata,
@@ -90,6 +93,7 @@ public class DefaultBackgroundThread extends KafkaThread {
         this.networkClientDelegate = networkClient;
         this.errorEventHandler = errorEventHandler;
         this.groupState = groupState;
+        this.subscriptionState = subscriptionState;
 
         this.requestManagerRegistry = new HashMap<>();
         this.requestManagerRegistry.put(RequestManager.Type.COORDINATOR, 
Optional.ofNullable(coordinatorManager));
@@ -102,15 +106,24 @@ public class DefaultBackgroundThread extends KafkaThread {
                                    final BlockingQueue<ApplicationEvent> 
applicationEventQueue,
                                    final BlockingQueue<BackgroundEvent> 
backgroundEventQueue,
                                    final ConsumerMetadata metadata,
+                                   final SubscriptionState subscriptionState,
                                    final KafkaClient networkClient) {
         super(BACKGROUND_THREAD_NAME, true);
+        requireNonNull(config);
+        requireNonNull(rebalanceConfig);
+        requireNonNull(logContext);
+        requireNonNull(applicationEventQueue);
+        requireNonNull(backgroundEventQueue);
+        requireNonNull(metadata);
+        requireNonNull(subscriptionState);
+        requireNonNull(networkClient);
         try {
             this.time = time;
             this.log = logContext.logger(getClass());
             this.applicationEventQueue = applicationEventQueue;
             this.backgroundEventQueue = backgroundEventQueue;
+            this.subscriptionState = subscriptionState;
             this.config = config;
-            // subscriptionState is initialized by the polling thread
             this.metadata = metadata;
             this.networkClientDelegate = new NetworkClientDelegate(
                     this.time,
@@ -121,7 +134,7 @@ public class DefaultBackgroundThread extends KafkaThread {
             this.errorEventHandler = new 
ErrorEventHandler(this.backgroundEventQueue);
             this.groupState = new GroupState(rebalanceConfig);
             this.requestManagerRegistry = 
Collections.unmodifiableMap(buildRequestManagerRegistry(logContext));
-            this.applicationEventProcessor = new 
ApplicationEventProcessor(backgroundEventQueue, requestManagerRegistry);
+            this.applicationEventProcessor = new 
ApplicationEventProcessor(backgroundEventQueue, requestManagerRegistry, 
metadata);
         } catch (final Exception e) {
             close();
             throw new KafkaException("Failed to construct background 
processor", e.getCause());
@@ -138,11 +151,10 @@ public class DefaultBackgroundThread extends KafkaThread {
                         config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG),
                         errorEventHandler,
                         groupState.groupId);
-        // Add subscriptionState
         CommitRequestManager commitRequestManager = coordinatorManager == null 
?
                 null :
                 new CommitRequestManager(time,
-                        logContext, null, config,
+                        logContext, this.subscriptionState, config,
                         coordinatorManager,
                         groupState);
         registry.put(RequestManager.Type.COORDINATOR, 
Optional.ofNullable(coordinatorManager));
@@ -214,7 +226,7 @@ public class DefaultBackgroundThread extends KafkaThread {
     }
 
     private void consumeApplicationEvent(final ApplicationEvent event) {
-        Objects.requireNonNull(event);
+        requireNonNull(event);
         applicationEventProcessor.process(event);
     }
 
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultEventHandler.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultEventHandler.java
index b55bdbff87c..5a0cf55bc0e 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultEventHandler.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultEventHandler.java
@@ -126,6 +126,7 @@ public class DefaultEventHandler implements EventHandler {
             this.applicationEventQueue,
             this.backgroundEventQueue,
             metadata,
+            subscriptionState,
             networkClient);
         this.backgroundThread.start();
     }
@@ -137,6 +138,7 @@ public class DefaultEventHandler implements EventHandler {
                         final LogContext logContext,
                         final BlockingQueue<ApplicationEvent> 
applicationEventQueue,
                         final BlockingQueue<BackgroundEvent> 
backgroundEventQueue,
+                        final SubscriptionState subscriptionState,
                         final ConsumerMetadata metadata,
                         final KafkaClient networkClient) {
         this.applicationEventQueue = applicationEventQueue;
@@ -149,6 +151,7 @@ public class DefaultEventHandler implements EventHandler {
             this.applicationEventQueue,
             this.backgroundEventQueue,
             metadata,
+            subscriptionState,
             networkClient);
         backgroundThread.start();
     }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java
index b16753c33f1..be67251bcb9 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java
@@ -29,9 +29,11 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
 import org.apache.kafka.clients.consumer.OffsetCommitCallback;
 import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import 
org.apache.kafka.clients.consumer.internals.events.AssignmentChangeApplicationEvent;
 import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.CommitApplicationEvent;
 import org.apache.kafka.clients.consumer.internals.events.EventHandler;
+import 
org.apache.kafka.clients.consumer.internals.events.NewTopicsMetadataUpdateRequestEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.OffsetFetchApplicationEvent;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Metric;
@@ -58,6 +60,7 @@ import java.time.Duration;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
@@ -497,7 +500,7 @@ public class PrototypeAsyncConsumer<K, V> implements 
Consumer<K, V> {
 
     @Override
     public Set<TopicPartition> assignment() {
-        throw new KafkaException("method not implemented");
+        return 
Collections.unmodifiableSet(this.subscriptions.assignedPartitions());
     }
 
     /**
@@ -522,7 +525,33 @@ public class PrototypeAsyncConsumer<K, V> implements 
Consumer<K, V> {
 
     @Override
     public void assign(Collection<TopicPartition> partitions) {
-        throw new KafkaException("method not implemented");
+        if (partitions == null) {
+            throw new IllegalArgumentException("Topic partitions collection to 
assign to cannot be null");
+        }
+
+        if (partitions.isEmpty()) {
+            // TODO: implementation of unsubscribe() will be included in 
forthcoming commits.
+            // this.unsubscribe();
+            return;
+        }
+
+        for (TopicPartition tp : partitions) {
+            String topic = (tp != null) ? tp.topic() : null;
+            if (Utils.isBlank(topic))
+                throw new IllegalArgumentException("Topic partitions to assign 
to cannot have null or empty topic");
+        }
+
+        // TODO: implementation of refactored Fetcher will be included in 
forthcoming commits.
+        // fetcher.clearBufferedDataForUnassignedPartitions(partitions);
+
+        // assignment change event will trigger autocommit if it is configured 
and the group id is specified. This is
+        // to make sure offsets of topic partitions the consumer is 
unsubscribing from are committed since there will
+        // be no following rebalance
+        eventHandler.add(new 
AssignmentChangeApplicationEvent(this.subscriptions.allConsumed(), 
time.milliseconds()));
+
+        log.info("Assigned to partition(s): {}", Utils.join(partitions, ", "));
+        if (this.subscriptions.assignFromUser(new HashSet<>(partitions)))
+            eventHandler.add(new NewTopicsMetadataUpdateRequestEvent());
     }
 
     @Override
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java
index 98b2aebeb4b..9a8b2dde837 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java
@@ -36,6 +36,6 @@ abstract public class ApplicationEvent {
         return type + " ApplicationEvent";
     }
     public enum Type {
-        NOOP, COMMIT, POLL, FETCH_COMMITTED_OFFSET,
+        NOOP, COMMIT, POLL, FETCH_COMMITTED_OFFSET, METADATA_UPDATE, 
ASSIGNMENT_CHANGE,
     }
 }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
index 0200f8d84c6..ae57b3adfd4 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.clients.consumer.internals.events;
 
 import org.apache.kafka.clients.consumer.internals.CommitRequestManager;
+import org.apache.kafka.clients.consumer.internals.ConsumerMetadata;
 import org.apache.kafka.clients.consumer.internals.NoopBackgroundEvent;
 import org.apache.kafka.clients.consumer.internals.RequestManager;
 import org.apache.kafka.common.KafkaException;
@@ -27,14 +28,17 @@ import java.util.Optional;
 import java.util.concurrent.BlockingQueue;
 
 public class ApplicationEventProcessor {
+
     private final BlockingQueue<BackgroundEvent> backgroundEventQueue;
     private final Map<RequestManager.Type, Optional<RequestManager>> registry;
+    private final ConsumerMetadata metadata;
 
-    public ApplicationEventProcessor(
-            final BlockingQueue<BackgroundEvent> backgroundEventQueue,
-            final Map<RequestManager.Type, Optional<RequestManager>> 
requestManagerRegistry) {
+    public ApplicationEventProcessor(final BlockingQueue<BackgroundEvent> 
backgroundEventQueue,
+                                     final Map<RequestManager.Type, 
Optional<RequestManager>> requestManagerRegistry,
+                                     final ConsumerMetadata metadata) {
         this.backgroundEventQueue = backgroundEventQueue;
         this.registry = requestManagerRegistry;
+        this.metadata = metadata;
     }
 
     public boolean process(final ApplicationEvent event) {
@@ -48,6 +52,10 @@ public class ApplicationEventProcessor {
                 return process((PollApplicationEvent) event);
             case FETCH_COMMITTED_OFFSET:
                 return process((OffsetFetchApplicationEvent) event);
+            case METADATA_UPDATE:
+                return process((NewTopicsMetadataUpdateRequestEvent) event);
+            case ASSIGNMENT_CHANGE:
+                return process((AssignmentChangeApplicationEvent) event);
         }
         return false;
     }
@@ -106,4 +114,20 @@ public class ApplicationEventProcessor {
         manager.addOffsetFetchRequest(event.partitions);
         return true;
     }
+
+    private boolean process(final NewTopicsMetadataUpdateRequestEvent event) {
+        metadata.requestUpdateForNewTopics();
+        return true;
+    }
+
+    private boolean process(final AssignmentChangeApplicationEvent event) {
+        Optional<RequestManager> commitRequestManger = 
registry.get(RequestManager.Type.COMMIT);
+        if (!commitRequestManger.isPresent()) {
+            return false;
+        }
+        CommitRequestManager manager = (CommitRequestManager) 
commitRequestManger.get();
+        manager.updateAutoCommitTimer(event.currentTimeMs);
+        manager.maybeAutoCommit(event.offsets);
+        return true;
+    }
 }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AssignmentChangeApplicationEvent.java
similarity index 59%
copy from 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java
copy to 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AssignmentChangeApplicationEvent.java
index 98b2aebeb4b..4346d96dbf3 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AssignmentChangeApplicationEvent.java
@@ -16,26 +16,18 @@
  */
 package org.apache.kafka.clients.consumer.internals.events;
 
-/**
- * This is the abstract definition of the events created by the KafkaConsumer 
API
- */
-abstract public class ApplicationEvent {
-    public final Type type;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
 
-    protected ApplicationEvent(Type type) {
-        this.type = type;
-    }
-    /**
-     * process the application event. Return true upon successful execution,
-     * false otherwise.
-     * @return true if the event was successfully executed; false otherwise.
-     */
+import java.util.Map;
 
-    @Override
-    public String toString() {
-        return type + " ApplicationEvent";
-    }
-    public enum Type {
-        NOOP, COMMIT, POLL, FETCH_COMMITTED_OFFSET,
+public class AssignmentChangeApplicationEvent extends ApplicationEvent {
+    final Map<TopicPartition, OffsetAndMetadata> offsets;
+    final long currentTimeMs;
+
+    public AssignmentChangeApplicationEvent(final Map<TopicPartition, 
OffsetAndMetadata> offsets, final long currentTimeMs) {
+        super(Type.ASSIGNMENT_CHANGE);
+        this.offsets = offsets;
+        this.currentTimeMs = currentTimeMs;
     }
 }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/NewTopicsMetadataUpdateRequestEvent.java
similarity index 59%
copy from 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java
copy to 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/NewTopicsMetadataUpdateRequestEvent.java
index 98b2aebeb4b..54cee4ee9de 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/NewTopicsMetadataUpdateRequestEvent.java
@@ -16,26 +16,9 @@
  */
 package org.apache.kafka.clients.consumer.internals.events;
 
-/**
- * This is the abstract definition of the events created by the KafkaConsumer 
API
- */
-abstract public class ApplicationEvent {
-    public final Type type;
-
-    protected ApplicationEvent(Type type) {
-        this.type = type;
-    }
-    /**
-     * process the application event. Return true upon successful execution,
-     * false otherwise.
-     * @return true if the event was successfully executed; false otherwise.
-     */
+public class NewTopicsMetadataUpdateRequestEvent extends ApplicationEvent {
 
-    @Override
-    public String toString() {
-        return type + " ApplicationEvent";
-    }
-    public enum Type {
-        NOOP, COMMIT, POLL, FETCH_COMMITTED_OFFSET,
+    public NewTopicsMetadataUpdateRequestEvent() {
+        super(Type.METADATA_UPDATE);
     }
 }
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java
index 660deca40b2..00d52f9a762 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java
@@ -321,7 +321,7 @@ public class CommitRequestManagerTest {
         NetworkClientDelegate.PollResult res = 
manager.poll(time.milliseconds());
         assertEquals(numRes, res.unsentRequests.size());
 
-        return res.unsentRequests.stream().map(r -> 
r.future()).collect(Collectors.toList());
+        return 
res.unsentRequests.stream().map(NetworkClientDelegate.UnsentRequest::future).collect(Collectors.toList());
     }
 
     private CommitRequestManager create(final boolean autoCommitEnabled, final 
long autoCommitInterval) {
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThreadTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThreadTest.java
index e1b25891871..02cbafc2b45 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThreadTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThreadTest.java
@@ -18,10 +18,15 @@ package org.apache.kafka.clients.consumer.internals;
 
 import org.apache.kafka.clients.GroupRebalanceConfig;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor;
+import 
org.apache.kafka.clients.consumer.internals.events.AssignmentChangeApplicationEvent;
 import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
+import 
org.apache.kafka.clients.consumer.internals.events.CommitApplicationEvent;
+import 
org.apache.kafka.clients.consumer.internals.events.NewTopicsMetadataUpdateRequestEvent;
 import org.apache.kafka.clients.consumer.internals.events.NoopApplicationEvent;
+import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.message.FindCoordinatorRequestData;
 import org.apache.kafka.common.requests.FindCoordinatorRequest;
 import org.apache.kafka.common.serialization.StringDeserializer;
@@ -35,6 +40,8 @@ import org.mockito.Mockito;
 
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Optional;
 import java.util.Properties;
 import java.util.concurrent.BlockingQueue;
@@ -43,10 +50,12 @@ import java.util.concurrent.LinkedBlockingQueue;
 import static 
org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
 import static 
org.apache.kafka.clients.consumer.ConsumerConfig.RETRY_BACKOFF_MS_CONFIG;
 import static 
org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
-import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -62,6 +71,7 @@ public class DefaultBackgroundThreadTest {
     private ApplicationEventProcessor processor;
     private CoordinatorRequestManager coordinatorManager;
     private ErrorEventHandler errorEventHandler;
+    private SubscriptionState subscriptionState;
     private int requestTimeoutMs = 500;
     private GroupState groupState;
     private CommitRequestManager commitManager;
@@ -77,6 +87,7 @@ public class DefaultBackgroundThreadTest {
         this.processor = mock(ApplicationEventProcessor.class);
         this.coordinatorManager = mock(CoordinatorRequestManager.class);
         this.errorEventHandler = mock(ErrorEventHandler.class);
+        this.subscriptionState = mock(SubscriptionState.class);
         GroupRebalanceConfig rebalanceConfig = new GroupRebalanceConfig(
                 100,
                 100,
@@ -87,6 +98,9 @@ public class DefaultBackgroundThreadTest {
                 true);
         this.groupState = new GroupState(rebalanceConfig);
         this.commitManager = mock(CommitRequestManager.class);
+        properties.put(KEY_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class);
+        properties.put(VALUE_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class);
+        properties.put(RETRY_BACKOFF_MS_CONFIG, RETRY_BACKOFF_MS);
     }
 
     @Test
@@ -114,6 +128,62 @@ public class DefaultBackgroundThreadTest {
         backgroundThread.close();
     }
 
+    @Test
+    public void testMetadataUpdateEvent() {
+        this.applicationEventsQueue = new LinkedBlockingQueue<>();
+        this.backgroundEventsQueue = new LinkedBlockingQueue<>();
+        this.processor = new 
ApplicationEventProcessor(this.backgroundEventsQueue, 
mockRequestManagerRegistry(),
+            metadata);
+        
when(coordinatorManager.poll(anyLong())).thenReturn(mockPollCoordinatorResult());
+        when(commitManager.poll(anyLong())).thenReturn(mockPollCommitResult());
+        DefaultBackgroundThread backgroundThread = mockBackgroundThread();
+        ApplicationEvent e = new NewTopicsMetadataUpdateRequestEvent();
+        this.applicationEventsQueue.add(e);
+        backgroundThread.runOnce();
+        verify(metadata).requestUpdateForNewTopics();
+        backgroundThread.close();
+    }
+
+    @Test
+    public void testCommitEvent() {
+        this.applicationEventsQueue = new LinkedBlockingQueue<>();
+        this.backgroundEventsQueue = new LinkedBlockingQueue<>();
+        
when(coordinatorManager.poll(anyLong())).thenReturn(mockPollCoordinatorResult());
+        when(commitManager.poll(anyLong())).thenReturn(mockPollCommitResult());
+        DefaultBackgroundThread backgroundThread = mockBackgroundThread();
+        ApplicationEvent e = new CommitApplicationEvent(new HashMap<>());
+        this.applicationEventsQueue.add(e);
+        backgroundThread.runOnce();
+        verify(processor).process(any(CommitApplicationEvent.class));
+        backgroundThread.close();
+    }
+
+    @Test
+    public void testAssignmentChangeEvent() {
+        this.applicationEventsQueue = new LinkedBlockingQueue<>();
+        this.backgroundEventsQueue = new LinkedBlockingQueue<>();
+        this.processor = spy(new 
ApplicationEventProcessor(this.backgroundEventsQueue, 
mockRequestManagerRegistry(),
+            metadata));
+
+        DefaultBackgroundThread backgroundThread = mockBackgroundThread();
+        HashMap<TopicPartition, OffsetAndMetadata> offset = 
mockTopicPartitionOffset();
+
+        final long currentTimeMs = time.milliseconds();
+        ApplicationEvent e = new AssignmentChangeApplicationEvent(offset, 
currentTimeMs);
+        this.applicationEventsQueue.add(e);
+
+        
when(this.coordinatorManager.poll(anyLong())).thenReturn(mockPollCoordinatorResult());
+        
when(this.commitManager.poll(anyLong())).thenReturn(mockPollCommitResult());
+
+        backgroundThread.runOnce();
+        verify(processor).process(any(AssignmentChangeApplicationEvent.class));
+        verify(networkClient, times(1)).poll(anyLong(), anyLong());
+        verify(commitManager, times(1)).updateAutoCommitTimer(currentTimeMs);
+        verify(commitManager, times(1)).maybeAutoCommit(offset);
+
+        backgroundThread.close();
+    }
+
     @Test
     void testFindCoordinator() {
         DefaultBackgroundThread backgroundThread = mockBackgroundThread();
@@ -140,6 +210,22 @@ public class DefaultBackgroundThreadTest {
         assertEquals(10, backgroundThread.handlePollResult(failure));
     }
 
+    private HashMap<TopicPartition, OffsetAndMetadata> 
mockTopicPartitionOffset() {
+        final TopicPartition t0 = new TopicPartition("t0", 2);
+        final TopicPartition t1 = new TopicPartition("t0", 3);
+        HashMap<TopicPartition, OffsetAndMetadata> topicPartitionOffsets = new 
HashMap<>();
+        topicPartitionOffsets.put(t0, new OffsetAndMetadata(10L));
+        topicPartitionOffsets.put(t1, new OffsetAndMetadata(20L));
+        return topicPartitionOffsets;
+    }
+
+    private Map<RequestManager.Type, Optional<RequestManager>> 
mockRequestManagerRegistry() {
+        Map<RequestManager.Type, Optional<RequestManager>> registry = new 
HashMap<>();
+        registry.put(RequestManager.Type.COORDINATOR, 
Optional.of(coordinatorManager));
+        registry.put(RequestManager.Type.COMMIT, Optional.of(commitManager));
+        return registry;
+    }
+
     private static NetworkClientDelegate.UnsentRequest 
findCoordinatorUnsentRequest(
             final Time time,
             final long timeout
@@ -155,16 +241,13 @@ public class DefaultBackgroundThreadTest {
     }
 
     private DefaultBackgroundThread mockBackgroundThread() {
-        properties.put(KEY_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class);
-        properties.put(VALUE_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class);
-        properties.put(RETRY_BACKOFF_MS_CONFIG, RETRY_BACKOFF_MS);
-
         return new DefaultBackgroundThread(
                 this.time,
                 new ConsumerConfig(properties),
                 new LogContext(),
                 applicationEventsQueue,
                 backgroundEventsQueue,
+                subscriptionState,
                 this.errorEventHandler,
                 processor,
                 this.metadata,
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumerTest.java
index b35b3a0d1c3..cd80e6464eb 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumerTest.java
@@ -20,9 +20,11 @@ import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.consumer.OffsetCommitCallback;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import 
org.apache.kafka.clients.consumer.internals.events.AssignmentChangeApplicationEvent;
 import org.apache.kafka.clients.consumer.internals.events.EventHandler;
+import 
org.apache.kafka.clients.consumer.internals.events.NewTopicsMetadataUpdateRequestEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.OffsetFetchApplicationEvent;
-import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.InvalidGroupIdException;
 import org.apache.kafka.common.internals.ClusterResourceListeners;
@@ -39,6 +41,7 @@ import org.junit.jupiter.api.Test;
 import org.mockito.ArgumentMatchers;
 
 import java.time.Duration;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
@@ -46,6 +49,7 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 
+import static java.util.Collections.singleton;
 import static 
org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG;
 import static 
org.apache.kafka.clients.consumer.ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG;
 import static 
org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
@@ -53,6 +57,7 @@ import static 
org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZE
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
@@ -157,9 +162,41 @@ public class PrototypeAsyncConsumerTest {
     }
 
     @Test
-    public void testUnimplementedException() {
+    public void testAssign() {
+        this.subscriptions = new SubscriptionState(logContext, 
OffsetResetStrategy.EARLIEST);
+        this.consumer = newConsumer(time, new StringDeserializer(), new 
StringDeserializer());
+        final TopicPartition tp = new TopicPartition("foo", 3);
+        consumer.assign(singleton(tp));
+        assertTrue(consumer.subscription().isEmpty());
+        assertTrue(consumer.assignment().contains(tp));
+        verify(eventHandler).add(any(AssignmentChangeApplicationEvent.class));
+        
verify(eventHandler).add(any(NewTopicsMetadataUpdateRequestEvent.class));
+    }
+
+    @Test
+    public void testAssignOnNullTopicPartition() {
+        consumer = newConsumer(time, new StringDeserializer(), new 
StringDeserializer());
+        assertThrows(IllegalArgumentException.class, () -> 
consumer.assign(null));
+    }
+
+    @Test
+    public void testAssignOnEmptyTopicPartition() {
+        consumer = spy(newConsumer(time, new StringDeserializer(), new 
StringDeserializer()));
+        consumer.assign(Collections.emptyList());
+        assertTrue(consumer.subscription().isEmpty());
+        assertTrue(consumer.assignment().isEmpty());
+    }
+
+    @Test
+    public void testAssignOnNullTopicInPartition() {
+        consumer = newConsumer(time, new StringDeserializer(), new 
StringDeserializer());
+        assertThrows(IllegalArgumentException.class, () -> 
consumer.assign(singleton(new TopicPartition(null, 0))));
+    }
+
+    @Test
+    public void testAssignOnEmptyTopicInPartition() {
         consumer = newConsumer(time, new StringDeserializer(), new 
StringDeserializer());
-        assertThrows(KafkaException.class, consumer::assignment, "not 
implemented exception");
+        assertThrows(IllegalArgumentException.class, () -> 
consumer.assign(singleton(new TopicPartition("  ", 0))));
     }
 
     private HashMap<TopicPartition, OffsetAndMetadata> 
mockTopicPartitionOffset() {


Reply via email to