This is an automated email from the ASF dual-hosted git repository.
lianetm pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 284a3db28aa KAFKA-15908: Remove deprecated Consumer.poll(long timeout)
(#17368)
284a3db28aa is described below
commit 284a3db28aa5f76844f1b1e696d36d88b161c090
Author: Andrew Schofield <[email protected]>
AuthorDate: Tue Oct 8 15:49:10 2024 +0100
KAFKA-15908: Remove deprecated Consumer.poll(long timeout) (#17368)
Reviewers: Chia-Ping Tsai <[email protected]>, David Jacot
<[email protected]>, Lianet Magrans <[email protected]>
---
.../apache/kafka/clients/consumer/Consumer.java | 6 ----
.../kafka/clients/consumer/KafkaConsumer.java | 39 ----------------------
.../kafka/clients/consumer/MockConsumer.java | 6 ----
.../consumer/internals/AsyncKafkaConsumer.java | 7 ----
.../consumer/internals/ClassicKafkaConsumer.java | 20 +++--------
.../kafka/clients/consumer/KafkaConsumerTest.java | 26 ---------------
.../kafka/clients/consumer/MockConsumerTest.java | 31 -----------------
.../consumer/internals/AsyncKafkaConsumerTest.java | 9 -----
.../kafka/api/AuthorizerIntegrationTest.scala | 34 +++++++++++++------
.../integration/kafka/api/ConsumerBounceTest.scala | 5 +--
.../kafka/api/PlaintextConsumerPollTest.scala | 11 ------
docs/upgrade.html | 4 +++
12 files changed, 33 insertions(+), 165 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
index 055fcfb1b4f..42013955783 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
@@ -78,12 +78,6 @@ public interface Consumer<K, V> extends Closeable {
*/
void unsubscribe();
- /**
- * @see KafkaConsumer#poll(long)
- */
- @Deprecated
- ConsumerRecords<K, V> poll(long timeout);
-
/**
* @see KafkaConsumer#poll(Duration)
*/
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 6710212c566..1b12ae3d4b4 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -789,45 +789,6 @@ public class KafkaConsumer<K, V> implements Consumer<K, V>
{
delegate.assign(partitions);
}
- /**
- * Fetch data for the topics or partitions specified using one of the
subscribe/assign APIs. It is an error to not have
- * subscribed to any topics or partitions before polling for data.
- * <p>
- * On each poll, consumer will try to use the last consumed offset as the
starting offset and fetch sequentially. The last
- * consumed offset can be manually set through {@link
#seek(TopicPartition, long)} or automatically set as the last committed
- * offset for the subscribed list of partitions
- *
- *
- * @param timeoutMs The time, in milliseconds, spent waiting in poll if
data is not available in the buffer.
- * If 0, returns immediately with any records that are
available currently in the buffer, else returns empty.
- * Must not be negative.
- * @return map of topic to records since the last fetch for the subscribed
list of topics and partitions
- *
- * @throws org.apache.kafka.clients.consumer.InvalidOffsetException if the
offset for a partition or set of
- * partitions is undefined or out of range and no offset reset
policy has been configured
- * @throws org.apache.kafka.common.errors.WakeupException if {@link
#wakeup()} is called before or while this
- * function is called
- * @throws org.apache.kafka.common.errors.InterruptException if the
calling thread is interrupted before or while
- * this function is called
- * @throws org.apache.kafka.common.errors.AuthenticationException if
authentication fails. See the exception for more details
- * @throws org.apache.kafka.common.errors.AuthorizationException if caller
lacks Read access to any of the subscribed
- * topics or to the configured groupId. See the exception for
more details
- * @throws org.apache.kafka.common.KafkaException for any other
unrecoverable errors (e.g. invalid groupId or
- * session timeout, errors deserializing key/value pairs, or
any new error cases in future versions)
- * @throws java.lang.IllegalArgumentException if the timeout value is
negative
- * @throws java.lang.IllegalStateException if the consumer is not
subscribed to any topics or manually assigned any
- * partitions to consume from
- * @throws org.apache.kafka.common.errors.FencedInstanceIdException if
this consumer instance gets fenced by broker.
- *
- * @deprecated Since 2.0. Use {@link #poll(Duration)}, which does not
block beyond the timeout awaiting partition
- * assignment. See <a
href="https://cwiki.apache.org/confluence/x/5kiHB">KIP-266</a> for more
information.
- */
- @Deprecated
- @Override
- public ConsumerRecords<K, V> poll(final long timeoutMs) {
- return delegate.poll(timeoutMs);
- }
-
/**
* Fetch data for the topics or partitions specified using one of the
subscribe/assign APIs. It is an error to not have
* subscribed to any topics or partitions before polling for data.
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
index 8acdfdca4bf..43b9f06336d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
@@ -203,12 +203,6 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
subscriptions.unsubscribe();
}
- @Deprecated
- @Override
- public synchronized ConsumerRecords<K, V> poll(long timeout) {
- return poll(Duration.ofMillis(timeout));
- }
-
@Override
public synchronized ConsumerRecords<K, V> poll(final Duration timeout) {
ensureNotClosed();
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
index 01a4d29471e..cd9b2ee37b4 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
@@ -1507,13 +1507,6 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
);
}
- @Override
- @Deprecated
- public ConsumerRecords<K, V> poll(final long timeoutMs) {
- throw new UnsupportedOperationException("Consumer.poll(long) is not
supported when \"group.protocol\" is \"consumer\". " +
- "This method is deprecated and will be removed in the next major
release.");
- }
-
// Visible for testing
WakeupTrigger wakeupTrigger() {
return wakeupTrigger;
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java
index 65cff05bce3..52eee98d086 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java
@@ -597,21 +597,15 @@ public class ClassicKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
}
}
- @Deprecated
- @Override
- public ConsumerRecords<K, V> poll(final long timeoutMs) {
- return poll(time.timer(timeoutMs), false);
- }
-
@Override
public ConsumerRecords<K, V> poll(final Duration timeout) {
- return poll(time.timer(timeout), true);
+ return poll(time.timer(timeout));
}
/**
* @throws KafkaException if the rebalance callback throws exception
*/
- private ConsumerRecords<K, V> poll(final Timer timer, final boolean
includeMetadataInTimeout) {
+ private ConsumerRecords<K, V> poll(final Timer timer) {
acquireAndEnsureOpen();
try {
this.kafkaConsumerMetrics.recordPollStart(timer.currentTimeMs());
@@ -623,14 +617,8 @@ public class ClassicKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
do {
client.maybeTriggerWakeup();
- if (includeMetadataInTimeout) {
- // try to update assignment metadata BUT do not need to
block on the timer for join group
- updateAssignmentMetadataIfNeeded(timer, false);
- } else {
- while
(!updateAssignmentMetadataIfNeeded(time.timer(Long.MAX_VALUE), true)) {
- log.warn("Still waiting for metadata");
- }
- }
+ // try to update assignment metadata BUT do not need to block
on the timer for join group
+ updateAssignmentMetadataIfNeeded(timer, false);
final Fetch<K, V> fetch = pollForFetches(timer);
if (!fetch.isEmpty()) {
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 63d949fcdbf..37cfc472f3e 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -67,7 +67,6 @@ import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MemoryRecordsBuilder;
import org.apache.kafka.common.record.TimestampType;
-import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.requests.FetchResponse;
@@ -780,31 +779,6 @@ public class KafkaConsumerTest {
assertEquals(0, requests.stream().filter(request ->
request.apiKey().equals(ApiKeys.FETCH)).count());
}
- // TODO: this test requires rebalance logic which is not yet implemented
in the CONSUMER group protocol.
- // Once it is implemented, this should use both group protocols.
- @ParameterizedTest
- @EnumSource(value = GroupProtocol.class, names = "CLASSIC")
- @SuppressWarnings("deprecation")
- public void
verifyDeprecatedPollDoesNotTimeOutDuringMetadataUpdate(GroupProtocol
groupProtocol) {
- final ConsumerMetadata metadata = createMetadata(subscription);
- final MockClient client = new MockClient(time, metadata);
-
- initMetadata(client, Collections.singletonMap(topic, 1));
- Node node = metadata.fetch().nodes().get(0);
-
- consumer = newConsumer(groupProtocol, time, client, subscription,
metadata, assignor, true, groupInstanceId);
- consumer.subscribe(singleton(topic),
getConsumerRebalanceListener(consumer));
- prepareRebalance(client, node, assignor, singletonList(tp0), null);
-
- consumer.poll(0L);
-
- // The underlying client SHOULD get a fetch request
- final Queue<ClientRequest> requests = client.requests();
- assertEquals(1, requests.size());
- final Class<? extends AbstractRequest.Builder> aClass =
requests.peek().requestBuilder().getClass();
- assertEquals(FetchRequest.Builder.class, aClass);
- }
-
@ParameterizedTest
@EnumSource(GroupProtocol.class)
@SuppressWarnings("unchecked")
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java
index a9b0c2843d9..e824a03e32c 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java
@@ -68,37 +68,6 @@ public class MockConsumerTest {
assertEquals(2L,
consumer.committed(Collections.singleton(tp)).get(tp).offset());
}
- @SuppressWarnings("deprecation")
- @Test
- public void testSimpleMockDeprecated() {
- consumer.subscribe(Collections.singleton("test"));
- assertEquals(0, consumer.poll(1000).count());
- consumer.rebalance(Arrays.asList(new TopicPartition("test", 0), new
TopicPartition("test", 1)));
- // Mock consumers need to seek manually since they cannot
automatically reset offsets
- HashMap<TopicPartition, Long> beginningOffsets = new HashMap<>();
- beginningOffsets.put(new TopicPartition("test", 0), 0L);
- beginningOffsets.put(new TopicPartition("test", 1), 0L);
- consumer.updateBeginningOffsets(beginningOffsets);
- consumer.seek(new TopicPartition("test", 0), 0);
- ConsumerRecord<String, String> rec1 = new ConsumerRecord<>("test", 0,
0, 0L, TimestampType.CREATE_TIME,
- 0, 0, "key1", "value1", new RecordHeaders(), Optional.empty());
- ConsumerRecord<String, String> rec2 = new ConsumerRecord<>("test", 0,
1, 0L, TimestampType.CREATE_TIME,
- 0, 0, "key2", "value2", new RecordHeaders(), Optional.empty());
- consumer.addRecord(rec1);
- consumer.addRecord(rec2);
- ConsumerRecords<String, String> recs = consumer.poll(1);
- Iterator<ConsumerRecord<String, String>> iter = recs.iterator();
- assertEquals(rec1, iter.next());
- assertEquals(rec2, iter.next());
- assertFalse(iter.hasNext());
- final TopicPartition tp = new TopicPartition("test", 0);
- assertEquals(2L, consumer.position(tp));
- consumer.commitSync();
- assertEquals(2L,
consumer.committed(Collections.singleton(tp)).get(tp).offset());
- assertEquals(new ConsumerGroupMetadata("dummy.group.id", 1, "1",
Optional.empty()),
- consumer.groupMetadata());
- }
-
@Test
public void testConsumerRecordsIsEmptyWhenReturningNoRecords() {
TopicPartition partition = new TopicPartition("test", 0);
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
index cec65754ed9..98a69c7f806 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
@@ -580,15 +580,6 @@ public class AsyncKafkaConsumerTest {
assertThrows(callbackException.getClass(), () ->
consumer.commitSync());
}
- @Test
- @SuppressWarnings("deprecation")
- public void testPollLongThrowsException() {
- consumer = newConsumer();
- Exception e = assertThrows(UnsupportedOperationException.class, () ->
consumer.poll(0L));
- assertEquals("Consumer.poll(long) is not supported when
\"group.protocol\" is \"consumer\". " +
- "This method is deprecated and will be removed in the next major
release.", e.getMessage());
- }
-
@Test
public void testCommitSyncLeaderEpochUpdate() {
consumer = newConsumer();
diff --git
a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index af886d6a51d..7678c993547 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -15,7 +15,7 @@ package kafka.api
import java.lang.{Byte => JByte}
import java.time.Duration
import java.util
-import java.util.concurrent.ExecutionException
+import java.util.concurrent.{ExecutionException, Semaphore}
import java.util.regex.Pattern
import java.util.{Collections, Optional, Properties}
import kafka.utils.TestUtils
@@ -66,7 +66,6 @@ import org.apache.kafka.coordinator.group.GroupConfig
import org.apache.kafka.metadata.LeaderAndIsr
import org.junit.jupiter.api.function.Executable
-import scala.annotation.nowarn
import scala.collection.mutable
import scala.jdk.CollectionConverters._
@@ -1123,10 +1122,10 @@ class AuthorizerIntegrationTest extends
AbstractAuthorizerIntegrationTest {
consumeRecords(consumer)
}
- @nowarn("cat=deprecation")
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
def testPatternSubscriptionWithNoTopicAccess(quorum: String): Unit = {
+ val assignSemaphore = new Semaphore(0)
createTopicWithBrokerPrincipal(topic)
addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString,
WILDCARD_HOST, WRITE, ALLOW)), topicResource)
@@ -1137,8 +1136,16 @@ class AuthorizerIntegrationTest extends
AbstractAuthorizerIntegrationTest {
addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString,
WILDCARD_HOST, READ, ALLOW)), groupResource)
val consumer = createConsumer()
- consumer.subscribe(Pattern.compile(topicPattern))
- consumer.poll(0)
+ consumer.subscribe(Pattern.compile(topicPattern), new
ConsumerRebalanceListener {
+ def onPartitionsAssigned(partitions: util.Collection[TopicPartition]):
Unit = {
+ assignSemaphore.release()
+ }
+ def onPartitionsRevoked(partitions: util.Collection[TopicPartition]):
Unit = {
+ }})
+ TestUtils.waitUntilTrue(() => {
+ consumer.poll(Duration.ofMillis(500))
+ assignSemaphore.tryAcquire()
+ }, "Assignment did not complete on time")
assertTrue(consumer.subscription.isEmpty)
}
@@ -1160,10 +1167,10 @@ class AuthorizerIntegrationTest extends
AbstractAuthorizerIntegrationTest {
assertEquals(Collections.singleton(topic), e.unauthorizedTopics())
}
- @nowarn("cat=deprecation")
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
def testPatternSubscriptionWithTopicAndGroupRead(quorum: String): Unit = {
+ val assignSemaphore = new Semaphore(0)
createTopicWithBrokerPrincipal(topic)
addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString,
WILDCARD_HOST, WRITE, ALLOW)), topicResource)
@@ -1187,13 +1194,20 @@ class AuthorizerIntegrationTest extends
AbstractAuthorizerIntegrationTest {
// internal topics are not included, we should not be assigned any
partitions from this topic
addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString,
WILDCARD_HOST, READ, ALLOW)), new ResourcePattern(TOPIC,
GROUP_METADATA_TOPIC_NAME, LITERAL))
- consumer.subscribe(Pattern.compile(GROUP_METADATA_TOPIC_NAME))
- consumer.poll(0)
+ consumer.subscribe(Pattern.compile(GROUP_METADATA_TOPIC_NAME), new
ConsumerRebalanceListener {
+ def onPartitionsAssigned(partitions: util.Collection[TopicPartition]):
Unit = {
+ assignSemaphore.release()
+ }
+ def onPartitionsRevoked(partitions: util.Collection[TopicPartition]):
Unit = {
+ }})
+ TestUtils.waitUntilTrue(() => {
+ consumer.poll(Duration.ofMillis(500))
+ assignSemaphore.tryAcquire()
+ }, "Assignment did not complete on time")
assertTrue(consumer.subscription().isEmpty)
assertTrue(consumer.assignment().isEmpty)
}
- @nowarn("cat=deprecation")
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
def testPatternSubscriptionMatchingInternalTopic(quorum: String): Unit = {
@@ -1219,7 +1233,7 @@ class AuthorizerIntegrationTest extends
AbstractAuthorizerIntegrationTest {
GROUP_METADATA_TOPIC_NAME, LITERAL))
consumer.subscribe(Pattern.compile(GROUP_METADATA_TOPIC_NAME))
TestUtils.retry(60000) {
- consumer.poll(0)
+ consumer.poll(Duration.ofMillis(500))
assertEquals(Set(GROUP_METADATA_TOPIC_NAME),
consumer.subscription.asScala)
}
}
diff --git a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
index 927fa13fc3e..48c5112aabe 100644
--- a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
@@ -32,7 +32,6 @@ import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, Disabled, Test}
import java.time.Duration
-import scala.annotation.nowarn
import scala.jdk.CollectionConverters._
import scala.collection.{Seq, mutable}
@@ -390,15 +389,13 @@ class ConsumerBounceTest extends AbstractConsumerTest
with Logging {
checkCloseDuringRebalance("group1", topic, executor,
brokersAvailableDuringClose = true)
}
- @nowarn("cat=deprecation")
private def checkCloseDuringRebalance(groupId: String, topic: String,
executor: ExecutorService, brokersAvailableDuringClose: Boolean): Unit = {
def subscribeAndPoll(consumer: Consumer[Array[Byte], Array[Byte]],
revokeSemaphore: Option[Semaphore] = None): Future[Any] = {
executor.submit(() => {
consumer.subscribe(Collections.singletonList(topic))
revokeSemaphore.foreach(s => s.release())
- // requires to used deprecated `poll(long)` to trigger metadata update
- consumer.poll(0L)
+ consumer.poll(Duration.ofMillis(500))
}, 0)
}
diff --git
a/core/src/test/scala/integration/kafka/api/PlaintextConsumerPollTest.scala
b/core/src/test/scala/integration/kafka/api/PlaintextConsumerPollTest.scala
index 83a325fcc53..15b0577c184 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerPollTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerPollTest.scala
@@ -33,17 +33,6 @@ import scala.jdk.CollectionConverters._
@Timeout(600)
class PlaintextConsumerPollTest extends AbstractConsumerTest {
- // Deprecated poll(timeout) not supported for consumer group protocol
- @deprecated("poll(Duration) is the replacement", since = "2.0")
- @ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
-
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly"))
- def testDeprecatedPollBlocksForAssignment(quorum: String, groupProtocol:
String): Unit = {
- val consumer = createConsumer()
- consumer.subscribe(Set(topic).asJava)
- consumer.poll(0)
- assertEquals(Set(tp, tp2), consumer.assignment().asScala)
- }
-
@ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testMaxPollRecords(quorum: String, groupProtocol: String): Unit = {
diff --git a/docs/upgrade.html b/docs/upgrade.html
index 543bdb2e5f2..cab55f96f1a 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -100,6 +100,10 @@
</li>
<li><b>Consumer</b>
<ul>
+ <li>The <code>poll(long)</code> method was removed
from the consumer. Please use <code>poll(Duration)</code> instead. Note that
there is
+ a difference in behavior between the two methods.
The <code>poll(Duration)</code> method does not block beyond the timeout
awaiting
+ partition assignment, whereas the earlier
<code>poll(long)</code> method used to wait beyond the timeout.
+ </li>
<li>The <code>committed(TopicPartition)</code> and
<code>committed(TopicPartition, Duration)</code> methods were removed from the
consumer.
Please use
<code>committed(Set<TopicPartition>)</code> and
<code>committed(Set<TopicPartition>, Duration)</code> instead.
</li>