This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 8716ba1  MINOR; Add producer id in exceptions thrown by 
ProducerStateManager (#9827)
8716ba1 is described below

commit 8716ba1ff18a6d969c44d73bbfb756c2046a9802
Author: David Jacot <[email protected]>
AuthorDate: Fri Jan 8 09:46:29 2021 +0100

    MINOR; Add producer id in exceptions thrown by ProducerStateManager (#9827)
    
    Reviewers: Chia-Ping Tsai <[email protected]>, Jason Gustafson 
<[email protected]>
---
 core/src/main/scala/kafka/log/ProducerStateManager.scala | 11 ++++++-----
 1 file changed, 6 insertions(+), 5 deletions(-)

diff --git a/core/src/main/scala/kafka/log/ProducerStateManager.scala 
b/core/src/main/scala/kafka/log/ProducerStateManager.scala
index b4fa267..00f908b 100644
--- a/core/src/main/scala/kafka/log/ProducerStateManager.scala
+++ b/core/src/main/scala/kafka/log/ProducerStateManager.scala
@@ -201,8 +201,8 @@ private[log] class ProducerAppendInfo(val topicPartition: 
TopicPartition,
 
   private def checkProducerEpoch(producerEpoch: Short, offset: Long): Unit = {
     if (producerEpoch < updatedEntry.producerEpoch) {
-      val message = s"Producer's epoch at offset $offset in $topicPartition is 
$producerEpoch, which is " +
-        s"smaller than the last seen epoch ${updatedEntry.producerEpoch}"
+      val message = s"Epoch of producer $producerId at offset $offset in 
$topicPartition is $producerEpoch, " +
+        s"which is smaller than the last seen epoch 
${updatedEntry.producerEpoch}"
 
       if (origin == AppendOrigin.Replication) {
         warn(message)
@@ -219,8 +219,9 @@ private[log] class ProducerAppendInfo(val topicPartition: 
TopicPartition,
     if (producerEpoch != updatedEntry.producerEpoch) {
       if (appendFirstSeq != 0) {
         if (updatedEntry.producerEpoch != RecordBatch.NO_PRODUCER_EPOCH) {
-          throw new OutOfOrderSequenceException(s"Invalid sequence number for 
new epoch at offset $offset in " +
-            s"partition $topicPartition: $producerEpoch (request epoch), 
$appendFirstSeq (seq. number)")
+          throw new OutOfOrderSequenceException(s"Invalid sequence number for 
new epoch of producer $producerId " +
+            s"at offset $offset in partition $topicPartition: $producerEpoch 
(request epoch), $appendFirstSeq (seq. number), " +
+            s"${updatedEntry.producerEpoch} (current producer epoch)")
         }
       }
     } else {
@@ -234,7 +235,7 @@ private[log] class ProducerAppendInfo(val topicPartition: 
TopicPartition,
       // If there is no current producer epoch (possibly because all producer 
records have been deleted due to
       // retention or the DeleteRecords API) accept writes with any sequence 
number
       if (!(currentEntry.producerEpoch == RecordBatch.NO_PRODUCER_EPOCH || 
inSequence(currentLastSeq, appendFirstSeq))) {
-        throw new OutOfOrderSequenceException(s"Out of order sequence number 
for producerId $producerId at " +
+        throw new OutOfOrderSequenceException(s"Out of order sequence number 
for producer $producerId at " +
           s"offset $offset in partition $topicPartition: $appendFirstSeq 
(incoming seq. number), " +
           s"$currentLastSeq (current end sequence number)")
       }

Reply via email to