This is an automated email from the ASF dual-hosted git repository. chia7712 pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new fcf6da0a0da KAFKA-19098 Remove `lastOffset` from PartitionResponse (#19398) fcf6da0a0da is described below commit fcf6da0a0dae34c79ec7d73acf4a2ee52c3e64bb Author: Nick Guo <lansg0...@gmail.com> AuthorDate: Tue Apr 8 00:06:02 2025 +0800 KAFKA-19098 Remove `lastOffset` from PartitionResponse (#19398) The `lastOffset` is not used actually, so it can be removed. Reviewers: Jhen-Yung Hsu <jhenyung...@gmail.com>, Ken Huang <s7133...@gmail.com>, Chia-Ping Tsai <chia7...@gmail.com> --- .../apache/kafka/clients/producer/internals/Sender.java | 3 --- .../org/apache/kafka/common/requests/ProduceResponse.java | 14 ++------------ core/src/main/scala/kafka/server/ReplicaManager.scala | 1 - 3 files changed, 2 insertions(+), 16 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java index 9190281a660..fe4a97b1229 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java @@ -71,8 +71,6 @@ import java.util.Set; import java.util.function.Function; import java.util.stream.Collectors; -import static org.apache.kafka.common.requests.ProduceResponse.INVALID_OFFSET; - /** * The background thread that handles the sending of produce requests to the Kafka cluster. This thread makes metadata * requests to renew its view of the cluster and then sends produce requests to the appropriate nodes. @@ -608,7 +606,6 @@ public class Sender implements Runnable { ProduceResponse.PartitionResponse partResp = new ProduceResponse.PartitionResponse( Errors.forCode(p.errorCode()), p.baseOffset(), - INVALID_OFFSET, p.logAppendTimeMs(), p.logStartOffset(), p.recordErrors() diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java index 99f7f475ba5..9354f059a19 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java @@ -159,7 +159,6 @@ public class ProduceResponse extends AbstractResponse { public static final class PartitionResponse { public Errors error; public long baseOffset; - public long lastOffset; public long logAppendTime; public long logStartOffset; public List<RecordError> recordErrors; @@ -183,17 +182,12 @@ public class ProduceResponse extends AbstractResponse { } public PartitionResponse(Errors error, long baseOffset, long logAppendTime, long logStartOffset, List<RecordError> recordErrors, String errorMessage) { - this(error, baseOffset, INVALID_OFFSET, logAppendTime, logStartOffset, recordErrors, errorMessage, new ProduceResponseData.LeaderIdAndEpoch()); - } - - public PartitionResponse(Errors error, long baseOffset, long lastOffset, long logAppendTime, long logStartOffset, List<RecordError> recordErrors, String errorMessage) { - this(error, baseOffset, lastOffset, logAppendTime, logStartOffset, recordErrors, errorMessage, new ProduceResponseData.LeaderIdAndEpoch()); + this(error, baseOffset, logAppendTime, logStartOffset, recordErrors, errorMessage, new ProduceResponseData.LeaderIdAndEpoch()); } public PartitionResponse( Errors error, long baseOffset, - long lastOffset, long logAppendTime, long logStartOffset, List<RecordError> recordErrors, @@ -202,7 +196,6 @@ public class ProduceResponse extends AbstractResponse { ) { this.error = error; this.baseOffset = baseOffset; - this.lastOffset = lastOffset; this.logAppendTime = logAppendTime; this.logStartOffset = logStartOffset; this.recordErrors = recordErrors; @@ -216,7 +209,6 @@ public class ProduceResponse extends AbstractResponse { if (o == null || getClass() != o.getClass()) return false; PartitionResponse that = (PartitionResponse) o; return baseOffset == that.baseOffset && - lastOffset == that.lastOffset && logAppendTime == that.logAppendTime && logStartOffset == that.logStartOffset && error == that.error && @@ -227,7 +219,7 @@ public class ProduceResponse extends AbstractResponse { @Override public int hashCode() { - return Objects.hash(error, baseOffset, lastOffset, logAppendTime, logStartOffset, recordErrors, errorMessage, currentLeader); + return Objects.hash(error, baseOffset, logAppendTime, logStartOffset, recordErrors, errorMessage, currentLeader); } @Override @@ -238,8 +230,6 @@ public class ProduceResponse extends AbstractResponse { b.append(error); b.append(",offset: "); b.append(baseOffset); - b.append(",lastOffset: "); - b.append(lastOffset); b.append(",logAppendTime: "); b.append(logAppendTime); b.append(", logStartOffset: "); diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index aa08fc93fff..2dbfabd86d9 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -890,7 +890,6 @@ class ReplicaManager(val config: KafkaConfig, new PartitionResponse( result.error, result.info.firstOffset, - result.info.lastOffset, result.info.logAppendTime, result.info.logStartOffset, result.info.recordErrors,