This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 7db4d53f189 KAFKA-12690 Remove deprecated Producer#sendOffsetsToTransaction (#17865)
7db4d53f189 is described below

commit 7db4d53f1890ba7349c6f2bd26c449173886fed9
Author: Nick Guo <[email protected]>
AuthorDate: Fri Nov 22 18:07:10 2024 +0800

    KAFKA-12690 Remove deprecated Producer#sendOffsetsToTransaction (#17865)
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 .../kafka/clients/producer/KafkaProducer.java      | 40 ----------------------
 .../kafka/clients/producer/MockProducer.java       |  8 -----
 .../apache/kafka/clients/producer/Producer.java    |  7 ----
 .../kafka/clients/producer/MockProducerTest.java   | 37 --------------------
 .../kafka/api/TransactionsBounceTest.scala         |  9 -----
 .../integration/kafka/api/TransactionsTest.scala   |  9 -----
 6 files changed, 110 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 65d6a1e9903..a77b3f8809f 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -689,46 +689,6 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
         producerMetrics.recordBeginTxn(time.nanoseconds() - now);
     }
 
-    /**
-     * Sends a list of specified offsets to the consumer group coordinator, 
and also marks
-     * those offsets as part of the current transaction. These offsets will be 
considered
-     * committed only if the transaction is committed successfully. The 
committed offset should
-     * be the next message your application will consume, i.e. 
lastProcessedMessageOffset + 1.
-     * <p>
-     * This method should be used when you need to batch consumed and produced 
messages
-     * together, typically in a consume-transform-produce pattern. Thus, the 
specified
-     * {@code consumerGroupId} should be the same as config parameter {@code 
group.id} of the used
-     * {@link KafkaConsumer consumer}. Note, that the consumer should have 
{@code enable.auto.commit=false}
-     * and should also not commit offsets manually (via {@link 
KafkaConsumer#commitSync(Map) sync} or
-     * {@link KafkaConsumer#commitAsync(Map, OffsetCommitCallback) async} 
commits).
-     *
-     * <p>
-     * This method is a blocking call that waits until the request has been 
received and acknowledged by the consumer group
-     * coordinator; but the offsets are not considered as committed until the 
transaction itself is successfully committed later (via
-     * the {@link #commitTransaction()} call).
-     *
-     * @throws IllegalStateException if no transactional.id has been 
configured, no transaction has been started
-     * @throws ProducerFencedException fatal error indicating another producer 
with the same transactional.id is active
-     * @throws org.apache.kafka.common.errors.UnsupportedVersionException 
fatal error indicating the broker
-     *         does not support transactions (i.e. if its version is lower 
than 0.11.0.0)
-     * @throws 
org.apache.kafka.common.errors.UnsupportedForMessageFormatException fatal error 
indicating the message
-     *         format used for the offsets topic on the broker does not 
support transactions
-     * @throws org.apache.kafka.common.errors.AuthorizationException fatal 
error indicating that the configured
-     *         transactional.id is not authorized, or the consumer group id is 
not authorized.
-     * @throws org.apache.kafka.common.errors.InvalidProducerEpochException if 
the producer has attempted to produce with an old epoch
-     *         to the partition leader. See the exception for more details
-     * @throws TimeoutException if the time taken for sending the offsets has 
surpassed <code>max.block.ms</code>.
-     * @throws KafkaException if the producer has encountered a previous fatal 
or abortable error, or for any
-     *         other unexpected error
-     *
-     * @deprecated Since 3.0.0, please use {@link 
#sendOffsetsToTransaction(Map, ConsumerGroupMetadata)} instead.
-     */
-    @Deprecated
-    public void sendOffsetsToTransaction(Map<TopicPartition, 
OffsetAndMetadata> offsets,
-                                         String consumerGroupId) throws 
ProducerFencedException {
-        sendOffsetsToTransaction(offsets, new 
ConsumerGroupMetadata(consumerGroupId));
-    }
-
     /**
      * Sends a list of specified offsets to the consumer group coordinator, 
and also marks
      * those offsets as part of the current transaction. These offsets will be 
considered
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
index 3d278c40cb0..ac7b0a191a7 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
@@ -202,14 +202,6 @@ public class MockProducer<K, V> implements Producer<K, V> {
         this.sentOffsets = false;
     }
 
-    @Deprecated
-    @Override
-    public void sendOffsetsToTransaction(Map<TopicPartition, 
OffsetAndMetadata> offsets,
-                                         String consumerGroupId) throws 
ProducerFencedException {
-        Objects.requireNonNull(consumerGroupId);
-        sendOffsetsToTransaction(offsets, new 
ConsumerGroupMetadata(consumerGroupId));
-    }
-
     @Override
     public void sendOffsetsToTransaction(Map<TopicPartition, 
OffsetAndMetadata> offsets,
                                          ConsumerGroupMetadata groupMetadata) 
throws ProducerFencedException {
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java b/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java
index 87e9d6042ee..798034dda6d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java
@@ -49,13 +49,6 @@ public interface Producer<K, V> extends Closeable {
      */
     void beginTransaction() throws ProducerFencedException;
 
