This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new fe85eb51b39 MINOR: Clean up ProducerStateEntry (#21096)
fe85eb51b39 is described below
commit fe85eb51b39a0c11689a36d3901d5ed53665586e
Author: Dmitry Werner <[email protected]>
AuthorDate: Wed Dec 10 08:52:14 2025 +0500
MINOR: Clean up ProducerStateEntry (#21096)
A small cleanup in ProducerStateEntry: fixed access modifiers for
methods.
Reviewers: Chia-Ping Tsai <[email protected]>
---
.../storage/internals/log/ProducerAppendInfo.java | 2 +-
.../storage/internals/log/ProducerStateEntry.java | 52 +++++++++++-----------
2 files changed, 26 insertions(+), 28 deletions(-)
diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerAppendInfo.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerAppendInfo.java
index 1f182444a87..26444c07793 100644
--- a/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerAppendInfo.java
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerAppendInfo.java
@@ -81,7 +81,7 @@ public class ProducerAppendInfo {
this.origin = origin;
this.verificationStateEntry = verificationStateEntry;
- updatedEntry = currentEntry.withProducerIdAndBatchMetadata(producerId,
Optional.empty());
+ updatedEntry = currentEntry.withProducerId(producerId);
}
public long producerId() {
diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateEntry.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateEntry.java
index b813aa89158..97eb59e0b0f 100644
--- a/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateEntry.java
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateEntry.java
@@ -24,7 +24,6 @@ import java.util.Collections;
import java.util.Deque;
import java.util.Optional;
import java.util.OptionalLong;
-import java.util.stream.Stream;
/**
* This class represents the state of a specific producer-id.
@@ -42,24 +41,28 @@ public class ProducerStateEntry {
private long lastTimestamp;
private OptionalLong currentTxnFirstOffset;
- public static ProducerStateEntry empty(long producerId) {
+ static ProducerStateEntry empty(long producerId) {
return new ProducerStateEntry(producerId,
RecordBatch.NO_PRODUCER_EPOCH, -1, RecordBatch.NO_TIMESTAMP,
OptionalLong.empty(), Optional.empty());
}
- public ProducerStateEntry(long producerId, short producerEpoch, int
coordinatorEpoch, long lastTimestamp, OptionalLong currentTxnFirstOffset,
Optional<BatchMetadata> firstBatchMetadata) {
+ public ProducerStateEntry(long producerId, short producerEpoch, int
coordinatorEpoch, long lastTimestamp,
+ OptionalLong currentTxnFirstOffset,
Optional<BatchMetadata> firstBatchMetadata) {
this.producerId = producerId;
this.producerEpoch = producerEpoch;
this.coordinatorEpoch = coordinatorEpoch;
this.lastTimestamp = lastTimestamp;
this.currentTxnFirstOffset = currentTxnFirstOffset;
- firstBatchMetadata.ifPresent(batchMetadata::add);
+ firstBatchMetadata.ifPresent(batch -> {
+ batchMetadata.add(batch);
+ this.lastTimestamp = batch.timestamp();
+ });
}
- public int firstSeq() {
+ int firstSeq() {
return isEmpty() ? RecordBatch.NO_SEQUENCE :
batchMetadata.getFirst().firstSeq();
}
- public int lastSeq() {
+ int lastSeq() {
return isEmpty() ? RecordBatch.NO_SEQUENCE :
batchMetadata.getLast().lastSeq();
}
@@ -71,36 +74,31 @@ public class ProducerStateEntry {
return isEmpty() ? -1L : batchMetadata.getLast().lastOffset();
}
- public int lastOffsetDelta() {
+ int lastOffsetDelta() {
return isEmpty() ? 0 : batchMetadata.getLast().offsetDelta();
}
- public boolean isEmpty() {
+ boolean isEmpty() {
return batchMetadata.isEmpty();
}
/**
- * Returns a new instance with the provided parameters (when present) and
the values from the current instance
- * otherwise.
+ * Returns a new instance with the provided producer ID and the values
from the current instance.
*/
- public ProducerStateEntry withProducerIdAndBatchMetadata(long producerId,
Optional<BatchMetadata> batchMetadata) {
- return new ProducerStateEntry(producerId, this.producerEpoch(),
this.coordinatorEpoch, this.lastTimestamp,
- this.currentTxnFirstOffset, batchMetadata);
+ ProducerStateEntry withProducerId(long producerId) {
+ return new ProducerStateEntry(producerId, producerEpoch(),
coordinatorEpoch, lastTimestamp, currentTxnFirstOffset, Optional.empty());
}
- public void addBatch(short producerEpoch, int lastSeq, long lastOffset,
int offsetDelta, long timestamp) {
+ void addBatch(short producerEpoch, int lastSeq, long lastOffset, int
offsetDelta, long timestamp) {
maybeUpdateProducerEpoch(producerEpoch);
addBatchMetadata(new BatchMetadata(lastSeq, lastOffset, offsetDelta,
timestamp));
this.lastTimestamp = timestamp;
}
- public boolean maybeUpdateProducerEpoch(short producerEpoch) {
+ private void maybeUpdateProducerEpoch(short producerEpoch) {
if (this.producerEpoch != producerEpoch) {
batchMetadata.clear();
this.producerEpoch = producerEpoch;
- return true;
- } else {
- return false;
}
}
@@ -109,11 +107,11 @@ public class ProducerStateEntry {
batchMetadata.add(batch);
}
- public void update(ProducerStateEntry nextEntry) {
+ void update(ProducerStateEntry nextEntry) {
update(nextEntry.producerEpoch, nextEntry.coordinatorEpoch,
nextEntry.lastTimestamp, nextEntry.batchMetadata,
nextEntry.currentTxnFirstOffset);
}
- public void update(short producerEpoch, int coordinatorEpoch, long
lastTimestamp) {
+ void update(short producerEpoch, int coordinatorEpoch, long lastTimestamp)
{
update(producerEpoch, coordinatorEpoch, lastTimestamp, new
ArrayDeque<>(0), OptionalLong.empty());
}
@@ -127,19 +125,19 @@ public class ProducerStateEntry {
this.lastTimestamp = lastTimestamp;
}
- public void setCurrentTxnFirstOffset(long firstOffset) {
+ void setCurrentTxnFirstOffset(long firstOffset) {
this.currentTxnFirstOffset = OptionalLong.of(firstOffset);
}
- public Optional<BatchMetadata> findDuplicateBatch(RecordBatch batch) {
- if (batch.producerEpoch() != producerEpoch) return Optional.empty();
- else return batchWithSequenceRange(batch.baseSequence(),
batch.lastSequence());
+ Optional<BatchMetadata> findDuplicateBatch(RecordBatch batch) {
+ return batch.producerEpoch() != producerEpoch ? Optional.empty() :
batchWithSequenceRange(batch.baseSequence(), batch.lastSequence());
}
// Return the batch metadata of the cached batch having the exact sequence
range, if any.
- Optional<BatchMetadata> batchWithSequenceRange(int firstSeq, int lastSeq) {
- Stream<BatchMetadata> duplicate =
batchMetadata.stream().filter(metadata -> firstSeq == metadata.firstSeq() &&
lastSeq == metadata.lastSeq());
- return duplicate.findFirst();
+ private Optional<BatchMetadata> batchWithSequenceRange(int firstSeq, int
lastSeq) {
+ return batchMetadata.stream()
+ .filter(metadata -> firstSeq == metadata.firstSeq() && lastSeq ==
metadata.lastSeq())
+ .findFirst();
}
public Collection<BatchMetadata> batchMetadata() {