This is an automated email from the ASF dual-hosted git repository.

clolov pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new cb9c6718fab KAFKA-18722: Remove the unreferenced methods in TBRLMM and ConsumerManager (#18791)
cb9c6718fab is described below

commit cb9c6718fab545044b516ca78808de3ca090d9c1
Author: Kamal Chandraprakash <[email protected]>
AuthorDate: Wed Feb 5 18:43:24 2025 +0530

    KAFKA-18722: Remove the unreferenced methods in TBRLMM and ConsumerManager (#18791)
    
    Reviewers: Luke Chen <[email protected]>, Christo Lolov <[email protected]>
---
 .../log/remote/metadata/storage/ConsumerManager.java | 17 ++++++-----------
 .../log/remote/metadata/storage/ConsumerTask.java    | 20 ++++++++++----------
 .../log/remote/metadata/storage/ProducerManager.java |  2 +-
 .../metadata/storage/RemoteLogMetadataCache.java     | 18 +++++++++---------
 .../storage/RemotePartitionMetadataStore.java        | 20 ++++++++++----------
 .../storage/TopicBasedRemoteLogMetadataManager.java  | 16 +---------------
 6 files changed, 37 insertions(+), 56 deletions(-)

diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerManager.java b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerManager.java
index dafbb2eb046..2860ac4b76e 100644
--- a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerManager.java
+++ b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerManager.java
@@ -29,7 +29,6 @@ import org.slf4j.LoggerFactory;
 
 import java.io.Closeable;
 import java.io.IOException;
-import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.TimeoutException;
 
@@ -68,7 +67,7 @@ public class ConsumerManager implements Closeable {
         consumerTaskThread = KafkaThread.nonDaemon("RLMMConsumerTask", 
consumerTask);
     }
 
-    public void startConsumerThread() {
+    void startConsumerThread() {
         try {
             // Start a thread to continuously consume records from topic 
partitions.
             consumerTaskThread.start();
@@ -86,7 +85,7 @@ public class ConsumerManager implements Closeable {
      * @throws TimeoutException if this method execution did not complete with 
in the wait time configured with
      *                          property {@code 
TopicBasedRemoteLogMetadataManagerConfig#REMOTE_LOG_METADATA_CONSUME_WAIT_MS_PROP}.
      */
-    public void waitTillConsumptionCatchesUp(RecordMetadata recordMetadata) 
throws TimeoutException {
+    void waitTillConsumptionCatchesUp(RecordMetadata recordMetadata) throws 
TimeoutException {
         waitTillConsumptionCatchesUp(recordMetadata, 
rlmmConfig.consumeWaitMs());
     }
 
@@ -97,8 +96,8 @@ public class ConsumerManager implements Closeable {
      * @param timeoutMs      wait timeout in milliseconds
      * @throws TimeoutException if this method execution did not complete with 
in the given {@code timeoutMs}.
      */
-    public void waitTillConsumptionCatchesUp(RecordMetadata recordMetadata,
-                                             long timeoutMs) throws 
TimeoutException {
+    void waitTillConsumptionCatchesUp(RecordMetadata recordMetadata,
+                                      long timeoutMs) throws TimeoutException {
         int partition = recordMetadata.partition();
         // If the current assignment does not have the subscription for this 
partition then return immediately.
         if (!consumerTask.isMetadataPartitionAssigned(partition)) {
@@ -137,15 +136,11 @@ public class ConsumerManager implements Closeable {
         }
     }
 
-    public void addAssignmentsForPartitions(Set<TopicIdPartition> partitions) {
+    void addAssignmentsForPartitions(Set<TopicIdPartition> partitions) {
         consumerTask.addAssignmentsForPartitions(partitions);
     }
 
-    public void removeAssignmentsForPartitions(Set<TopicIdPartition> 
partitions) {
+    void removeAssignmentsForPartitions(Set<TopicIdPartition> partitions) {
         consumerTask.removeAssignmentsForPartitions(partitions);
     }
-
-    public Optional<Long> readOffsetForPartition(int metadataPartition) {
-        return consumerTask.readOffsetForMetadataPartition(metadataPartition);
-    }
 }
diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java
index a0376823f34..efe20675db3 100644
--- a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java
+++ b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java
@@ -129,8 +129,8 @@ class ConsumerTask implements Runnable, Closeable {
         log.info("Exited from consumer task thread");
     }
 
-    // public for testing
-    public void ingestRecords() {
+    // visible for testing
+    void ingestRecords() {
         try {
             if (hasAssignmentChanged) {
                 maybeWaitForPartitionAssignments();
@@ -153,8 +153,8 @@ class ConsumerTask implements Runnable, Closeable {
         }
     }
 
-    // public for testing
-    public void closeConsumer() {
+    // visible for testing
+    void closeConsumer() {
         try {
             consumer.close(Duration.ofSeconds(30));
         } catch (final Exception e) {
@@ -298,11 +298,11 @@ class ConsumerTask implements Runnable, Closeable {
         log.info("Unassigned user-topic-partitions: {}", 
unassignedPartitions.size());
     }
 
-    public void addAssignmentsForPartitions(final Set<TopicIdPartition> 
partitions) {
+    void addAssignmentsForPartitions(final Set<TopicIdPartition> partitions) {
         updateAssignments(Objects.requireNonNull(partitions), 
Collections.emptySet());
     }
 
-    public void removeAssignmentsForPartitions(final Set<TopicIdPartition> 
partitions) {
+    void removeAssignmentsForPartitions(final Set<TopicIdPartition> 
partitions) {
         updateAssignments(Collections.emptySet(), 
Objects.requireNonNull(partitions));
     }
 
@@ -325,15 +325,15 @@ class ConsumerTask implements Runnable, Closeable {
         }
     }
 
-    public Optional<Long> readOffsetForMetadataPartition(final int partition) {
+    Optional<Long> readOffsetForMetadataPartition(final int partition) {
         return 
Optional.ofNullable(readOffsetsByMetadataPartition.get(partition));
     }
 
-    public boolean isMetadataPartitionAssigned(final int partition) {
+    boolean isMetadataPartitionAssigned(final int partition) {
         return assignedMetadataPartitions.contains(partition);
     }
 
-    public boolean isUserPartitionAssigned(final TopicIdPartition partition) {
+    boolean isUserPartitionAssigned(final TopicIdPartition partition) {
         final UserTopicIdPartition utp = 
assignedUserTopicIdPartitions.get(partition);
         return utp != null && utp.isAssigned;
     }
@@ -351,7 +351,7 @@ class ConsumerTask implements Runnable, Closeable {
         }
     }
 
-    public Set<Integer> metadataPartitionsAssigned() {
+    Set<Integer> metadataPartitionsAssigned() {
         return Collections.unmodifiableSet(assignedMetadataPartitions);
     }
 
diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ProducerManager.java b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ProducerManager.java
index e6fd0e9cdec..dc2c0d20abc 100644
--- a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ProducerManager.java
+++ b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ProducerManager.java
@@ -59,7 +59,7 @@ public class ProducerManager implements Closeable {
      * @param remoteLogMetadata RemoteLogMetadata to be published
      * @return
      */
-    public CompletableFuture<RecordMetadata> publishMessage(RemoteLogMetadata 
remoteLogMetadata) {
+    CompletableFuture<RecordMetadata> publishMessage(RemoteLogMetadata 
remoteLogMetadata) {
         CompletableFuture<RecordMetadata> future = new CompletableFuture<>();
 
         TopicIdPartition topicIdPartition = 
remoteLogMetadata.topicIdPartition();
diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataCache.java b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataCache.java
index b08458b84b8..ef9f6f62a6e 100644
--- a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataCache.java
+++ b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataCache.java
@@ -108,11 +108,11 @@ public class RemoteLogMetadataCache {
 
     private final CountDownLatch initializedLatch = new CountDownLatch(1);
 
-    public void markInitialized() {
+    void markInitialized() {
         initializedLatch.countDown();
     }
 
-    public boolean isInitialized() {
+    boolean isInitialized() {
         return initializedLatch.getCount() == 0;
     }
 
@@ -124,7 +124,7 @@ public class RemoteLogMetadataCache {
      * @param offset      offset
      * @return the requested remote log segment metadata if it exists.
      */
-    public Optional<RemoteLogSegmentMetadata> remoteLogSegmentMetadata(int 
leaderEpoch, long offset) {
+    Optional<RemoteLogSegmentMetadata> remoteLogSegmentMetadata(int 
leaderEpoch, long offset) {
         RemoteLogSegmentMetadata metadata = getSegmentMetadata(leaderEpoch, 
offset);
         long epochEndOffset = -1L;
         if (metadata != null) {
@@ -139,7 +139,7 @@ public class RemoteLogMetadataCache {
         return offset > epochEndOffset ? Optional.empty() : 
Optional.ofNullable(metadata);
     }
 
-    public Optional<RemoteLogSegmentMetadata> nextSegmentWithTxnIndex(int 
leaderEpoch, long offset) {
+    Optional<RemoteLogSegmentMetadata> nextSegmentWithTxnIndex(int 
leaderEpoch, long offset) {
         boolean txnIdxEmpty = true;
         Optional<RemoteLogSegmentMetadata> metadataOpt = 
remoteLogSegmentMetadata(leaderEpoch, offset);
         while (metadataOpt.isPresent() && txnIdxEmpty) {
@@ -166,7 +166,7 @@ public class RemoteLogMetadataCache {
         return null;
     }
 
-    public void updateRemoteLogSegmentMetadata(RemoteLogSegmentMetadataUpdate 
metadataUpdate)
+    void updateRemoteLogSegmentMetadata(RemoteLogSegmentMetadataUpdate 
metadataUpdate)
             throws RemoteResourceNotFoundException {
         log.debug("Updating remote log segment metadata: [{}]", 
metadataUpdate);
         Objects.requireNonNull(metadataUpdate, "metadataUpdate can not be 
null");
@@ -273,7 +273,7 @@ public class RemoteLogMetadataCache {
      *
      * @return
      */
-    public Iterator<RemoteLogSegmentMetadata> listAllRemoteLogSegments() {
+    Iterator<RemoteLogSegmentMetadata> listAllRemoteLogSegments() {
         // Return all the segments including unreferenced metadata.
         return 
Collections.unmodifiableCollection(idToSegmentMetadata.values()).iterator();
     }
@@ -283,7 +283,7 @@ public class RemoteLogMetadataCache {
      *
      * @param leaderEpoch leader epoch.
      */
-    public Iterator<RemoteLogSegmentMetadata> listRemoteLogSegments(int 
leaderEpoch)
+    Iterator<RemoteLogSegmentMetadata> listRemoteLogSegments(int leaderEpoch)
             throws RemoteResourceNotFoundException {
         RemoteLogLeaderEpochState remoteLogLeaderEpochState = 
leaderEpochEntries.get(leaderEpoch);
         if (remoteLogLeaderEpochState == null) {
@@ -299,7 +299,7 @@ public class RemoteLogMetadataCache {
      *
      * @param leaderEpoch leader epoch
      */
-    public Optional<Long> highestOffsetForEpoch(int leaderEpoch) {
+    Optional<Long> highestOffsetForEpoch(int leaderEpoch) {
         RemoteLogLeaderEpochState entry = leaderEpochEntries.get(leaderEpoch);
         return entry != null ? Optional.ofNullable(entry.highestLogOffset()) : 
Optional.empty();
     }
@@ -310,7 +310,7 @@ public class RemoteLogMetadataCache {
      *
      * @param remoteLogSegmentMetadata RemoteLogSegmentMetadata instance
      */
-    public void addCopyInProgressSegment(RemoteLogSegmentMetadata 
remoteLogSegmentMetadata) {
+    void addCopyInProgressSegment(RemoteLogSegmentMetadata 
remoteLogSegmentMetadata) {
         log.debug("Adding to in-progress state: [{}]", 
remoteLogSegmentMetadata);
         Objects.requireNonNull(remoteLogSegmentMetadata, 
"remoteLogSegmentMetadata can not be null");
 
diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemotePartitionMetadataStore.java b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemotePartitionMetadataStore.java
index c74e97fbe68..5e71a224d9e 100644
--- a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemotePartitionMetadataStore.java
+++ b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemotePartitionMetadataStore.java
@@ -106,12 +106,12 @@ public class RemotePartitionMetadataStore extends 
RemotePartitionMetadataEventHa
         idToRemoteLogMetadataCache.remove(topicIdPartition);
     }
 
-    public Iterator<RemoteLogSegmentMetadata> 
listRemoteLogSegments(TopicIdPartition topicIdPartition)
+    Iterator<RemoteLogSegmentMetadata> listRemoteLogSegments(TopicIdPartition 
topicIdPartition)
             throws RemoteStorageException {
         return 
getRemoteLogMetadataCache(topicIdPartition).listAllRemoteLogSegments();
     }
 
-    public Iterator<RemoteLogSegmentMetadata> 
listRemoteLogSegments(TopicIdPartition topicIdPartition, int leaderEpoch)
+    Iterator<RemoteLogSegmentMetadata> listRemoteLogSegments(TopicIdPartition 
topicIdPartition, int leaderEpoch)
             throws RemoteStorageException {
         return 
getRemoteLogMetadataCache(topicIdPartition).listRemoteLogSegments(leaderEpoch);
     }
@@ -130,21 +130,21 @@ public class RemotePartitionMetadataStore extends 
RemotePartitionMetadataEventHa
         return remoteLogMetadataCache;
     }
 
-    public Optional<RemoteLogSegmentMetadata> 
remoteLogSegmentMetadata(TopicIdPartition topicIdPartition,
-                                                                       long 
offset,
-                                                                       int 
epochForOffset)
+    Optional<RemoteLogSegmentMetadata> 
remoteLogSegmentMetadata(TopicIdPartition topicIdPartition,
+                                                                long offset,
+                                                                int 
epochForOffset)
             throws RemoteStorageException {
         return 
getRemoteLogMetadataCache(topicIdPartition).remoteLogSegmentMetadata(epochForOffset,
 offset);
     }
 
-    public Optional<RemoteLogSegmentMetadata> 
nextSegmentWithTxnIndex(TopicIdPartition topicIdPartition,
-                                                                      int 
epoch,
-                                                                      long 
offset) throws RemoteStorageException {
+    Optional<RemoteLogSegmentMetadata> 
nextSegmentWithTxnIndex(TopicIdPartition topicIdPartition,
+                                                               int epoch,
+                                                               long offset) 
throws RemoteStorageException {
         return 
getRemoteLogMetadataCache(topicIdPartition).nextSegmentWithTxnIndex(epoch, 
offset);
     }
 
-    public Optional<Long> highestLogOffset(TopicIdPartition topicIdPartition,
-                                           int leaderEpoch) throws 
RemoteStorageException {
+    Optional<Long> highestLogOffset(TopicIdPartition topicIdPartition,
+                                    int leaderEpoch) throws 
RemoteStorageException {
         return 
getRemoteLogMetadataCache(topicIdPartition).highestOffsetForEpoch(leaderEpoch);
     }
 
diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java
index 58d571630d2..6cf52d523de 100644
--- a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java
+++ b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java
@@ -272,15 +272,6 @@ public class TopicBasedRemoteLogMetadataManager implements 
RemoteLogMetadataMana
         }
     }
 
-    public int metadataPartition(TopicIdPartition topicIdPartition) {
-        return rlmTopicPartitioner.metadataPartition(topicIdPartition);
-    }
-
-    // Visible For Testing
-    public Optional<Long> readOffsetForPartition(int metadataPartition) {
-        return consumerManager.readOffsetForPartition(metadataPartition);
-    }
-
     @Override
     public void onPartitionLeadershipChanges(Set<TopicIdPartition> 
leaderPartitions,
                                              Set<TopicIdPartition> 
followerPartitions) {
@@ -560,7 +551,7 @@ public class TopicBasedRemoteLogMetadataManager implements 
RemoteLogMetadataMana
         return doesTopicExist;
     }
 
-    public boolean isInitialized() {
+    boolean isInitialized() {
         return initialized.get();
     }
 
@@ -575,11 +566,6 @@ public class TopicBasedRemoteLogMetadataManager implements 
RemoteLogMetadataMana
         }
     }
 
-    // Visible for testing.
-    public TopicBasedRemoteLogMetadataManagerConfig config() {
-        return rlmmConfig;
-    }
-
     @Override
     public void close() throws IOException {
         // Close all the resources.

Reply via email to