-    /**
-     * See {@link KafkaProducer#sendOffsetsToTransaction(Map, String)}
-     */
-    @Deprecated
-    void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> 
offsets,
-                                  String consumerGroupId) throws 
ProducerFencedException;
-
     /**
      * See {@link KafkaProducer#sendOffsetsToTransaction(Map, 
ConsumerGroupMetadata)}
      */
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java
index d27a297d30a..0045f271c1a 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java
@@ -419,15 +419,6 @@ public class MockProducerTest {
         assertEquals(Collections.singletonList(expectedResult), 
producer.consumerGroupOffsetsHistory());
     }
 
-    @Deprecated
-    @Test
-    public void shouldThrowOnNullConsumerGroupIdWhenSendOffsetsToTransaction() 
{
-        buildMockProducer(true);
-        producer.initTransactions();
-        producer.beginTransaction();
-        assertThrows(NullPointerException.class, () -> 
producer.sendOffsetsToTransaction(Collections.emptyMap(), (String) null));
-    }
-
     @Test
     public void 
shouldThrowOnNullConsumerGroupMetadataWhenSendOffsetsToTransaction() {
         buildMockProducer(true);
@@ -436,16 +427,6 @@ public class MockProducerTest {
         assertThrows(NullPointerException.class, () -> 
producer.sendOffsetsToTransaction(Collections.emptyMap(), new 
ConsumerGroupMetadata(null)));
     }
 
-    @Deprecated
-    @Test
-    public void 
shouldIgnoreEmptyOffsetsWhenSendOffsetsToTransactionByGroupId() {
-        buildMockProducer(true);
-        producer.initTransactions();
-        producer.beginTransaction();
-        producer.sendOffsetsToTransaction(Collections.emptyMap(), "groupId");
-        assertFalse(producer.sentOffsets());
-    }
-
     @Test
     public void 
shouldIgnoreEmptyOffsetsWhenSendOffsetsToTransactionByGroupMetadata() {
         buildMockProducer(true);
@@ -455,24 +436,6 @@ public class MockProducerTest {
         assertFalse(producer.sentOffsets());
     }
 
-    @Deprecated
-    @Test
-    public void shouldAddOffsetsWhenSendOffsetsToTransactionByGroupId() {
-        buildMockProducer(true);
-        producer.initTransactions();
-        producer.beginTransaction();
-
-        assertFalse(producer.sentOffsets());
-
-        Map<TopicPartition, OffsetAndMetadata> groupCommit = new 
HashMap<TopicPartition, OffsetAndMetadata>() {
-            {
-                put(new TopicPartition(topic, 0), new OffsetAndMetadata(42L, 
null));
-            }
-        };
-        producer.sendOffsetsToTransaction(groupCommit, "groupId");
-        assertTrue(producer.sentOffsets());
-    }
-
     @Test
     public void shouldAddOffsetsWhenSendOffsetsToTransactionByGroupMetadata() {
         buildMockProducer(true);
diff --git a/core/src/test/scala/integration/kafka/api/TransactionsBounceTest.scala b/core/src/test/scala/integration/kafka/api/TransactionsBounceTest.scala
index 3c58eb2e596..1d1eca60ead 100644
--- a/core/src/test/scala/integration/kafka/api/TransactionsBounceTest.scala
+++ b/core/src/test/scala/integration/kafka/api/TransactionsBounceTest.scala
@@ -32,7 +32,6 @@ import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.MethodSource
 
-import scala.annotation.nowarn
 import scala.jdk.CollectionConverters._
 import scala.collection.mutable
 
@@ -76,14 +75,6 @@ class TransactionsBounceTest extends IntegrationTestHarness {
 
   override protected def brokerCount: Int = 4
 
-  @nowarn("cat=deprecation")
-  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
-  
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_KAFKA_17961"))
-  def testWithGroupId(quorum: String, groupProtocol: String): Unit = {
-    testBrokerFailure((producer, groupId, consumer) =>
-      
producer.sendOffsetsToTransaction(TestUtils.consumerPositions(consumer).asJava, 
groupId))
-  }
-
   @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
   @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
   def testWithGroupMetadata(quorum: String, groupProtocol: String): Unit = {
diff --git a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
index 3a30dfe00e3..3292f3cd035 100644
--- a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
+++ b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
@@ -39,7 +39,6 @@ import java.time.Duration
 import java.util
 import java.util.concurrent.TimeUnit
 import java.util.{Optional, Properties}
-import scala.annotation.nowarn
 import scala.collection.{Seq, mutable}
 import scala.collection.mutable.{ArrayBuffer, ListBuffer}
 import scala.concurrent.ExecutionException
@@ -301,14 +300,6 @@ class TransactionsTest extends IntegrationTestHarness {
     assertEquals(3L, second.offset)
   }
 
-  @nowarn("cat=deprecation")
-  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
-  
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_KAFKA_17961"))
-  def testSendOffsetsWithGroupId(quorum: String, groupProtocol: String): Unit 
= {
-    sendOffset((producer, groupId, consumer) =>
-      
producer.sendOffsetsToTransaction(TestUtils.consumerPositions(consumer).asJava, 
groupId))
-  }
-
   @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
   @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
   def testSendOffsetsWithGroupMetadata(quorum: String, groupProtocol: String): 
Unit = {

Reply via email to