This is an automated email from the ASF dual-hosted git repository.

lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new b9556611cda KAFKA-15833: Restrict Consumer API to be used from one thread (#14779)
b9556611cda is described below

commit b9556611cdab932bdbae667bd3fd0287fa912a40
Author: Lucas Brutschy <[email protected]>
AuthorDate: Sun Nov 19 21:24:28 2023 +0100

    KAFKA-15833: Restrict Consumer API to be used from one thread (#14779)
    
    The legacy consumer detects concurrent operations from different threads
    and rejects them. This was not enforced in the new consumer. To keep the
    behavior consistent, we enforce the same restriction in the new consumer.
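    
    As a quick illustration of what this means for callers, here is a
    minimal, standalone sketch of the thread-confinement ("light lock")
    pattern being ported. The field and helper names mirror the diff below,
    but the demo class itself is hypothetical, not the actual consumer code:
    
        import java.util.ConcurrentModificationException;
        import java.util.concurrent.atomic.AtomicInteger;
        import java.util.concurrent.atomic.AtomicLong;
    
        // Hypothetical demo class, not the actual consumer.
        public class ThreadConfinementDemo {
            private static final long NO_CURRENT_THREAD = -1L;
            // id of the thread currently inside an API call, or NO_CURRENT_THREAD
            private final AtomicLong currentThread = new AtomicLong(NO_CURRENT_THREAD);
            // nesting depth, so re-entrant calls from the owner thread are allowed
            private final AtomicInteger refCount = new AtomicInteger(0);
    
            private void acquire() {
                final long threadId = Thread.currentThread().getId();
                if (threadId != currentThread.get()
                        && !currentThread.compareAndSet(NO_CURRENT_THREAD, threadId))
                    throw new ConcurrentModificationException(
                        "Consumer is not safe for multi-threaded access");
                refCount.incrementAndGet();
            }
    
            private void release() {
                if (refCount.decrementAndGet() == 0)
                    currentThread.set(NO_CURRENT_THREAD);
            }
    
            // Every public API method wraps its body like this.
            public void apiCall() {
                acquire();
                try {
                    // ... actual work ...
                } finally {
                    release();
                }
            }
    
            public static void main(String[] args) throws InterruptedException {
                ThreadConfinementDemo consumer = new ThreadConfinementDemo();
                consumer.acquire(); // simulate the owner thread being mid-call
                Thread other = new Thread(() -> {
                    try {
                        consumer.apiCall();
                    } catch (ConcurrentModificationException e) {
                        System.out.println("second thread rejected");
                    }
                });
                other.start();
                other.join(); // prints "second thread rejected"
                consumer.release();
            }
        }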
    
    Reviewers: Bruno Cadonna <[email protected]>
---
 .../consumer/internals/AsyncKafkaConsumer.java     | 468 ++++++++++++++-------
 .../consumer/internals/AsyncKafkaConsumerTest.java |   7 +
 2 files changed, 315 insertions(+), 160 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
index 9f8d7206578..6746485a7ed 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
@@ -16,6 +16,9 @@
  */
 package org.apache.kafka.clients.consumer.internals;
 
+import java.util.ConcurrentModificationException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 import org.apache.kafka.clients.ApiVersions;
 import org.apache.kafka.clients.ClientUtils;
 import org.apache.kafka.clients.CommonClientConfigs;
@@ -120,6 +123,8 @@ import static org.apache.kafka.common.utils.Utils.join;
  */
 public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
 
+    private static final long NO_CURRENT_THREAD = -1L;
+
     private final ApplicationEventHandler applicationEventHandler;
     private final Time time;
     private final Optional<String> groupId;
@@ -152,6 +157,11 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
     private boolean cachedSubscriptionHasAllFetchPositions;
     private final WakeupTrigger wakeupTrigger = new WakeupTrigger();
 
+    // currentThread holds the threadId of the current thread accessing the AsyncKafkaConsumer
+    // and is used to prevent multithreaded access
+    private final AtomicLong currentThread = new AtomicLong(NO_CURRENT_THREAD);
+    private final AtomicInteger refCount = new AtomicInteger(0);
+
     AsyncKafkaConsumer(final ConsumerConfig config,
                        final Deserializer<K> keyDeserializer,
                        final Deserializer<V> valueDeserializer) {
@@ -390,6 +400,7 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
     public ConsumerRecords<K, V> poll(final Duration timeout) {
         Timer timer = time.timer(timeout);
 
+        acquireAndEnsureOpen();
         try {
             kafkaConsumerMetrics.recordPollStart(timer.currentTimeMs());
 
@@ -415,6 +426,7 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
             return ConsumerRecords.empty();
         } finally {
             kafkaConsumerMetrics.recordPollEnd(timer.currentTimeMs());
+            release();
         }
     }
 
@@ -442,17 +454,22 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
 
     @Override
     public void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback) {
-        CompletableFuture<Void> future = commit(offsets, false);
-        final OffsetCommitCallback commitCallback = callback == null ? new DefaultOffsetCommitCallback() : callback;
-        future.whenComplete((r, t) -> {
-            if (t != null) {
-                commitCallback.onComplete(offsets, new KafkaException(t));
-            } else {
-                commitCallback.onComplete(offsets, null);
-            }
-        }).exceptionally(e -> {
-            throw new KafkaException(e);
-        });
+        acquireAndEnsureOpen();
+        try {
+            CompletableFuture<Void> future = commit(offsets, false);
+            final OffsetCommitCallback commitCallback = callback == null ? new DefaultOffsetCommitCallback() : callback;
+            future.whenComplete((r, t) -> {
+                if (t != null) {
+                    commitCallback.onComplete(offsets, new KafkaException(t));
+                } else {
+                    commitCallback.onComplete(offsets, null);
+                }
+            }).exceptionally(e -> {
+                throw new KafkaException(e);
+            });
+        } finally {
+            release();
+        }
     }
 
     // Visible for testing
@@ -472,12 +489,17 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
         if (offset < 0)
             throw new IllegalArgumentException("seek offset must not be a negative number");
 
-        log.info("Seeking to offset {} for partition {}", offset, partition);
-        SubscriptionState.FetchPosition newPosition = new SubscriptionState.FetchPosition(
+        acquireAndEnsureOpen();
+        try {
+            log.info("Seeking to offset {} for partition {}", offset, 
partition);
+            SubscriptionState.FetchPosition newPosition = new 
SubscriptionState.FetchPosition(
                 offset,
                 Optional.empty(), // This will ensure we skip validation
                 metadata.currentLeader(partition));
-        subscriptions.seekUnvalidated(partition, newPosition);
+            subscriptions.seekUnvalidated(partition, newPosition);
+        } finally {
+            release();
+        }
     }
 
     @Override
@@ -487,19 +509,24 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
             throw new IllegalArgumentException("seek offset must not be a negative number");
         }
 
-        if (offsetAndMetadata.leaderEpoch().isPresent()) {
-            log.info("Seeking to offset {} for partition {} with epoch {}",
+        acquireAndEnsureOpen();
+        try {
+            if (offsetAndMetadata.leaderEpoch().isPresent()) {
+                log.info("Seeking to offset {} for partition {} with epoch {}",
                     offset, partition, offsetAndMetadata.leaderEpoch().get());
-        } else {
-            log.info("Seeking to offset {} for partition {}", offset, 
partition);
-        }
-        Metadata.LeaderAndEpoch currentLeaderAndEpoch = metadata.currentLeader(partition);
-        SubscriptionState.FetchPosition newPosition = new SubscriptionState.FetchPosition(
+            } else {
+                log.info("Seeking to offset {} for partition {}", offset, 
partition);
+            }
+            Metadata.LeaderAndEpoch currentLeaderAndEpoch = 
metadata.currentLeader(partition);
+            SubscriptionState.FetchPosition newPosition = new 
SubscriptionState.FetchPosition(
                 offsetAndMetadata.offset(),
                 offsetAndMetadata.leaderEpoch(),
                 currentLeaderAndEpoch);
-        updateLastSeenEpochIfNewer(partition, offsetAndMetadata);
-        subscriptions.seekUnvalidated(partition, newPosition);
+            updateLastSeenEpochIfNewer(partition, offsetAndMetadata);
+            subscriptions.seekUnvalidated(partition, newPosition);
+        } finally {
+            release();
+        }
     }
 
     @Override
@@ -507,8 +534,13 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
         if (partitions == null)
             throw new IllegalArgumentException("Partitions collection cannot be null");
 
-        Collection<TopicPartition> parts = partitions.isEmpty() ? subscriptions.assignedPartitions() : partitions;
-        subscriptions.requestOffsetReset(parts, OffsetResetStrategy.EARLIEST);
+        acquireAndEnsureOpen();
+        try {
+            Collection<TopicPartition> parts = partitions.isEmpty() ? subscriptions.assignedPartitions() : partitions;
+            subscriptions.requestOffsetReset(parts, OffsetResetStrategy.EARLIEST);
+        } finally {
+            release();
+        }
     }
 
     @Override
@@ -516,8 +548,13 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
         if (partitions == null)
             throw new IllegalArgumentException("Partitions collection cannot be null");
 
-        Collection<TopicPartition> parts = partitions.isEmpty() ? subscriptions.assignedPartitions() : partitions;
-        subscriptions.requestOffsetReset(parts, OffsetResetStrategy.LATEST);
+        acquireAndEnsureOpen();
+        try {
+            Collection<TopicPartition> parts = partitions.isEmpty() ? subscriptions.assignedPartitions() : partitions;
+            subscriptions.requestOffsetReset(parts, OffsetResetStrategy.LATEST);
+        } finally {
+            release();
+        }
     }
 
     @Override
@@ -527,20 +564,25 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
 
     @Override
     public long position(TopicPartition partition, Duration timeout) {
-        if (!subscriptions.isAssigned(partition))
-            throw new IllegalStateException("You can only check the position for partitions assigned to this consumer.");
+        acquireAndEnsureOpen();
+        try {
+            if (!subscriptions.isAssigned(partition))
+                throw new IllegalStateException("You can only check the position for partitions assigned to this consumer.");
 
-        Timer timer = time.timer(timeout);
-        do {
-            SubscriptionState.FetchPosition position = subscriptions.validPosition(partition);
-            if (position != null)
-                return position.offset;
+            Timer timer = time.timer(timeout);
+            do {
+                SubscriptionState.FetchPosition position = subscriptions.validPosition(partition);
+                if (position != null)
+                    return position.offset;
 
-            updateFetchPositions(timer);
-        } while (timer.notExpired());
+                updateFetchPositions(timer);
+            } while (timer.notExpired());
 
-        throw new TimeoutException("Timeout of " + timeout.toMillis() + "ms expired before the position " +
+            throw new TimeoutException("Timeout of " + timeout.toMillis() + "ms expired before the position " +
                 "for partition " + partition + " could be determined");
+        } finally {
+            release();
+        }
     }
 
     @Override
@@ -563,17 +605,22 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
     @Override
     public Map<TopicPartition, OffsetAndMetadata> committed(final Set<TopicPartition> partitions,
                                                             final Duration timeout) {
-        maybeThrowInvalidGroupIdException();
-        if (partitions.isEmpty()) {
-            return new HashMap<>();
-        }
-
-        final OffsetFetchApplicationEvent event = new OffsetFetchApplicationEvent(partitions);
-        wakeupTrigger.setActiveTask(event.future());
+        acquireAndEnsureOpen();
         try {
-            return applicationEventHandler.addAndGet(event, time.timer(timeout));
+            maybeThrowInvalidGroupIdException();
+            if (partitions.isEmpty()) {
+                return new HashMap<>();
+            }
+
+            final OffsetFetchApplicationEvent event = new OffsetFetchApplicationEvent(partitions);
+            wakeupTrigger.setActiveTask(event.future());
+            try {
+                return applicationEventHandler.addAndGet(event, time.timer(timeout));
+            } finally {
+                wakeupTrigger.clearActiveTask();
+            }
         } finally {
-            wakeupTrigger.clearActiveTask();
+            release();
         }
     }
 
@@ -611,22 +658,37 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
 
     @Override
     public Set<TopicPartition> paused() {
-        return Collections.unmodifiableSet(subscriptions.pausedPartitions());
+        acquireAndEnsureOpen();
+        try {
+            return Collections.unmodifiableSet(subscriptions.pausedPartitions());
+        } finally {
+            release();
+        }
     }
 
     @Override
     public void pause(Collection<TopicPartition> partitions) {
-        log.debug("Pausing partitions {}", partitions);
-        for (TopicPartition partition: partitions) {
-            subscriptions.pause(partition);
+        acquireAndEnsureOpen();
+        try {
+            log.debug("Pausing partitions {}", partitions);
+            for (TopicPartition partition : partitions) {
+                subscriptions.pause(partition);
+            }
+        } finally {
+            release();
         }
     }
 
     @Override
     public void resume(Collection<TopicPartition> partitions) {
-        log.debug("Resuming partitions {}", partitions);
-        for (TopicPartition partition: partitions) {
-            subscriptions.resume(partition);
+        acquireAndEnsureOpen();
+        try {
+            log.debug("Resuming partitions {}", partitions);
+            for (TopicPartition partition : partitions) {
+                subscriptions.resume(partition);
+            }
+        } finally {
+            release();
         }
     }
 
@@ -637,30 +699,35 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
 
     @Override
    public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch, Duration timeout) {
-        // Keeping same argument validation error thrown by the current consumer implementation
-        // to avoid API level changes.
-        requireNonNull(timestampsToSearch, "Timestamps to search cannot be null");
-        for (Map.Entry<TopicPartition, Long> entry : timestampsToSearch.entrySet()) {
-            // Exclude the earliest and latest offset here so the timestamp in the returned
-            // OffsetAndTimestamp is always positive.
-            if (entry.getValue() < 0)
-                throw new IllegalArgumentException("The target time for partition " + entry.getKey() + " is " +
+        acquireAndEnsureOpen();
+        try {
+            // Keeping same argument validation error thrown by the current consumer implementation
+            // to avoid API level changes.
+            requireNonNull(timestampsToSearch, "Timestamps to search cannot be null");
+            for (Map.Entry<TopicPartition, Long> entry : timestampsToSearch.entrySet()) {
+                // Exclude the earliest and latest offset here so the timestamp in the returned
+                // OffsetAndTimestamp is always positive.
+                if (entry.getValue() < 0)
+                    throw new IllegalArgumentException("The target time for partition " + entry.getKey() + " is " +
                        entry.getValue() + ". The target time cannot be negative.");
-        }
+            }
 
-        if (timestampsToSearch.isEmpty()) {
-            return Collections.emptyMap();
-        }
-        final ListOffsetsApplicationEvent listOffsetsEvent = new ListOffsetsApplicationEvent(
+            if (timestampsToSearch.isEmpty()) {
+                return Collections.emptyMap();
+            }
+            final ListOffsetsApplicationEvent listOffsetsEvent = new ListOffsetsApplicationEvent(
                 timestampsToSearch,
                 true);
 
-        // If timeout is set to zero return empty immediately; otherwise try to get the results
-        // and throw timeout exception if it cannot complete in time.
-        if (timeout.toMillis() == 0L)
-            return listOffsetsEvent.emptyResult();
+            // If timeout is set to zero return empty immediately; otherwise try to get the results
+            // and throw timeout exception if it cannot complete in time.
+            if (timeout.toMillis() == 0L)
+                return listOffsetsEvent.emptyResult();
 
-        return applicationEventHandler.addAndGet(listOffsetsEvent, time.timer(timeout));
+            return applicationEventHandler.addAndGet(listOffsetsEvent, time.timer(timeout));
+        } finally {
+            release();
+        }
     }
 
     @Override
@@ -686,49 +753,59 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
     private Map<TopicPartition, Long> beginningOrEndOffset(Collection<TopicPartition> partitions,
                                                            long timestamp,
                                                            Duration timeout) {
-        // Keeping same argument validation error thrown by the current consumer implementation
-        // to avoid API level changes.
-        requireNonNull(partitions, "Partitions cannot be null");
+        acquireAndEnsureOpen();
+        try {
+            // Keeping same argument validation error thrown by the current consumer implementation
+            // to avoid API level changes.
+            requireNonNull(partitions, "Partitions cannot be null");
 
-        if (partitions.isEmpty()) {
-            return Collections.emptyMap();
-        }
-        Map<TopicPartition, Long> timestampToSearch = partitions
+            if (partitions.isEmpty()) {
+                return Collections.emptyMap();
+            }
+            Map<TopicPartition, Long> timestampToSearch = partitions
                 .stream()
                .collect(Collectors.toMap(Function.identity(), tp -> timestamp));
-        ListOffsetsApplicationEvent listOffsetsEvent = new ListOffsetsApplicationEvent(
+            ListOffsetsApplicationEvent listOffsetsEvent = new ListOffsetsApplicationEvent(
                 timestampToSearch,
                 false);
-        Map<TopicPartition, OffsetAndTimestamp> offsetAndTimestampMap = applicationEventHandler.addAndGet(
+            Map<TopicPartition, OffsetAndTimestamp> offsetAndTimestampMap = applicationEventHandler.addAndGet(
                 listOffsetsEvent,
                 time.timer(timeout));
-        return offsetAndTimestampMap
+            return offsetAndTimestampMap
                 .entrySet()
                 .stream()
                .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().offset()));
+        } finally {
+            release();
+        }
     }
 
     @Override
     public OptionalLong currentLag(TopicPartition topicPartition) {
-        final Long lag = subscriptions.partitionLag(topicPartition, isolationLevel);
-
-        // if the log end offset is not known and hence cannot return lag and there is
-        // no in-flight list offset requested yet,
-        // issue a list offset request for that partition so that next time
-        // we may get the answer; we do not need to wait for the return value
-        // since we would not try to poll the network client synchronously
-        if (lag == null) {
-            if (subscriptions.partitionEndOffset(topicPartition, isolationLevel) == null &&
+        acquireAndEnsureOpen();
+        try {
+            final Long lag = subscriptions.partitionLag(topicPartition, isolationLevel);
+
+            // if the log end offset is not known and hence cannot return lag and there is
+            // no in-flight list offset requested yet,
+            // issue a list offset request for that partition so that next time
+            // we may get the answer; we do not need to wait for the return value
+            // since we would not try to poll the network client synchronously
+            if (lag == null) {
+                if (subscriptions.partitionEndOffset(topicPartition, isolationLevel) == null &&
                    !subscriptions.partitionEndOffsetRequested(topicPartition)) {
-                log.info("Requesting the log end offset for {} in order to 
compute lag", topicPartition);
-                subscriptions.requestPartitionEndOffset(topicPartition);
-                endOffsets(Collections.singleton(topicPartition), 
Duration.ofMillis(0));
+                    log.info("Requesting the log end offset for {} in order to 
compute lag", topicPartition);
+                    subscriptions.requestPartitionEndOffset(topicPartition);
+                    endOffsets(Collections.singleton(topicPartition), 
Duration.ofMillis(0));
+                }
+
+                return OptionalLong.empty();
             }
 
-            return OptionalLong.empty();
+            return OptionalLong.of(lag);
+        } finally {
+            release();
         }
-
-        return OptionalLong.of(lag);
     }
 
     @Override
@@ -755,7 +832,7 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
     public void close(Duration timeout) {
         if (timeout.toMillis() < 0)
             throw new IllegalArgumentException("The timeout cannot be negative.");
-
+        acquire();
         try {
             if (!closed) {
                // need to close before setting the flag since the close function
@@ -764,6 +841,7 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
             }
         } finally {
             closed = true;
+            release();
         }
     }
 
@@ -814,6 +892,7 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
 
     @Override
     public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets, Duration timeout) {
+        acquireAndEnsureOpen();
         long commitStart = time.nanoseconds();
         try {
             CompletableFuture<Void> commitFuture = commit(offsets, true);
@@ -822,6 +901,7 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
         } finally {
             wakeupTrigger.clearActiveTask();
             kafkaConsumerMetrics.recordCommitSync(time.nanoseconds() - commitStart);
+            release();
         }
     }
 
@@ -832,7 +912,12 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
 
     @Override
     public Set<TopicPartition> assignment() {
-        return Collections.unmodifiableSet(subscriptions.assignedPartitions());
+        acquireAndEnsureOpen();
+        try {
+            return Collections.unmodifiableSet(subscriptions.assignedPartitions());
+        } finally {
+            release();
+        }
     }
 
     /**
@@ -842,46 +927,56 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
      */
     @Override
     public Set<String> subscription() {
-        return Collections.unmodifiableSet(subscriptions.subscription());
+        acquireAndEnsureOpen();
+        try {
+            return Collections.unmodifiableSet(subscriptions.subscription());
+        } finally {
+            release();
+        }
     }
 
     @Override
     public void assign(Collection<TopicPartition> partitions) {
-        if (partitions == null) {
-            throw new IllegalArgumentException("Topic partitions collection to 
assign to cannot be null");
-        }
+        acquireAndEnsureOpen();
+        try {
+            if (partitions == null) {
+                throw new IllegalArgumentException("Topic partitions 
collection to assign to cannot be null");
+            }
 
-        if (partitions.isEmpty()) {
-            unsubscribe();
-            return;
-        }
+            if (partitions.isEmpty()) {
+                unsubscribe();
+                return;
+            }
 
-        for (TopicPartition tp : partitions) {
-            String topic = (tp != null) ? tp.topic() : null;
-            if (isBlank(topic))
-                throw new IllegalArgumentException("Topic partitions to assign 
to cannot have null or empty topic");
-        }
+            for (TopicPartition tp : partitions) {
+                String topic = (tp != null) ? tp.topic() : null;
+                if (isBlank(topic))
+                    throw new IllegalArgumentException("Topic partitions to 
assign to cannot have null or empty topic");
+            }
 
-        // Clear the buffered data which are not a part of newly assigned topics
-        final Set<TopicPartition> currentTopicPartitions = new HashSet<>();
+            // Clear the buffered data which are not a part of newly assigned topics
+            final Set<TopicPartition> currentTopicPartitions = new HashSet<>();
 
-        for (TopicPartition tp : subscriptions.assignedPartitions()) {
-            if (partitions.contains(tp))
-                currentTopicPartitions.add(tp);
-        }
+            for (TopicPartition tp : subscriptions.assignedPartitions()) {
+                if (partitions.contains(tp))
+                    currentTopicPartitions.add(tp);
+            }
 
-        fetchBuffer.retainAll(currentTopicPartitions);
+            fetchBuffer.retainAll(currentTopicPartitions);
 
-        // assignment change event will trigger autocommit if it is configured and the group id is specified. This is
-        // to make sure offsets of topic partitions the consumer is unsubscribing from are committed since there will
-        // be no following rebalance.
-        //
-        // See the ApplicationEventProcessor.process() method that handles this event for more detail.
-        applicationEventHandler.add(new AssignmentChangeApplicationEvent(subscriptions.allConsumed(), time.milliseconds()));
+            // assignment change event will trigger autocommit if it is configured and the group id is specified. This is
+            // to make sure offsets of topic partitions the consumer is unsubscribing from are committed since there will
+            // be no following rebalance.
+            //
+            // See the ApplicationEventProcessor.process() method that handles this event for more detail.
+            applicationEventHandler.add(new AssignmentChangeApplicationEvent(subscriptions.allConsumed(), time.milliseconds()));
 
-        log.info("Assigned to partition(s): {}", join(partitions, ", "));
-        if (subscriptions.assignFromUser(new HashSet<>(partitions)))
-            applicationEventHandler.add(new NewTopicsMetadataUpdateRequestEvent());
+            log.info("Assigned to partition(s): {}", join(partitions, ", "));
+            if (subscriptions.assignFromUser(new HashSet<>(partitions)))
+                applicationEventHandler.add(new NewTopicsMetadataUpdateRequestEvent());
+        } finally {
+            release();
+        }
     }
 
     /**
@@ -904,8 +999,13 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
 
     @Override
     public void unsubscribe() {
-        fetchBuffer.retainAll(Collections.emptySet());
-        subscriptions.unsubscribe();
+        acquireAndEnsureOpen();
+        try {
+            fetchBuffer.retainAll(Collections.emptySet());
+            subscriptions.unsubscribe();
+        } finally {
+            release();
+        }
     }
 
     @Override
@@ -1102,46 +1202,94 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
         subscribeInternal(pattern, Optional.of(listener));
     }
 
+    /**
+     * Acquire the light lock and ensure that the consumer hasn't been closed.
+     *
+     * @throws IllegalStateException If the consumer has been closed
+     */
+    private void acquireAndEnsureOpen() {
+        acquire();
+        if (this.closed) {
+            release();
+            throw new IllegalStateException("This consumer has already been 
closed.");
+        }
+    }
+
+    /**
+     * Acquire the light lock protecting this consumer from multithreaded access. Instead of blocking
+     * when the lock is not available, however, we just throw an exception (since multithreaded usage is not
+     * supported).
+     *
+     * @throws ConcurrentModificationException if another thread already has the lock
+     */
+    private void acquire() {
+        final Thread thread = Thread.currentThread();
+        final long threadId = thread.getId();
+        if (threadId != currentThread.get() && !currentThread.compareAndSet(NO_CURRENT_THREAD, threadId))
+            throw new ConcurrentModificationException("KafkaConsumer is not safe for multi-threaded access. " +
+                "currentThread(name: " + thread.getName() + ", id: " + threadId + ")" +
+                " otherThread(id: " + currentThread.get() + ")"
+            );
+        refCount.incrementAndGet();
+    }
+
+    /**
+     * Release the light lock protecting the consumer from multithreaded access.
+     */
+    private void release() {
+        if (refCount.decrementAndGet() == 0)
+            currentThread.set(NO_CURRENT_THREAD);
+    }
+
    private void subscribeInternal(Pattern pattern, Optional<ConsumerRebalanceListener> listener) {
-        maybeThrowInvalidGroupIdException();
-        if (pattern == null || pattern.toString().isEmpty())
-            throw new IllegalArgumentException("Topic pattern to subscribe to 
cannot be " + (pattern == null ?
+        acquireAndEnsureOpen();
+        try {
+            maybeThrowInvalidGroupIdException();
+            if (pattern == null || pattern.toString().isEmpty())
+                throw new IllegalArgumentException("Topic pattern to subscribe 
to cannot be " + (pattern == null ?
                     "null" : "empty"));
-
-        throwIfNoAssignorsConfigured();
-        log.info("Subscribed to pattern: '{}'", pattern);
-        subscriptions.subscribe(pattern, listener);
-        updatePatternSubscription(metadata.fetch());
-        metadata.requestUpdateForNewTopics();
+            throwIfNoAssignorsConfigured();
+            log.info("Subscribed to pattern: '{}'", pattern);
+            subscriptions.subscribe(pattern, listener);
+            updatePatternSubscription(metadata.fetch());
+            metadata.requestUpdateForNewTopics();
+        } finally {
+            release();
+        }
     }
 
    private void subscribeInternal(Collection<String> topics, Optional<ConsumerRebalanceListener> listener) {
-        maybeThrowInvalidGroupIdException();
-        if (topics == null)
-            throw new IllegalArgumentException("Topic collection to subscribe 
to cannot be null");
-        if (topics.isEmpty()) {
-            // treat subscribing to empty topic list as the same as 
unsubscribing
-            unsubscribe();
-        } else {
-            for (String topic : topics) {
-                if (isBlank(topic))
-                    throw new IllegalArgumentException("Topic collection to 
subscribe to cannot contain null or empty topic");
-            }
+        acquireAndEnsureOpen();
+        try {
+            maybeThrowInvalidGroupIdException();
+            if (topics == null)
+                throw new IllegalArgumentException("Topic collection to 
subscribe to cannot be null");
+            if (topics.isEmpty()) {
+                // treat subscribing to empty topic list as the same as 
unsubscribing
+                unsubscribe();
+            } else {
+                for (String topic : topics) {
+                    if (isBlank(topic))
+                        throw new IllegalArgumentException("Topic collection 
to subscribe to cannot contain null or empty topic");
+                }
 
-            throwIfNoAssignorsConfigured();
+                throwIfNoAssignorsConfigured();
 
-            // Clear the buffered data which are not a part of newly assigned topics
-            final Set<TopicPartition> currentTopicPartitions = new HashSet<>();
+                // Clear the buffered data which are not a part of newly assigned topics
+                final Set<TopicPartition> currentTopicPartitions = new HashSet<>();
 
-            for (TopicPartition tp : subscriptions.assignedPartitions()) {
-                if (topics.contains(tp.topic()))
-                    currentTopicPartitions.add(tp);
-            }
+                for (TopicPartition tp : subscriptions.assignedPartitions()) {
+                    if (topics.contains(tp.topic()))
+                        currentTopicPartitions.add(tp);
+                }
 
-            fetchBuffer.retainAll(currentTopicPartitions);
-            log.info("Subscribed to topic(s): {}", join(topics, ", "));
-            if (subscriptions.subscribe(new HashSet<>(topics), listener))
-                metadata.requestUpdateForNewTopics();
+                fetchBuffer.retainAll(currentTopicPartitions);
+                log.info("Subscribed to topic(s): {}", join(topics, ", "));
+                if (subscriptions.subscribe(new HashSet<>(topics), listener))
+                    metadata.requestUpdateForNewTopics();
+            }
+        } finally {
+            release();
         }
     }
 
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
index 7f506ec8f9f..c3b1e9fa50a 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
@@ -112,6 +112,13 @@ public class AsyncKafkaConsumerTest {
        assertThrows(InvalidGroupIdException.class, () -> consumer.committed(new HashSet<>()));
     }
 
+    @Test
+    public void testFailOnClosedConsumer() {
+        consumer.close();
+        final IllegalStateException res = assertThrows(IllegalStateException.class, consumer::assignment);
+        assertEquals("This consumer has already been closed.", res.getMessage());
+    }
+
     @Test
     public void testCommitAsync_NullCallback() throws InterruptedException {
         CompletableFuture<Void> future = new CompletableFuture<>();
