This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 8716ba1 MINOR; Add producer id in exceptions thrown by
ProducerStateManager (#9827)
8716ba1 is described below
commit 8716ba1ff18a6d969c44d73bbfb756c2046a9802
Author: David Jacot <[email protected]>
AuthorDate: Fri Jan 8 09:46:29 2021 +0100
MINOR; Add producer id in exceptions thrown by ProducerStateManager (#9827)
Reviewers: Chia-Ping Tsai <[email protected]>, Jason Gustafson
<[email protected]>
---
core/src/main/scala/kafka/log/ProducerStateManager.scala | 11 ++++++-----
1 file changed, 6 insertions(+), 5 deletions(-)
diff --git a/core/src/main/scala/kafka/log/ProducerStateManager.scala
b/core/src/main/scala/kafka/log/ProducerStateManager.scala
index b4fa267..00f908b 100644
--- a/core/src/main/scala/kafka/log/ProducerStateManager.scala
+++ b/core/src/main/scala/kafka/log/ProducerStateManager.scala
@@ -201,8 +201,8 @@ private[log] class ProducerAppendInfo(val topicPartition:
TopicPartition,
private def checkProducerEpoch(producerEpoch: Short, offset: Long): Unit = {
if (producerEpoch < updatedEntry.producerEpoch) {
- val message = s"Producer's epoch at offset $offset in $topicPartition is
$producerEpoch, which is " +
- s"smaller than the last seen epoch ${updatedEntry.producerEpoch}"
+ val message = s"Epoch of producer $producerId at offset $offset in
$topicPartition is $producerEpoch, " +
+ s"which is smaller than the last seen epoch
${updatedEntry.producerEpoch}"
if (origin == AppendOrigin.Replication) {
warn(message)
@@ -219,8 +219,9 @@ private[log] class ProducerAppendInfo(val topicPartition:
TopicPartition,
if (producerEpoch != updatedEntry.producerEpoch) {
if (appendFirstSeq != 0) {
if (updatedEntry.producerEpoch != RecordBatch.NO_PRODUCER_EPOCH) {
- throw new OutOfOrderSequenceException(s"Invalid sequence number for
new epoch at offset $offset in " +
- s"partition $topicPartition: $producerEpoch (request epoch),
$appendFirstSeq (seq. number)")
+ throw new OutOfOrderSequenceException(s"Invalid sequence number for
new epoch of producer $producerId " +
+ s"at offset $offset in partition $topicPartition: $producerEpoch
(request epoch), $appendFirstSeq (seq. number), " +
+ s"${updatedEntry.producerEpoch} (current producer epoch)")
}
}
} else {
@@ -234,7 +235,7 @@ private[log] class ProducerAppendInfo(val topicPartition:
TopicPartition,
// If there is no current producer epoch (possibly because all producer
records have been deleted due to
// retention or the DeleteRecords API) accept writes with any sequence
number
if (!(currentEntry.producerEpoch == RecordBatch.NO_PRODUCER_EPOCH ||
inSequence(currentLastSeq, appendFirstSeq))) {
- throw new OutOfOrderSequenceException(s"Out of order sequence number
for producerId $producerId at " +
+ throw new OutOfOrderSequenceException(s"Out of order sequence number
for producer $producerId at " +
s"offset $offset in partition $topicPartition: $appendFirstSeq
(incoming seq. number), " +
s"$currentLastSeq (current end sequence number)")
}