This is an automated email from the ASF dual-hosted git repository.

lianetm pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 284a3db28aa KAFKA-15908: Remove deprecated Consumer.poll(long timeout) 
(#17368)
284a3db28aa is described below

commit 284a3db28aa5f76844f1b1e696d36d88b161c090
Author: Andrew Schofield <[email protected]>
AuthorDate: Tue Oct 8 15:49:10 2024 +0100

    KAFKA-15908: Remove deprecated Consumer.poll(long timeout) (#17368)
    
    Reviewers: Chia-Ping Tsai <[email protected]>, David Jacot 
<[email protected]>, Lianet Magrans <[email protected]>
---
 .../apache/kafka/clients/consumer/Consumer.java    |  6 ----
 .../kafka/clients/consumer/KafkaConsumer.java      | 39 ----------------------
 .../kafka/clients/consumer/MockConsumer.java       |  6 ----
 .../consumer/internals/AsyncKafkaConsumer.java     |  7 ----
 .../consumer/internals/ClassicKafkaConsumer.java   | 20 +++--------
 .../kafka/clients/consumer/KafkaConsumerTest.java  | 26 ---------------
 .../kafka/clients/consumer/MockConsumerTest.java   | 31 -----------------
 .../consumer/internals/AsyncKafkaConsumerTest.java |  9 -----
 .../kafka/api/AuthorizerIntegrationTest.scala      | 34 +++++++++++++------
 .../integration/kafka/api/ConsumerBounceTest.scala |  5 +--
 .../kafka/api/PlaintextConsumerPollTest.scala      | 11 ------
 docs/upgrade.html                                  |  4 +++
 12 files changed, 33 insertions(+), 165 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java 
b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
index 055fcfb1b4f..42013955783 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
@@ -78,12 +78,6 @@ public interface Consumer<K, V> extends Closeable {
      */
     void unsubscribe();
 
-    /**
-     * @see KafkaConsumer#poll(long)
-     */
-    @Deprecated
-    ConsumerRecords<K, V> poll(long timeout);
-
     /**
      * @see KafkaConsumer#poll(Duration)
      */
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 6710212c566..1b12ae3d4b4 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -789,45 +789,6 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> 
{
         delegate.assign(partitions);
     }
 
-    /**
-     * Fetch data for the topics or partitions specified using one of the 
subscribe/assign APIs. It is an error to not have
-     * subscribed to any topics or partitions before polling for data.
-     * <p>
-     * On each poll, consumer will try to use the last consumed offset as the 
starting offset and fetch sequentially. The last
-     * consumed offset can be manually set through {@link 
#seek(TopicPartition, long)} or automatically set as the last committed
-     * offset for the subscribed list of partitions
-     *
-     *
-     * @param timeoutMs The time, in milliseconds, spent waiting in poll if 
data is not available in the buffer.
-     *            If 0, returns immediately with any records that are 
available currently in the buffer, else returns empty.
-     *            Must not be negative.
-     * @return map of topic to records since the last fetch for the subscribed 
list of topics and partitions
-     *
-     * @throws org.apache.kafka.clients.consumer.InvalidOffsetException if the 
offset for a partition or set of
-     *             partitions is undefined or out of range and no offset reset 
policy has been configured
-     * @throws org.apache.kafka.common.errors.WakeupException if {@link 
#wakeup()} is called before or while this
-     *             function is called
-     * @throws org.apache.kafka.common.errors.InterruptException if the 
calling thread is interrupted before or while
-     *             this function is called
-     * @throws org.apache.kafka.common.errors.AuthenticationException if 
authentication fails. See the exception for more details
-     * @throws org.apache.kafka.common.errors.AuthorizationException if caller 
lacks Read access to any of the subscribed
-     *             topics or to the configured groupId. See the exception for 
more details
-     * @throws org.apache.kafka.common.KafkaException for any other 
unrecoverable errors (e.g. invalid groupId or
-     *             session timeout, errors deserializing key/value pairs, or 
any new error cases in future versions)
-     * @throws java.lang.IllegalArgumentException if the timeout value is 
negative
-     * @throws java.lang.IllegalStateException if the consumer is not 
subscribed to any topics or manually assigned any
-     *             partitions to consume from
-     * @throws org.apache.kafka.common.errors.FencedInstanceIdException if 
this consumer instance gets fenced by broker.
-     *
-     * @deprecated Since 2.0. Use {@link #poll(Duration)}, which does not 
block beyond the timeout awaiting partition
-     *             assignment. See <a 
href="https://cwiki.apache.org/confluence/x/5kiHB">KIP-266</a> for more 
information.
-     */
-    @Deprecated
-    @Override
-    public ConsumerRecords<K, V> poll(final long timeoutMs) {
-        return delegate.poll(timeoutMs);
-    }
-
     /**
      * Fetch data for the topics or partitions specified using one of the 
subscribe/assign APIs. It is an error to not have
      * subscribed to any topics or partitions before polling for data.
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java 
b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
index 8acdfdca4bf..43b9f06336d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
@@ -203,12 +203,6 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
         subscriptions.unsubscribe();
     }
 
-    @Deprecated
-    @Override
-    public synchronized ConsumerRecords<K, V> poll(long timeout) {
-        return poll(Duration.ofMillis(timeout));
-    }
-
     @Override
     public synchronized ConsumerRecords<K, V> poll(final Duration timeout) {
         ensureNotClosed();
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
index 01a4d29471e..cd9b2ee37b4 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
@@ -1507,13 +1507,6 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
         );
     }
 
-    @Override
-    @Deprecated
-    public ConsumerRecords<K, V> poll(final long timeoutMs) {
-        throw new UnsupportedOperationException("Consumer.poll(long) is not 
supported when \"group.protocol\" is \"consumer\". " +
-             "This method is deprecated and will be removed in the next major 
release.");
-    }
-
     // Visible for testing
     WakeupTrigger wakeupTrigger() {
         return wakeupTrigger;
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java
index 65cff05bce3..52eee98d086 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java
@@ -597,21 +597,15 @@ public class ClassicKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
         }
     }
 
-    @Deprecated
-    @Override
-    public ConsumerRecords<K, V> poll(final long timeoutMs) {
-        return poll(time.timer(timeoutMs), false);
-    }
-
     @Override
     public ConsumerRecords<K, V> poll(final Duration timeout) {
-        return poll(time.timer(timeout), true);
+        return poll(time.timer(timeout));
     }
 
     /**
      * @throws KafkaException if the rebalance callback throws exception
      */
-    private ConsumerRecords<K, V> poll(final Timer timer, final boolean 
includeMetadataInTimeout) {
+    private ConsumerRecords<K, V> poll(final Timer timer) {
         acquireAndEnsureOpen();
         try {
             this.kafkaConsumerMetrics.recordPollStart(timer.currentTimeMs());
@@ -623,14 +617,8 @@ public class ClassicKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
             do {
                 client.maybeTriggerWakeup();
 
-                if (includeMetadataInTimeout) {
-                    // try to update assignment metadata BUT do not need to 
block on the timer for join group
-                    updateAssignmentMetadataIfNeeded(timer, false);
-                } else {
-                    while 
(!updateAssignmentMetadataIfNeeded(time.timer(Long.MAX_VALUE), true)) {
-                        log.warn("Still waiting for metadata");
-                    }
-                }
+                // try to update assignment metadata BUT do not need to block 
on the timer for join group
+                updateAssignmentMetadataIfNeeded(timer, false);
 
                 final Fetch<K, V> fetch = pollForFetches(timer);
                 if (!fetch.isEmpty()) {
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 63d949fcdbf..37cfc472f3e 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -67,7 +67,6 @@ import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.record.MemoryRecords;
 import org.apache.kafka.common.record.MemoryRecordsBuilder;
 import org.apache.kafka.common.record.TimestampType;
-import org.apache.kafka.common.requests.AbstractRequest;
 import org.apache.kafka.common.requests.AbstractResponse;
 import org.apache.kafka.common.requests.FetchRequest;
 import org.apache.kafka.common.requests.FetchResponse;
@@ -780,31 +779,6 @@ public class KafkaConsumerTest {
         assertEquals(0, requests.stream().filter(request -> 
request.apiKey().equals(ApiKeys.FETCH)).count());
     }
 
-    // TODO: this test requires rebalance logic which is not yet implemented 
in the CONSUMER group protocol.
-    //       Once it is implemented, this should use both group protocols.
-    @ParameterizedTest
-    @EnumSource(value = GroupProtocol.class, names = "CLASSIC")
-    @SuppressWarnings("deprecation")
-    public void 
verifyDeprecatedPollDoesNotTimeOutDuringMetadataUpdate(GroupProtocol 
groupProtocol) {
-        final ConsumerMetadata metadata = createMetadata(subscription);
-        final MockClient client = new MockClient(time, metadata);
-
-        initMetadata(client, Collections.singletonMap(topic, 1));
-        Node node = metadata.fetch().nodes().get(0);
-
-        consumer = newConsumer(groupProtocol, time, client, subscription, 
metadata, assignor, true, groupInstanceId);
-        consumer.subscribe(singleton(topic), 
getConsumerRebalanceListener(consumer));
-        prepareRebalance(client, node, assignor, singletonList(tp0), null);
-
-        consumer.poll(0L);
-
-        // The underlying client SHOULD get a fetch request
-        final Queue<ClientRequest> requests = client.requests();
-        assertEquals(1, requests.size());
-        final Class<? extends AbstractRequest.Builder> aClass = 
requests.peek().requestBuilder().getClass();
-        assertEquals(FetchRequest.Builder.class, aClass);
-    }
-
     @ParameterizedTest
     @EnumSource(GroupProtocol.class)
     @SuppressWarnings("unchecked")
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java 
b/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java
index a9b0c2843d9..e824a03e32c 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java
@@ -68,37 +68,6 @@ public class MockConsumerTest {
         assertEquals(2L, 
consumer.committed(Collections.singleton(tp)).get(tp).offset());
     }
 
-    @SuppressWarnings("deprecation")
-    @Test
-    public void testSimpleMockDeprecated() {
-        consumer.subscribe(Collections.singleton("test"));
-        assertEquals(0, consumer.poll(1000).count());
-        consumer.rebalance(Arrays.asList(new TopicPartition("test", 0), new 
TopicPartition("test", 1)));
-        // Mock consumers need to seek manually since they cannot 
automatically reset offsets
-        HashMap<TopicPartition, Long> beginningOffsets = new HashMap<>();
-        beginningOffsets.put(new TopicPartition("test", 0), 0L);
-        beginningOffsets.put(new TopicPartition("test", 1), 0L);
-        consumer.updateBeginningOffsets(beginningOffsets);
-        consumer.seek(new TopicPartition("test", 0), 0);
-        ConsumerRecord<String, String> rec1 = new ConsumerRecord<>("test", 0, 
0, 0L, TimestampType.CREATE_TIME,
-            0, 0, "key1", "value1", new RecordHeaders(), Optional.empty());
-        ConsumerRecord<String, String> rec2 = new ConsumerRecord<>("test", 0, 
1, 0L, TimestampType.CREATE_TIME,
-            0, 0, "key2", "value2", new RecordHeaders(), Optional.empty());
-        consumer.addRecord(rec1);
-        consumer.addRecord(rec2);
-        ConsumerRecords<String, String> recs = consumer.poll(1);
-        Iterator<ConsumerRecord<String, String>> iter = recs.iterator();
-        assertEquals(rec1, iter.next());
-        assertEquals(rec2, iter.next());
-        assertFalse(iter.hasNext());
-        final TopicPartition tp = new TopicPartition("test", 0);
-        assertEquals(2L, consumer.position(tp));
-        consumer.commitSync();
-        assertEquals(2L, 
consumer.committed(Collections.singleton(tp)).get(tp).offset());
-        assertEquals(new ConsumerGroupMetadata("dummy.group.id", 1, "1", 
Optional.empty()),
-            consumer.groupMetadata());
-    }
-
     @Test
     public void testConsumerRecordsIsEmptyWhenReturningNoRecords() {
         TopicPartition partition = new TopicPartition("test", 0);
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
index cec65754ed9..98a69c7f806 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
@@ -580,15 +580,6 @@ public class AsyncKafkaConsumerTest {
         assertThrows(callbackException.getClass(), () -> 
consumer.commitSync());
     }
 
-    @Test
-    @SuppressWarnings("deprecation")
-    public void testPollLongThrowsException() {
-        consumer = newConsumer();
-        Exception e = assertThrows(UnsupportedOperationException.class, () -> 
consumer.poll(0L));
-        assertEquals("Consumer.poll(long) is not supported when 
\"group.protocol\" is \"consumer\". " +
-            "This method is deprecated and will be removed in the next major 
release.", e.getMessage());
-    }
-
     @Test
     public void testCommitSyncLeaderEpochUpdate() {
         consumer = newConsumer();
diff --git 
a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala 
b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index af886d6a51d..7678c993547 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -15,7 +15,7 @@ package kafka.api
 import java.lang.{Byte => JByte}
 import java.time.Duration
 import java.util
-import java.util.concurrent.ExecutionException
+import java.util.concurrent.{ExecutionException, Semaphore}
 import java.util.regex.Pattern
 import java.util.{Collections, Optional, Properties}
 import kafka.utils.TestUtils
@@ -66,7 +66,6 @@ import org.apache.kafka.coordinator.group.GroupConfig
 import org.apache.kafka.metadata.LeaderAndIsr
 import org.junit.jupiter.api.function.Executable
 
-import scala.annotation.nowarn
 import scala.collection.mutable
 import scala.jdk.CollectionConverters._
 
@@ -1123,10 +1122,10 @@ class AuthorizerIntegrationTest extends 
AbstractAuthorizerIntegrationTest {
     consumeRecords(consumer)
   }
 
-  @nowarn("cat=deprecation")
   @ParameterizedTest
   @ValueSource(strings = Array("zk", "kraft"))
   def testPatternSubscriptionWithNoTopicAccess(quorum: String): Unit = {
+    val assignSemaphore = new Semaphore(0)
     createTopicWithBrokerPrincipal(topic)
 
     addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, 
WILDCARD_HOST, WRITE, ALLOW)), topicResource)
@@ -1137,8 +1136,16 @@ class AuthorizerIntegrationTest extends 
AbstractAuthorizerIntegrationTest {
     addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, 
WILDCARD_HOST, READ, ALLOW)), groupResource)
 
     val consumer = createConsumer()
-    consumer.subscribe(Pattern.compile(topicPattern))
-    consumer.poll(0)
+    consumer.subscribe(Pattern.compile(topicPattern), new 
ConsumerRebalanceListener {
+      def onPartitionsAssigned(partitions: util.Collection[TopicPartition]): 
Unit = {
+        assignSemaphore.release()
+      }
+      def onPartitionsRevoked(partitions: util.Collection[TopicPartition]): 
Unit = {
+      }})
+    TestUtils.waitUntilTrue(() => {
+      consumer.poll(Duration.ofMillis(500))
+      assignSemaphore.tryAcquire()
+    }, "Assignment did not complete on time")
     assertTrue(consumer.subscription.isEmpty)
   }
 
@@ -1160,10 +1167,10 @@ class AuthorizerIntegrationTest extends 
AbstractAuthorizerIntegrationTest {
     assertEquals(Collections.singleton(topic), e.unauthorizedTopics())
   }
 
-  @nowarn("cat=deprecation")
   @ParameterizedTest
   @ValueSource(strings = Array("zk", "kraft"))
   def testPatternSubscriptionWithTopicAndGroupRead(quorum: String): Unit = {
+    val assignSemaphore = new Semaphore(0)
     createTopicWithBrokerPrincipal(topic)
 
     addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, 
WILDCARD_HOST, WRITE, ALLOW)), topicResource)
@@ -1187,13 +1194,20 @@ class AuthorizerIntegrationTest extends 
AbstractAuthorizerIntegrationTest {
     // internal topics are not included, we should not be assigned any 
partitions from this topic
     addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, 
WILDCARD_HOST, READ, ALLOW)),  new ResourcePattern(TOPIC,
       GROUP_METADATA_TOPIC_NAME, LITERAL))
-    consumer.subscribe(Pattern.compile(GROUP_METADATA_TOPIC_NAME))
-    consumer.poll(0)
+    consumer.subscribe(Pattern.compile(GROUP_METADATA_TOPIC_NAME), new 
ConsumerRebalanceListener {
+      def onPartitionsAssigned(partitions: util.Collection[TopicPartition]): 
Unit = {
+        assignSemaphore.release()
+      }
+      def onPartitionsRevoked(partitions: util.Collection[TopicPartition]): 
Unit = {
+      }})
+    TestUtils.waitUntilTrue(() => {
+      consumer.poll(Duration.ofMillis(500))
+      assignSemaphore.tryAcquire()
+    }, "Assignment did not complete on time")
     assertTrue(consumer.subscription().isEmpty)
     assertTrue(consumer.assignment().isEmpty)
   }
 
-  @nowarn("cat=deprecation")
   @ParameterizedTest
   @ValueSource(strings = Array("zk", "kraft"))
   def testPatternSubscriptionMatchingInternalTopic(quorum: String): Unit = {
@@ -1219,7 +1233,7 @@ class AuthorizerIntegrationTest extends 
AbstractAuthorizerIntegrationTest {
       GROUP_METADATA_TOPIC_NAME, LITERAL))
     consumer.subscribe(Pattern.compile(GROUP_METADATA_TOPIC_NAME))
     TestUtils.retry(60000) {
-      consumer.poll(0)
+      consumer.poll(Duration.ofMillis(500))
       assertEquals(Set(GROUP_METADATA_TOPIC_NAME), 
consumer.subscription.asScala)
     }
   }
