This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 7db4d53f189 KAFKA-12690 Remove deprecated Producer#sendOffsetsToTransaction (#17865)
7db4d53f189 is described below
commit 7db4d53f1890ba7349c6f2bd26c449173886fed9
Author: Nick Guo <[email protected]>
AuthorDate: Fri Nov 22 18:07:10 2024 +0800
KAFKA-12690 Remove deprecated Producer#sendOffsetsToTransaction (#17865)
Reviewers: Chia-Ping Tsai <[email protected]>
---
.../kafka/clients/producer/KafkaProducer.java | 40 ----------------------
.../kafka/clients/producer/MockProducer.java | 8 -----
.../apache/kafka/clients/producer/Producer.java | 7 ----
.../kafka/clients/producer/MockProducerTest.java | 37 --------------------
.../kafka/api/TransactionsBounceTest.scala | 9 -----
.../integration/kafka/api/TransactionsTest.scala | 9 -----
6 files changed, 110 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 65d6a1e9903..a77b3f8809f 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -689,46 +689,6 @@ public class KafkaProducer<K, V> implements Producer<K, V>
{
producerMetrics.recordBeginTxn(time.nanoseconds() - now);
}
- /**
- * Sends a list of specified offsets to the consumer group coordinator,
and also marks
- * those offsets as part of the current transaction. These offsets will be
considered
- * committed only if the transaction is committed successfully. The
committed offset should
- * be the next message your application will consume, i.e.
lastProcessedMessageOffset + 1.
- * <p>
- * This method should be used when you need to batch consumed and produced
messages
- * together, typically in a consume-transform-produce pattern. Thus, the
specified
- * {@code consumerGroupId} should be the same as config parameter {@code
group.id} of the used
- * {@link KafkaConsumer consumer}. Note, that the consumer should have
{@code enable.auto.commit=false}
- * and should also not commit offsets manually (via {@link
KafkaConsumer#commitSync(Map) sync} or
- * {@link KafkaConsumer#commitAsync(Map, OffsetCommitCallback) async}
commits).
- *
- * <p>
- * This method is a blocking call that waits until the request has been
received and acknowledged by the consumer group
- * coordinator; but the offsets are not considered as committed until the
transaction itself is successfully committed later (via
- * the {@link #commitTransaction()} call).
- *
- * @throws IllegalStateException if no transactional.id has been
configured, no transaction has been started
- * @throws ProducerFencedException fatal error indicating another producer
with the same transactional.id is active
- * @throws org.apache.kafka.common.errors.UnsupportedVersionException
fatal error indicating the broker
- * does not support transactions (i.e. if its version is lower
than 0.11.0.0)
- * @throws
org.apache.kafka.common.errors.UnsupportedForMessageFormatException fatal error
indicating the message
- * format used for the offsets topic on the broker does not
support transactions
- * @throws org.apache.kafka.common.errors.AuthorizationException fatal
error indicating that the configured
- * transactional.id is not authorized, or the consumer group id is
not authorized.
- * @throws org.apache.kafka.common.errors.InvalidProducerEpochException if
the producer has attempted to produce with an old epoch
- * to the partition leader. See the exception for more details
- * @throws TimeoutException if the time taken for sending the offsets has
surpassed <code>max.block.ms</code>.
- * @throws KafkaException if the producer has encountered a previous fatal
or abortable error, or for any
- * other unexpected error
- *
- * @deprecated Since 3.0.0, please use {@link
#sendOffsetsToTransaction(Map, ConsumerGroupMetadata)} instead.
- */
- @Deprecated
- public void sendOffsetsToTransaction(Map<TopicPartition,
OffsetAndMetadata> offsets,
- String consumerGroupId) throws
ProducerFencedException {
- sendOffsetsToTransaction(offsets, new
ConsumerGroupMetadata(consumerGroupId));
- }
-
/**
* Sends a list of specified offsets to the consumer group coordinator,
and also marks
* those offsets as part of the current transaction. These offsets will be
considered
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
index 3d278c40cb0..ac7b0a191a7 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
@@ -202,14 +202,6 @@ public class MockProducer<K, V> implements Producer<K, V> {
this.sentOffsets = false;
}
- @Deprecated
- @Override
- public void sendOffsetsToTransaction(Map<TopicPartition,
OffsetAndMetadata> offsets,
- String consumerGroupId) throws
ProducerFencedException {
- Objects.requireNonNull(consumerGroupId);
- sendOffsetsToTransaction(offsets, new
ConsumerGroupMetadata(consumerGroupId));
- }
-
@Override
public void sendOffsetsToTransaction(Map<TopicPartition,
OffsetAndMetadata> offsets,
ConsumerGroupMetadata groupMetadata)
throws ProducerFencedException {
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java b/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java
index 87e9d6042ee..798034dda6d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java
@@ -49,13 +49,6 @@ public interface Producer<K, V> extends Closeable {
*/
void beginTransaction() throws ProducerFencedException;
- /**
- * See {@link KafkaProducer#sendOffsetsToTransaction(Map, String)}
- */
- @Deprecated
- void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata>
offsets,
- String consumerGroupId) throws
ProducerFencedException;
-
/**
* See {@link KafkaProducer#sendOffsetsToTransaction(Map,
ConsumerGroupMetadata)}
*/
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java
index d27a297d30a..0045f271c1a 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java
@@ -419,15 +419,6 @@ public class MockProducerTest {
assertEquals(Collections.singletonList(expectedResult),
producer.consumerGroupOffsetsHistory());
}
- @Deprecated
- @Test
- public void shouldThrowOnNullConsumerGroupIdWhenSendOffsetsToTransaction()
{
- buildMockProducer(true);
- producer.initTransactions();
- producer.beginTransaction();
- assertThrows(NullPointerException.class, () ->
producer.sendOffsetsToTransaction(Collections.emptyMap(), (String) null));
- }
-
@Test
public void
shouldThrowOnNullConsumerGroupMetadataWhenSendOffsetsToTransaction() {
buildMockProducer(true);
@@ -436,16 +427,6 @@ public class MockProducerTest {
assertThrows(NullPointerException.class, () ->
producer.sendOffsetsToTransaction(Collections.emptyMap(), new
ConsumerGroupMetadata(null)));
}
- @Deprecated
- @Test
- public void
shouldIgnoreEmptyOffsetsWhenSendOffsetsToTransactionByGroupId() {
- buildMockProducer(true);
- producer.initTransactions();
- producer.beginTransaction();
- producer.sendOffsetsToTransaction(Collections.emptyMap(), "groupId");
- assertFalse(producer.sentOffsets());
- }
-
@Test
public void
shouldIgnoreEmptyOffsetsWhenSendOffsetsToTransactionByGroupMetadata() {
buildMockProducer(true);
@@ -455,24 +436,6 @@ public class MockProducerTest {
assertFalse(producer.sentOffsets());
}
- @Deprecated
- @Test
- public void shouldAddOffsetsWhenSendOffsetsToTransactionByGroupId() {
- buildMockProducer(true);
- producer.initTransactions();
- producer.beginTransaction();
-
- assertFalse(producer.sentOffsets());
-
- Map<TopicPartition, OffsetAndMetadata> groupCommit = new
HashMap<TopicPartition, OffsetAndMetadata>() {
- {
- put(new TopicPartition(topic, 0), new OffsetAndMetadata(42L,
null));
- }
- };
- producer.sendOffsetsToTransaction(groupCommit, "groupId");
- assertTrue(producer.sentOffsets());
- }
-
@Test
public void shouldAddOffsetsWhenSendOffsetsToTransactionByGroupMetadata() {
buildMockProducer(true);
diff --git a/core/src/test/scala/integration/kafka/api/TransactionsBounceTest.scala b/core/src/test/scala/integration/kafka/api/TransactionsBounceTest.scala
index 3c58eb2e596..1d1eca60ead 100644
--- a/core/src/test/scala/integration/kafka/api/TransactionsBounceTest.scala
+++ b/core/src/test/scala/integration/kafka/api/TransactionsBounceTest.scala
@@ -32,7 +32,6 @@ import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.MethodSource
-import scala.annotation.nowarn
import scala.jdk.CollectionConverters._
import scala.collection.mutable
@@ -76,14 +75,6 @@ class TransactionsBounceTest extends IntegrationTestHarness {
override protected def brokerCount: Int = 4
- @nowarn("cat=deprecation")
- @ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
-
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_KAFKA_17961"))
- def testWithGroupId(quorum: String, groupProtocol: String): Unit = {
- testBrokerFailure((producer, groupId, consumer) =>
-
producer.sendOffsetsToTransaction(TestUtils.consumerPositions(consumer).asJava,
groupId))
- }
-
@ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testWithGroupMetadata(quorum: String, groupProtocol: String): Unit = {
diff --git a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
index 3a30dfe00e3..3292f3cd035 100644
--- a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
+++ b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
@@ -39,7 +39,6 @@ import java.time.Duration
import java.util
import java.util.concurrent.TimeUnit
import java.util.{Optional, Properties}
-import scala.annotation.nowarn
import scala.collection.{Seq, mutable}
import scala.collection.mutable.{ArrayBuffer, ListBuffer}
import scala.concurrent.ExecutionException
@@ -301,14 +300,6 @@ class TransactionsTest extends IntegrationTestHarness {
assertEquals(3L, second.offset)
}
- @nowarn("cat=deprecation")
- @ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
-
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_KAFKA_17961"))
- def testSendOffsetsWithGroupId(quorum: String, groupProtocol: String): Unit
= {
- sendOffset((producer, groupId, consumer) =>
-
producer.sendOffsetsToTransaction(TestUtils.consumerPositions(consumer).asJava,
groupId))
- }
-
@ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testSendOffsetsWithGroupMetadata(quorum: String, groupProtocol: String):
Unit = {