This is an automated email from the ASF dual-hosted git repository.
boyang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 783a645 KAFKA-10309: KafkaProducer's sendOffsetsToTransaction should
not block infinitively (#9081)
783a645 is described below
commit 783a6451f5f8c50dbe151caf5e76b74917690364
Author: Sasaki Toru <[email protected]>
AuthorDate: Thu Jul 30 07:38:27 2020 +0900
KAFKA-10309: KafkaProducer's sendOffsetsToTransaction should not block
infinitively (#9081)
Modified KafkaProducer.sendOffsetsToTransaction() to be affected with
max.block.ms, and added timeout test for blocking methods
Reviewers: Boyang Chen <[email protected]>, Guozhang Wang
<[email protected]>, Xi Hu <[email protected]>
---
.../kafka/clients/producer/KafkaProducer.java | 6 ++-
.../kafka/clients/producer/ProducerConfig.java | 2 +-
.../integration/kafka/api/TransactionsTest.scala | 59 +++++++++++++++-------
3 files changed, 48 insertions(+), 19 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 3d86de2..13af2f1 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -662,6 +662,8 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
* Note, that the consumer should have {@code enable.auto.commit=false}
and should
* also not commit offsets manually (via {@link
KafkaConsumer#commitSync(Map) sync} or
* {@link KafkaConsumer#commitAsync(Map, OffsetCommitCallback) async}
commits).
+ * This method will raise {@link TimeoutException} if the producer cannot
send offsets before expiration of {@code max.block.ms}.
+ * Additionally, it will raise {@link InterruptException} if interrupted.
*
* @throws IllegalStateException if no transactional.id has been
configured or no transaction has been started.
* @throws ProducerFencedException fatal error indicating another producer
with the same transactional.id is active
@@ -679,6 +681,8 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
*
mis-configured consumer instance id within group metadata.
* @throws KafkaException if the producer has encountered a previous fatal
or abortable error, or for any
* other unexpected error
+ * @throws TimeoutException if the time taken for sending offsets has
surpassed max.block.ms.
+ * @throws InterruptException if the thread is interrupted while blocked
*/
public void sendOffsetsToTransaction(Map<TopicPartition,
OffsetAndMetadata> offsets,
ConsumerGroupMetadata groupMetadata)
throws ProducerFencedException {
@@ -687,7 +691,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
throwIfProducerClosed();
TransactionalRequestResult result =
transactionManager.sendOffsetsToTransaction(offsets, groupMetadata);
sender.wakeup();
- result.await();
+ result.await(maxBlockTimeMs, TimeUnit.MILLISECONDS);
}
/**
diff --git
a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
index 2e9fa24..64d9de3 100644
---
a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
+++
b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
@@ -157,7 +157,7 @@ public class ProducerConfig extends AbstractConfig {
/** <code>max.block.ms</code> */
public static final String MAX_BLOCK_MS_CONFIG = "max.block.ms";
private static final String MAX_BLOCK_MS_DOC = "The configuration controls
how long the <code>KafkaProducer</code>'s <code>send()</code>,
<code>partitionsFor()</code>, "
- +
"<code>initTransactions()</code>, <code>commitTransaction()</code> "
+ +
"<code>initTransactions()</code>, <code>sendOffsetsToTransaction()</code>,
<code>commitTransaction()</code> "
+ "and
<code>abortTransaction()</code> methods will block. "
+ "For <code>send()</code>
this timeout bounds the total time waiting for both metadata fetch and buffer
allocation "
+ "(blocking in the
user-supplied serializers or partitioner is not counted against this timeout). "
diff --git a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
index 13fafc4..e3825a1 100644
--- a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
+++ b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
@@ -406,6 +406,48 @@ class TransactionsTest extends KafkaServerTestHarness {
TestUtils.waitUntilTrue(() =>
offsetAndMetadata.equals(consumer.committed(Set(tp).asJava).get(tp)), "cannot
read committed offset")
}
+ @Test(expected = classOf[TimeoutException])
+ def testInitTransactionsTimeout(): Unit = {
+ testTimeout(false, producer => producer.initTransactions())
+ }
+
+ @Test(expected = classOf[TimeoutException])
+ def testSendOffsetsToTransactionTimeout(): Unit = {
+ testTimeout(true, producer => producer.sendOffsetsToTransaction(
+ Map(new TopicPartition(topic1, 0) -> new OffsetAndMetadata(0)).asJava,
"test-group"))
+ }
+
+ @Test(expected = classOf[TimeoutException])
+ def testCommitTransactionTimeout(): Unit = {
+ testTimeout(true, producer => producer.commitTransaction())
+ }
+
+ @Test(expected = classOf[TimeoutException])
+ def testAbortTransactionTimeout(): Unit = {
+ testTimeout(true, producer => producer.abortTransaction())
+ }
+
+ def testTimeout(needInitAndSendMsg: Boolean,
+ timeoutProcess: KafkaProducer[Array[Byte], Array[Byte]] =>
Unit): Unit = {
+ val producer = createTransactionalProducer("transactionProducer",
maxBlockMs = 1000)
+
+ if (needInitAndSendMsg) {
+ producer.initTransactions()
+ producer.beginTransaction()
+ producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic1,
"foo".getBytes, "bar".getBytes))
+ }
+
+ for (i <- servers.indices)
+ killBroker(i)
+
+ try {
+ timeoutProcess(producer)
+ fail("Should raise a TimeoutException")
+ } finally {
+ producer.close(Duration.ZERO)
+ }
+ }
+
@Test
def testFencingOnSend(): Unit = {
val producer1 = transactionalProducers(0)
@@ -586,23 +628,6 @@ class TransactionsTest extends KafkaServerTestHarness {
fail("Should have raised a KafkaException")
}
- @Test(expected = classOf[TimeoutException])
- def testCommitTransactionTimeout(): Unit = {
- val producer = createTransactionalProducer("transactionalProducer",
maxBlockMs = 1000)
- producer.initTransactions()
- producer.beginTransaction()
- producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic1,
"foobar".getBytes))
-
- for (i <- 0 until servers.size)
- killBroker(i) // pretend all brokers not available
-
- try {
- producer.commitTransaction()
- } finally {
- producer.close(Duration.ZERO)
- }
- }
-
@Test
def testBumpTransactionalEpoch(): Unit = {
val producer = createTransactionalProducer("transactionalProducer",