diff --git a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala 
b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
index 927fa13fc3e..48c5112aabe 100644
--- a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
@@ -32,7 +32,6 @@ import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{AfterEach, Disabled, Test}
 
 import java.time.Duration
-import scala.annotation.nowarn
 import scala.jdk.CollectionConverters._
 import scala.collection.{Seq, mutable}
 
@@ -390,15 +389,13 @@ class ConsumerBounceTest extends AbstractConsumerTest 
with Logging {
     checkCloseDuringRebalance("group1", topic, executor, 
brokersAvailableDuringClose = true)
   }
 
-  @nowarn("cat=deprecation")
   private def checkCloseDuringRebalance(groupId: String, topic: String, 
executor: ExecutorService, brokersAvailableDuringClose: Boolean): Unit = {
 
     def subscribeAndPoll(consumer: Consumer[Array[Byte], Array[Byte]], 
revokeSemaphore: Option[Semaphore] = None): Future[Any] = {
       executor.submit(() => {
         consumer.subscribe(Collections.singletonList(topic))
         revokeSemaphore.foreach(s => s.release())
-        // requires to used deprecated `poll(long)` to trigger metadata update
-          consumer.poll(0L)
+          consumer.poll(Duration.ofMillis(500))
         }, 0)
     }
 
diff --git 
a/core/src/test/scala/integration/kafka/api/PlaintextConsumerPollTest.scala 
b/core/src/test/scala/integration/kafka/api/PlaintextConsumerPollTest.scala
index 83a325fcc53..15b0577c184 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerPollTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerPollTest.scala
@@ -33,17 +33,6 @@ import scala.jdk.CollectionConverters._
 @Timeout(600)
 class PlaintextConsumerPollTest extends AbstractConsumerTest {
 
-  // Deprecated poll(timeout) not supported for consumer group protocol
-  @deprecated("poll(Duration) is the replacement", since = "2.0")
-  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
-  
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly"))
-  def testDeprecatedPollBlocksForAssignment(quorum: String, groupProtocol: 
String): Unit = {
-    val consumer = createConsumer()
-    consumer.subscribe(Set(topic).asJava)
-    consumer.poll(0)
-    assertEquals(Set(tp, tp2), consumer.assignment().asScala)
-  }
-
   @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
   @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
   def testMaxPollRecords(quorum: String, groupProtocol: String): Unit = {
diff --git a/docs/upgrade.html b/docs/upgrade.html
index 543bdb2e5f2..cab55f96f1a 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -100,6 +100,10 @@
                 </li>
                 <li><b>Consumer</b>
                     <ul>
+                        <li>The <code>poll(long)</code> method was removed 
from the consumer. Please use <code>poll(Duration)</code> instead. Note that 
there is
+                            a difference in behavior between the two methods. 
The <code>poll(Duration)</code> method does not block beyond the timeout 
awaiting
+                            partition assignment, whereas the earlier 
<code>poll(long)</code> method used to wait beyond the timeout.
+                        </li>
                         <li>The <code>committed(TopicPartition)</code> and 
<code>committed(TopicPartition, Duration)</code> methods were removed from the 
consumer.
                             Please use 
<code>committed(Set&lt;TopicPartition&gt;)</code> and 
<code>committed(Set&lt;TopicPartition&gt;, Duration)</code> instead.
                         </li>

Reply via email to