This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new fe85eb51b39 MINOR: Clean up ProducerStateEntry (#21096)
fe85eb51b39 is described below

commit fe85eb51b39a0c11689a36d3901d5ed53665586e
Author: Dmitry Werner <[email protected]>
AuthorDate: Wed Dec 10 08:52:14 2025 +0500

    MINOR: Clean up ProducerStateEntry (#21096)
    
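    A small cleanup in ProducerStateEntry: narrowed the access modifiers of
    its methods, replaced withProducerIdAndBatchMetadata with withProducerId,
    made maybeUpdateProducerEpoch private and void, and simplified
    findDuplicateBatch and batchWithSequenceRange. The constructor now also
    sets lastTimestamp from the first batch metadata when it is present.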
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 .../storage/internals/log/ProducerAppendInfo.java  |  2 +-
 .../storage/internals/log/ProducerStateEntry.java  | 52 +++++++++++-----------
 2 files changed, 26 insertions(+), 28 deletions(-)

diff --git 
a/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerAppendInfo.java
 
b/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerAppendInfo.java
index 1f182444a87..26444c07793 100644
--- 
a/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerAppendInfo.java
+++ 
b/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerAppendInfo.java
@@ -81,7 +81,7 @@ public class ProducerAppendInfo {
         this.origin = origin;
         this.verificationStateEntry = verificationStateEntry;
 
-        updatedEntry = currentEntry.withProducerIdAndBatchMetadata(producerId, 
Optional.empty());
+        updatedEntry = currentEntry.withProducerId(producerId);
     }
 
     public long producerId() {
diff --git 
a/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateEntry.java
 
b/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateEntry.java
index b813aa89158..97eb59e0b0f 100644
--- 
a/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateEntry.java
+++ 
b/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateEntry.java
@@ -24,7 +24,6 @@ import java.util.Collections;
 import java.util.Deque;
 import java.util.Optional;
 import java.util.OptionalLong;
-import java.util.stream.Stream;
 
 /**
  * This class represents the state of a specific producer-id.
@@ -42,24 +41,28 @@ public class ProducerStateEntry {
     private long lastTimestamp;
     private OptionalLong currentTxnFirstOffset;
 
-    public static ProducerStateEntry empty(long producerId) {
+    static ProducerStateEntry empty(long producerId) {
         return new ProducerStateEntry(producerId, 
RecordBatch.NO_PRODUCER_EPOCH, -1, RecordBatch.NO_TIMESTAMP, 
OptionalLong.empty(), Optional.empty());
     }
 
-    public ProducerStateEntry(long producerId, short producerEpoch, int 
coordinatorEpoch, long lastTimestamp, OptionalLong currentTxnFirstOffset, 
Optional<BatchMetadata> firstBatchMetadata) {
+    public ProducerStateEntry(long producerId, short producerEpoch, int 
coordinatorEpoch, long lastTimestamp,
+                              OptionalLong currentTxnFirstOffset, 
Optional<BatchMetadata> firstBatchMetadata) {
         this.producerId = producerId;
         this.producerEpoch = producerEpoch;
         this.coordinatorEpoch = coordinatorEpoch;
         this.lastTimestamp = lastTimestamp;
         this.currentTxnFirstOffset = currentTxnFirstOffset;
-        firstBatchMetadata.ifPresent(batchMetadata::add);
+        firstBatchMetadata.ifPresent(batch -> {
+            batchMetadata.add(batch);
+            this.lastTimestamp = batch.timestamp();
+        });
     }
 
-    public int firstSeq() {
+    int firstSeq() {
         return isEmpty() ? RecordBatch.NO_SEQUENCE : 
batchMetadata.getFirst().firstSeq();
     }
 
-    public int lastSeq() {
+    int lastSeq() {
         return isEmpty() ? RecordBatch.NO_SEQUENCE : 
batchMetadata.getLast().lastSeq();
     }
 
@@ -71,36 +74,31 @@ public class ProducerStateEntry {
         return isEmpty() ? -1L : batchMetadata.getLast().lastOffset();
     }
 
-    public int lastOffsetDelta() {
+    int lastOffsetDelta() {
         return isEmpty() ? 0 : batchMetadata.getLast().offsetDelta();
     }
 
-    public boolean isEmpty() {
+    boolean isEmpty() {
         return batchMetadata.isEmpty();
     }
 
     /**
-     * Returns a new instance with the provided parameters (when present) and 
the values from the current instance
-     * otherwise.
+     * Returns a new instance with the provided producer ID and the values 
from the current instance.
      */
-    public ProducerStateEntry withProducerIdAndBatchMetadata(long producerId, 
Optional<BatchMetadata> batchMetadata) {
-        return new ProducerStateEntry(producerId, this.producerEpoch(), 
this.coordinatorEpoch, this.lastTimestamp,
-            this.currentTxnFirstOffset, batchMetadata);
+    ProducerStateEntry withProducerId(long producerId) {
+        return new ProducerStateEntry(producerId, producerEpoch(), 
coordinatorEpoch, lastTimestamp, currentTxnFirstOffset, Optional.empty());
     }
 
-    public void addBatch(short producerEpoch, int lastSeq, long lastOffset, 
int offsetDelta, long timestamp) {
+    void addBatch(short producerEpoch, int lastSeq, long lastOffset, int 
offsetDelta, long timestamp) {
         maybeUpdateProducerEpoch(producerEpoch);
         addBatchMetadata(new BatchMetadata(lastSeq, lastOffset, offsetDelta, 
timestamp));
         this.lastTimestamp = timestamp;
     }
 
-    public boolean maybeUpdateProducerEpoch(short producerEpoch) {
+    private void maybeUpdateProducerEpoch(short producerEpoch) {
         if (this.producerEpoch != producerEpoch) {
             batchMetadata.clear();
             this.producerEpoch = producerEpoch;
-            return true;
-        } else {
-            return false;
         }
     }
 
@@ -109,11 +107,11 @@ public class ProducerStateEntry {
         batchMetadata.add(batch);
     }
 
-    public void update(ProducerStateEntry nextEntry) {
+    void update(ProducerStateEntry nextEntry) {
         update(nextEntry.producerEpoch, nextEntry.coordinatorEpoch, 
nextEntry.lastTimestamp, nextEntry.batchMetadata, 
nextEntry.currentTxnFirstOffset);
     }
 
-    public void update(short producerEpoch, int coordinatorEpoch, long 
lastTimestamp) {
+    void update(short producerEpoch, int coordinatorEpoch, long lastTimestamp) 
{
         update(producerEpoch, coordinatorEpoch, lastTimestamp, new 
ArrayDeque<>(0), OptionalLong.empty());
     }
 
@@ -127,19 +125,19 @@ public class ProducerStateEntry {
         this.lastTimestamp = lastTimestamp;
     }
 
-    public void setCurrentTxnFirstOffset(long firstOffset) {
+    void setCurrentTxnFirstOffset(long firstOffset) {
         this.currentTxnFirstOffset = OptionalLong.of(firstOffset);
     }
 
-    public Optional<BatchMetadata> findDuplicateBatch(RecordBatch batch) {
-        if (batch.producerEpoch() != producerEpoch) return Optional.empty();
-        else return batchWithSequenceRange(batch.baseSequence(), 
batch.lastSequence());
+    Optional<BatchMetadata> findDuplicateBatch(RecordBatch batch) {
+        return batch.producerEpoch() != producerEpoch ? Optional.empty() : 
batchWithSequenceRange(batch.baseSequence(), batch.lastSequence());
     }
 
     // Return the batch metadata of the cached batch having the exact sequence 
range, if any.
-    Optional<BatchMetadata> batchWithSequenceRange(int firstSeq, int lastSeq) {
-        Stream<BatchMetadata> duplicate = 
batchMetadata.stream().filter(metadata -> firstSeq == metadata.firstSeq() && 
lastSeq == metadata.lastSeq());
-        return duplicate.findFirst();
+    private Optional<BatchMetadata> batchWithSequenceRange(int firstSeq, int 
lastSeq) {
+        return batchMetadata.stream()
+            .filter(metadata -> firstSeq == metadata.firstSeq() && lastSeq == 
metadata.lastSeq())
+            .findFirst();
     }
 
     public Collection<BatchMetadata> batchMetadata() {

Reply via email to