This is an automated email from the ASF dual-hosted git repository.

apoorvmittal10 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 2ba03b211b4 KAFKA-20610: Added abstraction classes for share fetch 
reads (1/N) (#22389)
2ba03b211b4 is described below

commit 2ba03b211b4bb4ef289f5fbb0af9b16be8257d7d
Author: Apoorv Mittal <[email protected]>
AuthorDate: Thu May 28 08:36:45 2026 +0100

    KAFKA-20610: Added abstraction classes for share fetch reads (1/N) (#22389)
    
    The PR adds abstraction classes to move the replica manager dependency
    out of the share fetch related classes.
    
    The change is also needed by the Share Group DLQ manager to read data
    for a specific offset.
    
    Reviewers: Andrew Schofield <[email protected]>, Sushant Mahajan
     <[email protected]>
---
 .../server/share/ReplicaManagerLogReader.java      |  95 ++++++++++++++++
 .../ReplicaManagerPartitionMetadataProvider.java   | 122 +++++++++++++++++++++
 .../org/apache/kafka/server/share/LogReader.java   |  45 ++++++++
 .../server/share/PartitionMetadataProvider.java    |  70 ++++++++++++
 4 files changed, 332 insertions(+)

diff --git a/core/src/main/java/kafka/server/share/ReplicaManagerLogReader.java 
b/core/src/main/java/kafka/server/share/ReplicaManagerLogReader.java
new file mode 100644
index 00000000000..71ce7c74ca9
--- /dev/null
+++ b/core/src/main/java/kafka/server/share/ReplicaManagerLogReader.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server.share;
+
+import kafka.server.QuotaFactory;
+import kafka.server.ReplicaManager;
+
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.requests.FetchRequest;
+import org.apache.kafka.server.share.LogReader;
+import org.apache.kafka.server.storage.log.FetchParams;
+import org.apache.kafka.storage.internals.log.LogReadResult;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.LinkedHashMap;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import scala.Tuple2;
+import scala.collection.Seq;
+import scala.jdk.javaapi.CollectionConverters;
+import scala.runtime.BoxedUnit;
+
+/**
+ * Implementation of {@link LogReader} that reads records from the local log
+ * via {@link ReplicaManager#readFromLog}.
+ */
+public class ReplicaManagerLogReader implements LogReader {
+
+    private static final Logger log = 
LoggerFactory.getLogger(ReplicaManagerLogReader.class);
+
+    private final ReplicaManager replicaManager;
+
+    public ReplicaManagerLogReader(ReplicaManager replicaManager) {
+        this.replicaManager = replicaManager;
+    }
+
+    @Override
+    public LinkedHashMap<TopicIdPartition, LogReadResult> read(
+            FetchParams fetchParams,
+            Set<TopicIdPartition> partitionsToFetch,
+            LinkedHashMap<TopicIdPartition, Long> topicPartitionFetchOffsets,
+            LinkedHashMap<TopicIdPartition, Integer> partitionMaxBytes) {
+
+        if (partitionsToFetch.isEmpty()) {
+            return new LinkedHashMap<>();
+        }
+
+        LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> 
topicPartitionData = new LinkedHashMap<>();
+        topicPartitionFetchOffsets.forEach((topicIdPartition, fetchOffset) ->
+            topicPartitionData.put(topicIdPartition,
+                new FetchRequest.PartitionData(
+                    topicIdPartition.topicId(),
+                    fetchOffset,
+                    0,
+                    partitionMaxBytes.get(topicIdPartition),
+                    Optional.empty())
+            ));
+
+        Seq<Tuple2<TopicIdPartition, LogReadResult>> responseLogResult = 
replicaManager.readFromLog(
+            fetchParams,
+            CollectionConverters.asScala(
+                partitionsToFetch.stream().map(topicIdPartition ->
+                    new Tuple2<>(topicIdPartition, 
topicPartitionData.get(topicIdPartition))).collect(Collectors.toList())
+            ),
+            QuotaFactory.UNBOUNDED_QUOTA,
+            true);
+
+        LinkedHashMap<TopicIdPartition, LogReadResult> responseData = new 
LinkedHashMap<>();
+        responseLogResult.foreach(tpLogResult -> {
+            responseData.put(tpLogResult._1(), tpLogResult._2());
+            return BoxedUnit.UNIT;
+        });
+
+        log.trace("Data successfully retrieved by replica manager: {}", 
responseData);
+        return responseData;
+    }
+}
diff --git 
a/core/src/main/java/kafka/server/share/ReplicaManagerPartitionMetadataProvider.java
 
b/core/src/main/java/kafka/server/share/ReplicaManagerPartitionMetadataProvider.java
new file mode 100644
index 00000000000..5c115a3c342
--- /dev/null
+++ 
b/core/src/main/java/kafka/server/share/ReplicaManagerPartitionMetadataProvider.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server.share;
+
+import kafka.cluster.Partition;
+import kafka.server.ReplicaManager;
+
+import org.apache.kafka.common.IsolationLevel;
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
+import org.apache.kafka.common.errors.OffsetNotAvailableException;
+import org.apache.kafka.common.record.internal.FileRecords;
+import org.apache.kafka.common.requests.ListOffsetsRequest;
+import org.apache.kafka.server.partition.PartitionListener;
+import org.apache.kafka.server.share.PartitionMetadataProvider;
+import org.apache.kafka.server.storage.log.FetchIsolation;
+import org.apache.kafka.storage.internals.log.LogOffsetMetadata;
+import org.apache.kafka.storage.internals.log.LogOffsetSnapshot;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Optional;
+
+import scala.Some;
+
+/**
+ * Implementation of {@link PartitionMetadataProvider} backed by {@link 
ReplicaManager}.
+ */
+public class ReplicaManagerPartitionMetadataProvider implements 
PartitionMetadataProvider {
+
+    private static final Logger log = 
LoggerFactory.getLogger(ReplicaManagerPartitionMetadataProvider.class);
+
+    private final ReplicaManager replicaManager;
+
+    public ReplicaManagerPartitionMetadataProvider(ReplicaManager 
replicaManager) {
+        this.replicaManager = replicaManager;
+    }
+
+    @Override
+    public long offsetForEarliestTimestamp(TopicIdPartition topicIdPartition, 
int leaderEpoch) {
+        Optional<FileRecords.TimestampAndOffset> timestampAndOffset = 
replicaManager.fetchOffsetForTimestamp(
+            topicIdPartition.topicPartition(), 
ListOffsetsRequest.EARLIEST_TIMESTAMP, scala.Option.empty(),
+            Optional.of(leaderEpoch), true).timestampAndOffsetOpt();
+        if (timestampAndOffset.isEmpty()) {
+            throw new OffsetNotAvailableException("Offset for earliest 
timestamp not found for topic partition: " + topicIdPartition);
+        }
+        return timestampAndOffset.get().offset;
+    }
+
+    @Override
+    public long offsetForLatestTimestamp(TopicIdPartition topicIdPartition, 
int leaderEpoch) {
+        Optional<FileRecords.TimestampAndOffset> timestampAndOffset = 
replicaManager.fetchOffsetForTimestamp(
+            topicIdPartition.topicPartition(), 
ListOffsetsRequest.LATEST_TIMESTAMP, new 
Some<>(IsolationLevel.READ_UNCOMMITTED),
+            Optional.of(leaderEpoch), true).timestampAndOffsetOpt();
+        if (timestampAndOffset.isEmpty()) {
+            throw new OffsetNotAvailableException("Offset for latest timestamp 
not found for topic partition: " + topicIdPartition);
+        }
+        return timestampAndOffset.get().offset;
+    }
+
+    @Override
+    public long offsetForTimestamp(TopicIdPartition topicIdPartition, long 
timestamp, int leaderEpoch) {
+        Optional<FileRecords.TimestampAndOffset> timestampAndOffset = 
replicaManager.fetchOffsetForTimestamp(
+            topicIdPartition.topicPartition(), timestamp, new 
Some<>(IsolationLevel.READ_UNCOMMITTED),
+            Optional.of(leaderEpoch), true).timestampAndOffsetOpt();
+        if (timestampAndOffset.isEmpty()) {
+            throw new OffsetNotAvailableException("Offset for timestamp " + 
timestamp + " not found for topic partition: " + topicIdPartition);
+        }
+        return timestampAndOffset.get().offset;
+    }
+
+    @Override
+    public Optional<LogOffsetMetadata> endOffsetMetadata(TopicIdPartition 
topicIdPartition, FetchIsolation isolation) {
+        Partition partition = leaderPartition(topicIdPartition);
+        LogOffsetSnapshot offsetSnapshot = 
partition.fetchOffsetSnapshot(Optional.empty(), true);
+        if (isolation == FetchIsolation.LOG_END)
+            return Optional.of(offsetSnapshot.logEndOffset());
+        else if (isolation == FetchIsolation.HIGH_WATERMARK)
+            return Optional.of(offsetSnapshot.highWatermark());
+        else
+            return Optional.of(offsetSnapshot.lastStableOffset());
+    }
+
+    @Override
+    public int leaderEpoch(TopicIdPartition topicIdPartition) {
+        return leaderPartition(topicIdPartition).getLeaderEpoch();
+    }
+
+    @Override
+    public boolean addPartitionListener(TopicIdPartition topicIdPartition, 
PartitionListener listener) {
+        return 
replicaManager.maybeAddListener(topicIdPartition.topicPartition(), listener);
+    }
+
+    @Override
+    public void removePartitionListener(TopicIdPartition topicIdPartition, 
PartitionListener listener) {
+        replicaManager.removeListener(topicIdPartition.topicPartition(), 
listener);
+    }
+
+    private Partition leaderPartition(TopicIdPartition topicIdPartition) {
+        Partition partition = 
replicaManager.getPartitionOrException(topicIdPartition.topicPartition());
+        if (!partition.isLeader()) {
+            log.debug("The broker is not the leader for topic partition: {}", 
topicIdPartition.topicPartition());
+            throw new NotLeaderOrFollowerException();
+        }
+        return partition;
+    }
+}
diff --git a/server/src/main/java/org/apache/kafka/server/share/LogReader.java 
b/server/src/main/java/org/apache/kafka/server/share/LogReader.java
new file mode 100644
index 00000000000..60e453e9b86
--- /dev/null
+++ b/server/src/main/java/org/apache/kafka/server/share/LogReader.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.share;
+
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.server.storage.log.FetchParams;
+import org.apache.kafka.storage.internals.log.LogReadResult;
+
+import java.util.LinkedHashMap;
+import java.util.Set;
+
+/**
+ * Abstraction for reading records from log.
+ */
+public interface LogReader {
+
+    /**
+     * Read records for the given partitions starting at the specified offsets.
+     *
+     * @param fetchParams             The fetch parameters (isolation level, 
maxBytes, etc.)
+     * @param partitionsToFetch       The set of partitions to actually fetch 
(after filtering erroneous ones)
+     * @param topicPartitionFetchOffsets The fetch offset per partition
+     * @param partitionMaxBytes       The max bytes per partition
+     * @return A map of partition to log read result
+     */
+    LinkedHashMap<TopicIdPartition, LogReadResult> read(
+        FetchParams fetchParams,
+        Set<TopicIdPartition> partitionsToFetch,
+        LinkedHashMap<TopicIdPartition, Long> topicPartitionFetchOffsets,
+        LinkedHashMap<TopicIdPartition, Integer> partitionMaxBytes);
+}
diff --git 
a/server/src/main/java/org/apache/kafka/server/share/PartitionMetadataProvider.java
 
b/server/src/main/java/org/apache/kafka/server/share/PartitionMetadataProvider.java
new file mode 100644
index 00000000000..0dce70d6359
--- /dev/null
+++ 
b/server/src/main/java/org/apache/kafka/server/share/PartitionMetadataProvider.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.share;
+
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.server.partition.PartitionListener;
+import org.apache.kafka.server.storage.log.FetchIsolation;
+import org.apache.kafka.storage.internals.log.LogOffsetMetadata;
+
+import java.util.Optional;
+
+/**
+ * Abstraction for partition metadata operations.
+ */
+public interface PartitionMetadataProvider {
+
+    /**
+     * Resolve the offset for the earliest timestamp.
+     */
+    long offsetForEarliestTimestamp(TopicIdPartition topicIdPartition, int 
leaderEpoch);
+
+    /**
+     * Resolve the offset for the latest timestamp.
+     */
+    long offsetForLatestTimestamp(TopicIdPartition topicIdPartition, int 
leaderEpoch);
+
+    /**
+     * Resolve the offset for a specific timestamp.
+     */
+    long offsetForTimestamp(TopicIdPartition topicIdPartition, long timestamp, 
int leaderEpoch);
+
+    /**
+     * Get the end offset metadata for minBytes estimation.
+     *
+     * @return The end offset metadata based on the given fetch isolation, or
+     *         {@link Optional#empty()} when no local partition metadata is 
available.
+     */
+    Optional<LogOffsetMetadata> endOffsetMetadata(TopicIdPartition 
topicIdPartition, FetchIsolation isolation);
+
+    /**
+     * Get the leader epoch for a partition.
+     */
+    int leaderEpoch(TopicIdPartition topicIdPartition);
+
+    /**
+     * Register a partition listener for state change notifications.
+     *
+     * @return true if the listener was successfully added.
+     */
+    boolean addPartitionListener(TopicIdPartition topicIdPartition, 
PartitionListener listener);
+
+    /**
+     * Remove a previously registered partition listener.
+     */
+    void removePartitionListener(TopicIdPartition topicIdPartition, 
PartitionListener listener);
+}

Reply via email to