ijuma commented on code in PR #13085: URL: https://github.com/apache/kafka/pull/13085#discussion_r1064083858
########## clients/src/main/java/org/apache/kafka/common/utils/FetchRequestUtils.java: ########## @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.utils; + +public class FetchRequestUtils { Review Comment: I would just add these methods to the existing `FetchRequest` class. 
########## core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala: ########## @@ -2312,26 +2313,26 @@ class ReplicaManagerTest { } private def fetchPartitions( - replicaManager: ReplicaManager, - replicaId: Int, - fetchInfos: Seq[(TopicIdPartition, PartitionData)], - responseCallback: Seq[(TopicIdPartition, FetchPartitionData)] => Unit, - requestVersion: Short = ApiKeys.FETCH.latestVersion, - maxWaitMs: Long = 0, - minBytes: Int = 1, - maxBytes: Int = 1024 * 1024, - quota: ReplicaQuota = UnboundedQuota, - isolation: FetchIsolation = FetchLogEnd, - clientMetadata: Option[ClientMetadata] = None + replicaManager: ReplicaManager, + replicaId: Int, + fetchInfos: Seq[(TopicIdPartition, PartitionData)], + responseCallback: Seq[(TopicIdPartition, FetchPartitionData)] => Unit, + requestVersion: Short = ApiKeys.FETCH.latestVersion, + maxWaitMs: Long = 0, + minBytes: Int = 1, + maxBytes: Int = 1024 * 1024, + quota: ReplicaQuota = UnboundedQuota, + isolation: FetchIsolation = FetchIsolation.LOG_END, + clientMetadata: Option[ClientMetadata] = None Review Comment: Why this indent change? ########## core/src/main/scala/kafka/server/ReplicaManager.scala: ########## @@ -123,12 +123,12 @@ case class LogReadResult(info: FetchDataInfo, this.info.records, this.divergingEpoch, this.lastStableOffset, - this.info.abortedTransactions, + if (this.info.abortedTransactions.isPresent) Some(this.info.abortedTransactions.get().asScala.toList) else None, Review Comment: The last `toList` results in a copy of the collection, we'd want to avoid that. ########## storage/src/main/java/org/apache/kafka/server/log/internals/FetchDataInfo.java: ########## @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.log.internals; + +import org.apache.kafka.common.message.FetchResponseData; +import org.apache.kafka.common.record.MemoryRecords; +import org.apache.kafka.common.record.Records; + +import java.util.List; +import java.util.Optional; + +public class FetchDataInfo { + private final LogOffsetMetadata fetchOffsetMetadata; + private final Records records; + private final boolean firstEntryIncomplete; + private final Optional<List<FetchResponseData.AbortedTransaction>> abortedTransactions; + + public FetchDataInfo(LogOffsetMetadata fetchOffsetMetadata, + Records records) { + this(fetchOffsetMetadata, records, false, Optional.empty()); + } + + public FetchDataInfo(LogOffsetMetadata fetchOffsetMetadata, + Records records, + boolean firstEntryIncomplete) { + this(fetchOffsetMetadata, records, firstEntryIncomplete, Optional.empty()); + } + + public FetchDataInfo(LogOffsetMetadata fetchOffsetMetadata, + Records records, + Optional<List<FetchResponseData.AbortedTransaction>> abortedTransactions) { + this(fetchOffsetMetadata, records, false, abortedTransactions); + } + + public FetchDataInfo(LogOffsetMetadata fetchOffsetMetadata, + Records records, + boolean firstEntryIncomplete, + Optional<List<FetchResponseData.AbortedTransaction>> abortedTransactions) { Review Comment: We have a lot of constructors, do we need these all? 
########## core/src/main/scala/kafka/log/LocalLog.scala: ########## @@ -1005,12 +1005,12 @@ object LocalLog extends Logging { private[log] def emptyFetchDataInfo(fetchOffsetMetadata: LogOffsetMetadata, includeAbortedTxns: Boolean): FetchDataInfo = { - val abortedTransactions = - if (includeAbortedTxns) Some(List.empty[FetchResponseData.AbortedTransaction]) - else None - FetchDataInfo(fetchOffsetMetadata, + val abortedTransactions: Optional[java.util.List[FetchResponseData.AbortedTransaction]] = + if (includeAbortedTxns) Optional.of(List.empty[FetchResponseData.AbortedTransaction].asJava) Review Comment: We can pass `Collections.emptyList` instead of creating a Scala list and then converting to Java. ########## storage/src/main/java/org/apache/kafka/server/log/internals/FetchParams.java: ########## @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.server.log.internals; + +import org.apache.kafka.common.replica.ClientMetadata; +import org.apache.kafka.common.utils.FetchRequestUtils; + +import java.util.Objects; +import java.util.Optional; + +public class FetchParams { + private final short requestVersion; + private final int replicaId; + private final long maxWaitMs; + private final int minBytes; + private final int maxBytes; + private final FetchIsolation isolation; + private Optional<ClientMetadata> clientMetadata; Review Comment: How come this isn't final? ########## core/src/main/scala/kafka/cluster/Partition.scala: ########## @@ -1189,7 +1189,7 @@ class Partition(val topicPartition: TopicPartition, * @param minOneMessage whether to ensure that at least one complete message is returned * @param updateFetchState true if the Fetch should update replica state (only applies to follower fetches) * @return [[LogReadInfo]] containing the fetched records or the diverging epoch if present - * @throws NotLeaderOrFollowerException if this node is not the current leader and [[FetchParams.fetchOnlyLeader]] + * @throws NotLeaderOrFollowerException if this node is not the current leader and `FetchParams.fetchOnlyLeader` Review Comment: Are these changes related to this PR? ########## storage/src/main/java/org/apache/kafka/server/log/internals/FetchIsolation.java: ########## @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.log.internals; + +import org.apache.kafka.common.IsolationLevel; +import org.apache.kafka.common.requests.FetchRequest; +import org.apache.kafka.common.utils.FetchRequestUtils; + +public enum FetchIsolation { + LOG_END, + HIGH_WATERMARK, + TXN_COMMITTED; + + public static FetchIsolation apply(FetchRequest request) { Review Comment: I'd call this method and the other methods `of`. That's more common for Java. ########## storage/src/main/java/org/apache/kafka/server/log/internals/FetchParams.java: ########## @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.server.log.internals; + +import org.apache.kafka.common.replica.ClientMetadata; +import org.apache.kafka.common.utils.FetchRequestUtils; + +import java.util.Objects; +import java.util.Optional; + +public class FetchParams { + private final short requestVersion; + private final int replicaId; + private final long maxWaitMs; + private final int minBytes; + private final int maxBytes; + private final FetchIsolation isolation; Review Comment: We can make these fields public since they're immutable. The accessors don't add much value in such cases. ########## storage/src/main/java/org/apache/kafka/server/log/internals/FetchParams.java: ########## @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.server.log.internals; + +import org.apache.kafka.common.replica.ClientMetadata; +import org.apache.kafka.common.utils.FetchRequestUtils; + +import java.util.Objects; +import java.util.Optional; + +public class FetchParams { + private final short requestVersion; + private final int replicaId; + private final long maxWaitMs; + private final int minBytes; + private final int maxBytes; + private final FetchIsolation isolation; + private Optional<ClientMetadata> clientMetadata; + + public FetchParams(short requestVersion, + int replicaId, + long maxWaitMs, + int minBytes, + int maxBytes, + FetchIsolation isolation, + Optional<ClientMetadata> clientMetadata) { + this.requestVersion = requestVersion; + this.replicaId = replicaId; + this.maxWaitMs = maxWaitMs; + this.minBytes = minBytes; + this.maxBytes = maxBytes; + this.isolation = isolation; + this.clientMetadata = clientMetadata; + } + + public boolean isFromFollower() { + return FetchRequestUtils.isValidBrokerId(replicaId); + } + + public boolean isFromConsumer() { + return FetchRequestUtils.isConsumer(replicaId); + } + + public boolean fetchOnlyLeader() { + return isFromFollower() || (isFromConsumer() && !clientMetadata.isPresent()); + } + + public boolean hardMaxBytesLimit() { + return requestVersion <= 2; + } + + public short requestVersion() { + return requestVersion; + } + + public int replicaId() { + return replicaId; + } + + public long maxWaitMs() { + return maxWaitMs; + } + + public int minBytes() { + return minBytes; + } + + public int maxBytes() { + return maxBytes; + } + + public FetchIsolation isolation() { + return isolation; + } + + public Optional<ClientMetadata> clientMetadata() { + return clientMetadata; + } + + @Override + public boolean equals(Object o) { Review Comment: Do we actually need to implement `equals` and `hashCode` for this? Is it used as the key of a map or something like that? 
I suspect it may not be needed and was only there because it comes for free with case classes. Same question for the other classes where we have implemented these methods.
-- 
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org