This is an automated email from the ASF dual-hosted git repository.
clolov pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new cb9c6718fab KAFKA-18722: Remove the unreferenced methods in TBRLMM and
ConsumerManager (#18791)
cb9c6718fab is described below
commit cb9c6718fab545044b516ca78808de3ca090d9c1
Author: Kamal Chandraprakash <[email protected]>
AuthorDate: Wed Feb 5 18:43:24 2025 +0530
KAFKA-18722: Remove the unreferenced methods in TBRLMM and ConsumerManager
(#18791)
Reviewers: Luke Chen <[email protected]>, Christo Lolov <[email protected]>
---
.../log/remote/metadata/storage/ConsumerManager.java | 17 ++++++-----------
.../log/remote/metadata/storage/ConsumerTask.java | 20 ++++++++++----------
.../log/remote/metadata/storage/ProducerManager.java | 2 +-
.../metadata/storage/RemoteLogMetadataCache.java | 18 +++++++++---------
.../storage/RemotePartitionMetadataStore.java | 20 ++++++++++----------
.../storage/TopicBasedRemoteLogMetadataManager.java | 16 +---------------
6 files changed, 37 insertions(+), 56 deletions(-)
diff --git
a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerManager.java
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerManager.java
index dafbb2eb046..2860ac4b76e 100644
---
a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerManager.java
+++
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerManager.java
@@ -29,7 +29,6 @@ import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.io.IOException;
-import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeoutException;
@@ -68,7 +67,7 @@ public class ConsumerManager implements Closeable {
consumerTaskThread = KafkaThread.nonDaemon("RLMMConsumerTask",
consumerTask);
}
- public void startConsumerThread() {
+ void startConsumerThread() {
try {
// Start a thread to continuously consume records from topic
partitions.
consumerTaskThread.start();
@@ -86,7 +85,7 @@ public class ConsumerManager implements Closeable {
* @throws TimeoutException if this method execution did not complete with
in the wait time configured with
* property {@code
TopicBasedRemoteLogMetadataManagerConfig#REMOTE_LOG_METADATA_CONSUME_WAIT_MS_PROP}.
*/
- public void waitTillConsumptionCatchesUp(RecordMetadata recordMetadata)
throws TimeoutException {
+ void waitTillConsumptionCatchesUp(RecordMetadata recordMetadata) throws
TimeoutException {
waitTillConsumptionCatchesUp(recordMetadata,
rlmmConfig.consumeWaitMs());
}
@@ -97,8 +96,8 @@ public class ConsumerManager implements Closeable {
* @param timeoutMs wait timeout in milliseconds
* @throws TimeoutException if this method execution did not complete with
in the given {@code timeoutMs}.
*/
- public void waitTillConsumptionCatchesUp(RecordMetadata recordMetadata,
- long timeoutMs) throws
TimeoutException {
+ void waitTillConsumptionCatchesUp(RecordMetadata recordMetadata,
+ long timeoutMs) throws TimeoutException {
int partition = recordMetadata.partition();
// If the current assignment does not have the subscription for this
partition then return immediately.
if (!consumerTask.isMetadataPartitionAssigned(partition)) {
@@ -137,15 +136,11 @@ public class ConsumerManager implements Closeable {
}
}
- public void addAssignmentsForPartitions(Set<TopicIdPartition> partitions) {
+ void addAssignmentsForPartitions(Set<TopicIdPartition> partitions) {
consumerTask.addAssignmentsForPartitions(partitions);
}
- public void removeAssignmentsForPartitions(Set<TopicIdPartition>
partitions) {
+ void removeAssignmentsForPartitions(Set<TopicIdPartition> partitions) {
consumerTask.removeAssignmentsForPartitions(partitions);
}
-
- public Optional<Long> readOffsetForPartition(int metadataPartition) {
- return consumerTask.readOffsetForMetadataPartition(metadataPartition);
- }
}
diff --git
a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java
index a0376823f34..efe20675db3 100644
---
a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java
+++
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java
@@ -129,8 +129,8 @@ class ConsumerTask implements Runnable, Closeable {
log.info("Exited from consumer task thread");
}
- // public for testing
- public void ingestRecords() {
+ // visible for testing
+ void ingestRecords() {
try {
if (hasAssignmentChanged) {
maybeWaitForPartitionAssignments();
@@ -153,8 +153,8 @@ class ConsumerTask implements Runnable, Closeable {
}
}
- // public for testing
- public void closeConsumer() {
+ // visible for testing
+ void closeConsumer() {
try {
consumer.close(Duration.ofSeconds(30));
} catch (final Exception e) {
@@ -298,11 +298,11 @@ class ConsumerTask implements Runnable, Closeable {
log.info("Unassigned user-topic-partitions: {}",
unassignedPartitions.size());
}
- public void addAssignmentsForPartitions(final Set<TopicIdPartition>
partitions) {
+ void addAssignmentsForPartitions(final Set<TopicIdPartition> partitions) {
updateAssignments(Objects.requireNonNull(partitions),
Collections.emptySet());
}
- public void removeAssignmentsForPartitions(final Set<TopicIdPartition>
partitions) {
+ void removeAssignmentsForPartitions(final Set<TopicIdPartition>
partitions) {
updateAssignments(Collections.emptySet(),
Objects.requireNonNull(partitions));
}
@@ -325,15 +325,15 @@ class ConsumerTask implements Runnable, Closeable {
}
}
- public Optional<Long> readOffsetForMetadataPartition(final int partition) {
+ Optional<Long> readOffsetForMetadataPartition(final int partition) {
return
Optional.ofNullable(readOffsetsByMetadataPartition.get(partition));
}
- public boolean isMetadataPartitionAssigned(final int partition) {
+ boolean isMetadataPartitionAssigned(final int partition) {
return assignedMetadataPartitions.contains(partition);
}
- public boolean isUserPartitionAssigned(final TopicIdPartition partition) {
+ boolean isUserPartitionAssigned(final TopicIdPartition partition) {
final UserTopicIdPartition utp =
assignedUserTopicIdPartitions.get(partition);
return utp != null && utp.isAssigned;
}
@@ -351,7 +351,7 @@ class ConsumerTask implements Runnable, Closeable {
}
}
- public Set<Integer> metadataPartitionsAssigned() {
+ Set<Integer> metadataPartitionsAssigned() {
return Collections.unmodifiableSet(assignedMetadataPartitions);
}
diff --git
a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ProducerManager.java
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ProducerManager.java
index e6fd0e9cdec..dc2c0d20abc 100644
---
a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ProducerManager.java
+++
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ProducerManager.java
@@ -59,7 +59,7 @@ public class ProducerManager implements Closeable {
* @param remoteLogMetadata RemoteLogMetadata to be published
* @return
*/
- public CompletableFuture<RecordMetadata> publishMessage(RemoteLogMetadata
remoteLogMetadata) {
+ CompletableFuture<RecordMetadata> publishMessage(RemoteLogMetadata
remoteLogMetadata) {
CompletableFuture<RecordMetadata> future = new CompletableFuture<>();
TopicIdPartition topicIdPartition =
remoteLogMetadata.topicIdPartition();
diff --git
a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataCache.java
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataCache.java
index b08458b84b8..ef9f6f62a6e 100644
---
a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataCache.java
+++
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataCache.java
@@ -108,11 +108,11 @@ public class RemoteLogMetadataCache {
private final CountDownLatch initializedLatch = new CountDownLatch(1);
- public void markInitialized() {
+ void markInitialized() {
initializedLatch.countDown();
}
- public boolean isInitialized() {
+ boolean isInitialized() {
return initializedLatch.getCount() == 0;
}
@@ -124,7 +124,7 @@ public class RemoteLogMetadataCache {
* @param offset offset
* @return the requested remote log segment metadata if it exists.
*/
- public Optional<RemoteLogSegmentMetadata> remoteLogSegmentMetadata(int
leaderEpoch, long offset) {
+ Optional<RemoteLogSegmentMetadata> remoteLogSegmentMetadata(int
leaderEpoch, long offset) {
RemoteLogSegmentMetadata metadata = getSegmentMetadata(leaderEpoch,
offset);
long epochEndOffset = -1L;
if (metadata != null) {
@@ -139,7 +139,7 @@ public class RemoteLogMetadataCache {
return offset > epochEndOffset ? Optional.empty() :
Optional.ofNullable(metadata);
}
- public Optional<RemoteLogSegmentMetadata> nextSegmentWithTxnIndex(int
leaderEpoch, long offset) {
+ Optional<RemoteLogSegmentMetadata> nextSegmentWithTxnIndex(int
leaderEpoch, long offset) {
boolean txnIdxEmpty = true;
Optional<RemoteLogSegmentMetadata> metadataOpt =
remoteLogSegmentMetadata(leaderEpoch, offset);
while (metadataOpt.isPresent() && txnIdxEmpty) {
@@ -166,7 +166,7 @@ public class RemoteLogMetadataCache {
return null;
}
- public void updateRemoteLogSegmentMetadata(RemoteLogSegmentMetadataUpdate
metadataUpdate)
+ void updateRemoteLogSegmentMetadata(RemoteLogSegmentMetadataUpdate
metadataUpdate)
throws RemoteResourceNotFoundException {
log.debug("Updating remote log segment metadata: [{}]",
metadataUpdate);
Objects.requireNonNull(metadataUpdate, "metadataUpdate can not be
null");
@@ -273,7 +273,7 @@ public class RemoteLogMetadataCache {
*
* @return
*/
- public Iterator<RemoteLogSegmentMetadata> listAllRemoteLogSegments() {
+ Iterator<RemoteLogSegmentMetadata> listAllRemoteLogSegments() {
// Return all the segments including unreferenced metadata.
return
Collections.unmodifiableCollection(idToSegmentMetadata.values()).iterator();
}
@@ -283,7 +283,7 @@ public class RemoteLogMetadataCache {
*
* @param leaderEpoch leader epoch.
*/
- public Iterator<RemoteLogSegmentMetadata> listRemoteLogSegments(int
leaderEpoch)
+ Iterator<RemoteLogSegmentMetadata> listRemoteLogSegments(int leaderEpoch)
throws RemoteResourceNotFoundException {
RemoteLogLeaderEpochState remoteLogLeaderEpochState =
leaderEpochEntries.get(leaderEpoch);
if (remoteLogLeaderEpochState == null) {
@@ -299,7 +299,7 @@ public class RemoteLogMetadataCache {
*
* @param leaderEpoch leader epoch
*/
- public Optional<Long> highestOffsetForEpoch(int leaderEpoch) {
+ Optional<Long> highestOffsetForEpoch(int leaderEpoch) {
RemoteLogLeaderEpochState entry = leaderEpochEntries.get(leaderEpoch);
return entry != null ? Optional.ofNullable(entry.highestLogOffset()) :
Optional.empty();
}
@@ -310,7 +310,7 @@ public class RemoteLogMetadataCache {
*
* @param remoteLogSegmentMetadata RemoteLogSegmentMetadata instance
*/
- public void addCopyInProgressSegment(RemoteLogSegmentMetadata
remoteLogSegmentMetadata) {
+ void addCopyInProgressSegment(RemoteLogSegmentMetadata
remoteLogSegmentMetadata) {
log.debug("Adding to in-progress state: [{}]",
remoteLogSegmentMetadata);
Objects.requireNonNull(remoteLogSegmentMetadata,
"remoteLogSegmentMetadata can not be null");
diff --git
a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemotePartitionMetadataStore.java
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemotePartitionMetadataStore.java
index c74e97fbe68..5e71a224d9e 100644
---
a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemotePartitionMetadataStore.java
+++
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemotePartitionMetadataStore.java
@@ -106,12 +106,12 @@ public class RemotePartitionMetadataStore extends
RemotePartitionMetadataEventHa
idToRemoteLogMetadataCache.remove(topicIdPartition);
}
- public Iterator<RemoteLogSegmentMetadata>
listRemoteLogSegments(TopicIdPartition topicIdPartition)
+ Iterator<RemoteLogSegmentMetadata> listRemoteLogSegments(TopicIdPartition
topicIdPartition)
throws RemoteStorageException {
return
getRemoteLogMetadataCache(topicIdPartition).listAllRemoteLogSegments();
}
- public Iterator<RemoteLogSegmentMetadata>
listRemoteLogSegments(TopicIdPartition topicIdPartition, int leaderEpoch)
+ Iterator<RemoteLogSegmentMetadata> listRemoteLogSegments(TopicIdPartition
topicIdPartition, int leaderEpoch)
throws RemoteStorageException {
return
getRemoteLogMetadataCache(topicIdPartition).listRemoteLogSegments(leaderEpoch);
}
@@ -130,21 +130,21 @@ public class RemotePartitionMetadataStore extends
RemotePartitionMetadataEventHa
return remoteLogMetadataCache;
}
- public Optional<RemoteLogSegmentMetadata>
remoteLogSegmentMetadata(TopicIdPartition topicIdPartition,
- long
offset,
- int
epochForOffset)
+ Optional<RemoteLogSegmentMetadata>
remoteLogSegmentMetadata(TopicIdPartition topicIdPartition,
+ long offset,
+ int
epochForOffset)
throws RemoteStorageException {
return
getRemoteLogMetadataCache(topicIdPartition).remoteLogSegmentMetadata(epochForOffset,
offset);
}
- public Optional<RemoteLogSegmentMetadata>
nextSegmentWithTxnIndex(TopicIdPartition topicIdPartition,
- int
epoch,
- long
offset) throws RemoteStorageException {
+ Optional<RemoteLogSegmentMetadata>
nextSegmentWithTxnIndex(TopicIdPartition topicIdPartition,
+ int epoch,
+ long offset)
throws RemoteStorageException {
return
getRemoteLogMetadataCache(topicIdPartition).nextSegmentWithTxnIndex(epoch,
offset);
}
- public Optional<Long> highestLogOffset(TopicIdPartition topicIdPartition,
- int leaderEpoch) throws
RemoteStorageException {
+ Optional<Long> highestLogOffset(TopicIdPartition topicIdPartition,
+ int leaderEpoch) throws
RemoteStorageException {
return
getRemoteLogMetadataCache(topicIdPartition).highestOffsetForEpoch(leaderEpoch);
}
diff --git
a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java
index 58d571630d2..6cf52d523de 100644
---
a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java
+++
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java
@@ -272,15 +272,6 @@ public class TopicBasedRemoteLogMetadataManager implements
RemoteLogMetadataMana
}
}
- public int metadataPartition(TopicIdPartition topicIdPartition) {
- return rlmTopicPartitioner.metadataPartition(topicIdPartition);
- }
-
- // Visible For Testing
- public Optional<Long> readOffsetForPartition(int metadataPartition) {
- return consumerManager.readOffsetForPartition(metadataPartition);
- }
-
@Override
public void onPartitionLeadershipChanges(Set<TopicIdPartition>
leaderPartitions,
Set<TopicIdPartition>
followerPartitions) {
@@ -560,7 +551,7 @@ public class TopicBasedRemoteLogMetadataManager implements
RemoteLogMetadataMana
return doesTopicExist;
}
- public boolean isInitialized() {
+ boolean isInitialized() {
return initialized.get();
}
@@ -575,11 +566,6 @@ public class TopicBasedRemoteLogMetadataManager implements
RemoteLogMetadataMana
}
}
- // Visible for testing.
- public TopicBasedRemoteLogMetadataManagerConfig config() {
- return rlmmConfig;
- }
-
@Override
public void close() throws IOException {
// Close all the resources.