This is an automated email from the ASF dual-hosted git repository. rsivaram pushed a commit to branch 2.7 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.7 by this push: new af15003 MINOR: Use `PartitionResponse.errorMessage` in exceptions in KafkaProducer (#9450) af15003 is described below commit af150033b27c7822bfc995abafd6f91597e2e4ca Author: Rajini Sivaram <rajinisiva...@googlemail.com> AuthorDate: Tue Oct 20 10:15:01 2020 +0100 MINOR: Use `PartitionResponse.errorMessage` in exceptions in KafkaProducer (#9450) Reviewers: Manikumar Reddy <manikumar.re...@gmail.com>, Chia-Ping Tsai <chia7...@gmail.com> --- .../kafka/clients/producer/internals/Sender.java | 4 +-- .../clients/producer/internals/SenderTest.java | 36 +++++++++++++++++++--- .../kafka/api/PlaintextProducerSendTest.scala | 18 ++++++----- 3 files changed, 43 insertions(+), 15 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java index 1300e5c..958f17f 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java @@ -616,7 +616,7 @@ public class Sender implements Runnable { else if (error == Errors.CLUSTER_AUTHORIZATION_FAILED) exception = new ClusterAuthorizationException("The producer is not authorized to do idempotent sends"); else - exception = error.exception(); + exception = error.exception(response.errorMessage); // tell the user the result of their request. We only adjust sequence numbers if the batch didn't exhaust // its retries -- if it did, we don't know whether the sequence number was accepted or not, and // thus it is not safe to reassign the sequence. @@ -629,7 +629,7 @@ public class Sender implements Runnable { batch.topicPartition); } else { log.warn("Received invalid metadata error in produce request on partition {} due to {}. Going " + - "to request metadata update now", batch.topicPartition, error.exception().toString()); + "to request metadata update now", batch.topicPartition, error.exception(response.errorMessage).toString()); } metadata.requestUpdate(); } diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java index 63ab680..85d8db8 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java @@ -23,6 +23,7 @@ import org.apache.kafka.clients.MockClient; import org.apache.kafka.clients.NetworkClient; import org.apache.kafka.clients.NodeApiVersions; import org.apache.kafka.clients.producer.Callback; +import org.apache.kafka.common.errors.InvalidRequestException; import org.apache.kafka.common.errors.TransactionAbortedException; import org.apache.kafka.common.utils.ProducerIdAndEpoch; import org.apache.kafka.clients.producer.RecordMetadata; @@ -93,6 +94,7 @@ import java.util.OptionalLong; import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; @@ -1726,7 +1728,7 @@ public class SenderTest { assertFalse(batchIterator.hasNext()); assertEquals(expectedSequence, firstBatch.baseSequence()); return true; - }, produceResponse(tp, responseOffset, responseError, 0, logStartOffset)); + }, produceResponse(tp, responseOffset, responseError, 0, logStartOffset, null)); } @Test @@ -2129,7 +2131,7 @@ public class SenderTest { time.sleep(deliveryTimeoutMs / 2); // expire the first batch only - client.respond(produceResponse(tp0, 0L, Errors.NONE, 0, 0L)); + client.respond(produceResponse(tp0, 0L, Errors.NONE, 0, 0L, null)); sender.runOnce(); // receive response (offset=0) assertEquals(0, client.inFlightRequestCount()); assertEquals(0, sender.inFlightBatches(tp0).size()); @@ -2432,6 +2434,29 @@ public class SenderTest { sender.runOnce(); } + @Test + public void testDefaultErrorMessage() throws Exception { + verifyErrorMessage(produceResponse(tp0, 0L, Errors.INVALID_REQUEST, 0), Errors.INVALID_REQUEST.message()); + } + + @Test + public void testCustomErrorMessage() throws Exception { + String errorMessage = "testCustomErrorMessage"; + verifyErrorMessage(produceResponse(tp0, 0L, Errors.INVALID_REQUEST, 0, -1, errorMessage), errorMessage); + } + + private void verifyErrorMessage(ProduceResponse response, String expectedMessage) throws Exception { + Future<RecordMetadata> future = appendToAccumulator(tp0, 0L, "key", "value"); + sender.runOnce(); // connect + sender.runOnce(); // send produce request + client.respond(response); + sender.runOnce(); + sender.runOnce(); + ExecutionException e1 = assertThrows(ExecutionException.class, () -> future.get(5, TimeUnit.SECONDS)); + assertEquals(InvalidRequestException.class, e1.getCause().getClass()); + assertEquals(expectedMessage, e1.getCause().getMessage()); + } + class AssertEndTxnRequestMatcher implements MockClient.RequestMatcher { private TransactionResult requiredResult; @@ -2527,8 +2552,9 @@ public class SenderTest { null, MAX_BLOCK_TIMEOUT, false, time.milliseconds()).future; } - private ProduceResponse produceResponse(TopicPartition tp, long offset, Errors error, int throttleTimeMs, long logStartOffset) { - ProduceResponse.PartitionResponse resp = new ProduceResponse.PartitionResponse(error, offset, RecordBatch.NO_TIMESTAMP, logStartOffset); + private ProduceResponse produceResponse(TopicPartition tp, long offset, Errors error, int throttleTimeMs, long logStartOffset, String errorMessage) { + ProduceResponse.PartitionResponse resp = new ProduceResponse.PartitionResponse(error, offset, + RecordBatch.NO_TIMESTAMP, logStartOffset, Collections.emptyList(), errorMessage); Map<TopicPartition, ProduceResponse.PartitionResponse> partResp = Collections.singletonMap(tp, resp); return new ProduceResponse(partResp, throttleTimeMs); } @@ -2544,7 +2570,7 @@ public class SenderTest { } private ProduceResponse produceResponse(TopicPartition tp, long offset, Errors error, int throttleTimeMs) { - return produceResponse(tp, offset, error, throttleTimeMs, -1L); + return produceResponse(tp, offset, error, throttleTimeMs, -1L, null); } private TransactionManager createTransactionManager() { diff --git a/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala index 4e44f34..3b91c2c 100644 --- a/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala +++ b/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala @@ -98,10 +98,11 @@ class PlaintextProducerSendTest extends BaseProducerSendTest { val producer = createProducer(brokerList = brokerList) try { - producer.send(new ProducerRecord(topic, 0, System.currentTimeMillis() - 1001, "key".getBytes, "value".getBytes)).get() - fail("Should throw CorruptedRecordException") - } catch { - case e: ExecutionException => assertTrue(e.getCause.isInstanceOf[InvalidTimestampException]) + val e = intercept[ExecutionException] { + producer.send(new ProducerRecord(topic, 0, System.currentTimeMillis() - 1001, "key".getBytes, "value".getBytes)).get() + }.getCause + assertTrue(e.isInstanceOf[InvalidTimestampException]) + assertEquals("One or more records have been rejected due to invalid timestamp", e.getMessage) } finally { producer.close() } @@ -109,10 +110,11 @@ class PlaintextProducerSendTest extends BaseProducerSendTest { // Test compressed messages. val compressedProducer = createProducer(brokerList = brokerList, compressionType = "gzip") try { - compressedProducer.send(new ProducerRecord(topic, 0, System.currentTimeMillis() - 1001, "key".getBytes, "value".getBytes)).get() - fail("Should throw CorruptedRecordException") - } catch { - case e: ExecutionException => assertTrue(e.getCause.isInstanceOf[InvalidTimestampException]) + val e = intercept[ExecutionException] { + compressedProducer.send(new ProducerRecord(topic, 0, System.currentTimeMillis() - 1001, "key".getBytes, "value".getBytes)).get() + }.getCause + assertTrue(e.isInstanceOf[InvalidTimestampException]) + assertEquals("One or more records have been rejected due to invalid timestamp", e.getMessage) } finally { compressedProducer.close() }