This is an automated email from the ASF dual-hosted git repository.

kamalcph pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 5914013219a KAFKA-17980: Introduce `isReady` API in RemoteLogMetadataManager (#17737)
5914013219a is described below

commit 5914013219a3769df8e28408d5e99bf3e500b9c4
Author: Kamal Chandraprakash <[email protected]>
AuthorDate: Tue Nov 12 22:47:48 2024 +0530

    KAFKA-17980: Introduce `isReady` API in RemoteLogMetadataManager (#17737)
    
    - The isReady API in RemoteLogMetadataManager (RLMM) indicates whether the partition metadata is ready for remote storage operations. Plugin implementors can use this API to report the partition status while the RLMM is bootstrapping.
    - Using this API, the remote log components are started gracefully: segment copy, deletion, and other operations that hit remote storage are invoked only once the metadata for a given partition is ready.
    - See KIP-1105 (https://cwiki.apache.org/confluence/display/KAFKA/KIP-1105%3A+Make+remote+log+manager+thread-pool+configs+dynamic) for more details.
    
    Reviewers: Federico Valeri <[email protected]>, Satish Duggana <[email protected]>
---
 .../java/kafka/log/remote/RemoteLogManager.java    | 24 ++++++-----
 .../kafka/log/remote/RemoteLogManagerTest.java     | 46 +++++++++++++++++++---
 .../remote/storage/RemoteLogMetadataManager.java   | 10 +++++
 .../ClassLoaderAwareRemoteLogMetadataManager.java  |  5 +++
 .../TopicBasedRemoteLogMetadataManager.java        |  5 +++
 .../TopicBasedRemoteLogMetadataManagerTest.java    |  6 +++
 6 files changed, 82 insertions(+), 14 deletions(-)

diff --git a/core/src/main/java/kafka/log/remote/RemoteLogManager.java 
b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
index 9cd0515ad35..237619bceea 100644
--- a/core/src/main/java/kafka/log/remote/RemoteLogManager.java
+++ b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
@@ -474,9 +474,9 @@ public class RemoteLogManager implements Closeable {
             if (topicIdByPartitionMap.containsKey(tp)) {
                 TopicIdPartition tpId = new 
TopicIdPartition(topicIdByPartitionMap.get(tp), tp);
                 leaderCopyRLMTasks.computeIfPresent(tpId, (topicIdPartition, 
task) -> {
-                    LOGGER.info("Cancelling the copy RLM task for tpId: {}", 
tpId);
+                    LOGGER.info("Cancelling the copy RLM task for partition: 
{}", tpId);
                     task.cancel();
-                    LOGGER.info("Resetting remote copy lag metrics for tpId: 
{}", tpId);
+                    LOGGER.info("Resetting remote copy lag metrics for 
partition: {}", tpId);
                     ((RLMCopyTask) task.rlmTask).resetLagStats();
                     return null;
                 });
@@ -501,17 +501,17 @@ public class RemoteLogManager implements Closeable {
                 if (topicIdByPartitionMap.containsKey(tp)) {
                     TopicIdPartition tpId = new 
TopicIdPartition(topicIdByPartitionMap.get(tp), tp);
                     leaderCopyRLMTasks.computeIfPresent(tpId, 
(topicIdPartition, task) -> {
-                        LOGGER.info("Cancelling the copy RLM task for tpId: 
{}", tpId);
+                        LOGGER.info("Cancelling the copy RLM task for 
partition: {}", tpId);
                         task.cancel();
                         return null;
                     });
                     leaderExpirationRLMTasks.computeIfPresent(tpId, 
(topicIdPartition, task) -> {
-                        LOGGER.info("Cancelling the expiration RLM task for 
tpId: {}", tpId);
+                        LOGGER.info("Cancelling the expiration RLM task for 
partition: {}", tpId);
                         task.cancel();
                         return null;
                     });
                     followerRLMTasks.computeIfPresent(tpId, (topicIdPartition, 
task) -> {
-                        LOGGER.info("Cancelling the follower RLM task for 
tpId: {}", tpId);
+                        LOGGER.info("Cancelling the follower RLM task for 
partition: {}", tpId);
                         task.cancel();
                         return null;
                     });
@@ -790,8 +790,14 @@ public class RemoteLogManager implements Closeable {
         }
 
         public void run() {
-            if (isCancelled())
+            if (isCancelled()) {
+                logger.debug("Skipping the current run for partition {} as it 
is cancelled", topicIdPartition);
                 return;
+            }
+            if (!remoteLogMetadataManager.isReady(topicIdPartition)) {
+                logger.debug("Skipping the current run for partition {} as the 
remote-log metadata is not ready", topicIdPartition);
+                return;
+            }
 
             try {
                 Optional<UnifiedLog> unifiedLogOptional = 
fetchLog.apply(topicIdPartition.topicPartition());
@@ -803,13 +809,13 @@ public class RemoteLogManager implements Closeable {
                 execute(unifiedLogOptional.get());
             } catch (InterruptedException ex) {
                 if (!isCancelled()) {
-                    logger.warn("Current thread for topic-partition-id {} is 
interrupted", topicIdPartition, ex);
+                    logger.warn("Current thread for partition {} is 
interrupted", topicIdPartition, ex);
                 }
             } catch (RetriableException ex) {
-                logger.debug("Encountered a retryable error while executing 
current task for topic-partition {}", topicIdPartition, ex);
+                logger.debug("Encountered a retryable error while executing 
current task for partition {}", topicIdPartition, ex);
             } catch (Exception ex) {
                 if (!isCancelled()) {
-                    logger.warn("Current task for topic-partition {} received 
error but it will be scheduled", topicIdPartition, ex);
+                    logger.warn("Current task for partition {} received error 
but it will be scheduled", topicIdPartition, ex);
                 }
             }
         }
diff --git a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java 
b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
index 7bd77ba1052..881da2b32c9 100644
--- a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
+++ b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
@@ -165,12 +165,16 @@ import static org.junit.jupiter.api.Assertions.fail;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyMap;
+import static org.mockito.ArgumentMatchers.anySet;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.atLeast;
+import static org.mockito.Mockito.atLeastOnce;
 import static org.mockito.Mockito.clearInvocations;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.inOrder;
 import static org.mockito.Mockito.mock;
@@ -259,6 +263,7 @@ public class RemoteLogManagerTest {
                 return 0L;
             }
         };
+        
doReturn(true).when(remoteLogMetadataManager).isReady(any(TopicIdPartition.class));
     }
 
     @AfterEach
@@ -2144,11 +2149,6 @@ public class RemoteLogManagerTest {
         Set<StopPartition> partitions = new HashSet<>();
         partitions.add(new 
StopPartition(leaderTopicIdPartition.topicPartition(), true, true, true));
         partitions.add(new 
StopPartition(followerTopicIdPartition.topicPartition(), true, true, true));
-        
remoteLogManager.onLeadershipChange(Collections.singleton(mockPartition(leaderTopicIdPartition)),
-                
Collections.singleton(mockPartition(followerTopicIdPartition)), topicIds);
-        assertNotNull(remoteLogManager.leaderCopyTask(leaderTopicIdPartition));
-        
assertNotNull(remoteLogManager.leaderExpirationTask(leaderTopicIdPartition));
-        assertNotNull(remoteLogManager.followerTask(followerTopicIdPartition));
 
         
when(remoteLogMetadataManager.listRemoteLogSegments(eq(leaderTopicIdPartition)))
                 
.thenReturn(listRemoteLogSegmentMetadata(leaderTopicIdPartition, 5, 100, 1024, 
RemoteLogSegmentState.DELETE_SEGMENT_FINISHED).iterator());
@@ -2159,6 +2159,12 @@ public class RemoteLogManagerTest {
         when(remoteLogMetadataManager.updateRemoteLogSegmentMetadata(any()))
                 .thenReturn(dummyFuture);
 
+        
remoteLogManager.onLeadershipChange(Collections.singleton(mockPartition(leaderTopicIdPartition)),
+                
Collections.singleton(mockPartition(followerTopicIdPartition)), topicIds);
+        assertNotNull(remoteLogManager.leaderCopyTask(leaderTopicIdPartition));
+        
assertNotNull(remoteLogManager.leaderExpirationTask(leaderTopicIdPartition));
+        assertNotNull(remoteLogManager.followerTask(followerTopicIdPartition));
+
         remoteLogManager.stopPartitions(partitions, errorHandler);
         assertNull(remoteLogManager.leaderCopyTask(leaderTopicIdPartition));
         
assertNull(remoteLogManager.leaderExpirationTask(leaderTopicIdPartition));
@@ -3643,6 +3649,36 @@ public class RemoteLogManagerTest {
         assertEquals(273, 
fetchDataInfo.fetchOffsetMetadata.relativePositionInSegment);
     }
 
+    @Test
+    public void testRLMOpsWhenMetadataIsNotReady() throws InterruptedException 
{
+        CountDownLatch latch = new CountDownLatch(2);
+        when(remoteLogMetadataManager.isReady(any(TopicIdPartition.class)))
+                .thenAnswer(ans -> {
+                    latch.countDown();
+                    return false;
+                });
+        remoteLogManager.startup();
+        remoteLogManager.onLeadershipChange(
+                Collections.singleton(mockPartition(leaderTopicIdPartition)),
+                Collections.singleton(mockPartition(followerTopicIdPartition)),
+                topicIds
+        );
+        assertNotNull(remoteLogManager.rlmCopyTask(leaderTopicIdPartition));
+        
assertNotNull(remoteLogManager.leaderExpirationTask(leaderTopicIdPartition));
+        assertNotNull(remoteLogManager.followerTask(followerTopicIdPartition));
+
+        // Once the partitions are assigned to the broker either as leader 
(or) follower in RLM#onLeadershipChange,
+        // then it should have called the `isReady` method for each of the 
partitions. Otherwise, the test will fail.
+        latch.await(5, TimeUnit.SECONDS);
+        verify(remoteLogMetadataManager).configure(anyMap());
+        
verify(remoteLogMetadataManager).onPartitionLeadershipChanges(anySet(), 
anySet());
+        verify(remoteLogMetadataManager, 
atLeastOnce()).isReady(eq(leaderTopicIdPartition));
+        verify(remoteLogMetadataManager, 
atLeastOnce()).isReady(eq(followerTopicIdPartition));
+        verifyNoMoreInteractions(remoteLogMetadataManager);
+        verify(remoteStorageManager).configure(anyMap());
+        verifyNoMoreInteractions(remoteStorageManager);
+    }
+
     private void appendRecordsToFile(File file, int nRecords, int 
nRecordsPerBatch) throws IOException {
         byte magic = RecordBatch.CURRENT_MAGIC_VALUE;
         Compression compression = Compression.NONE;
diff --git 
a/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataManager.java
 
b/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataManager.java
index 2280aa51132..efc37128ab2 100644
--- 
a/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataManager.java
+++ 
b/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataManager.java
@@ -230,4 +230,14 @@ public interface RemoteLogMetadataManager extends 
Configurable, Closeable {
                                                                        long 
offset) throws RemoteStorageException {
         return remoteLogSegmentMetadata(topicIdPartition, epoch, offset);
     }
+
+    /**
+     * Denotes whether the partition metadata is ready to serve.
+     *
+     * @param topicIdPartition topic partition
+     * @return True if the partition is ready to serve for remote storage 
operations.
+     */
+    default boolean isReady(TopicIdPartition topicIdPartition) {
+        return true;
+    }
 }
\ No newline at end of file
diff --git 
a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ClassLoaderAwareRemoteLogMetadataManager.java
 
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ClassLoaderAwareRemoteLogMetadataManager.java
index 1abcbbc20ce..5d5cba2ca11 100644
--- 
a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ClassLoaderAwareRemoteLogMetadataManager.java
+++ 
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ClassLoaderAwareRemoteLogMetadataManager.java
@@ -111,6 +111,11 @@ public class ClassLoaderAwareRemoteLogMetadataManager 
implements RemoteLogMetada
         return withClassLoader(() -> 
delegate.nextSegmentWithTxnIndex(topicIdPartition, epoch, offset));
     }
 
+    @Override
+    public boolean isReady(TopicIdPartition topicIdPartition) {
+        return withClassLoader(() -> delegate.isReady(topicIdPartition));
+    }
+
     @Override
     public void configure(Map<String, ?> configs) {
         withClassLoader(() -> {
diff --git 
a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java
 
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java
index a5db1ea38ef..58d571630d2 100644
--- 
a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java
+++ 
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java
@@ -396,6 +396,11 @@ public class TopicBasedRemoteLogMetadataManager implements 
RemoteLogMetadataMana
         }
     }
 
+    @Override
+    public boolean isReady(TopicIdPartition topicIdPartition) {
+        return remotePartitionMetadataStore.isInitialized(topicIdPartition);
+    }
+
     private void initializeResources() {
         log.info("Initializing topic-based RLMM resources");
         final NewTopic remoteLogMetadataTopicRequest = 
createRemoteLogMetadataTopicRequest();
diff --git 
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java
 
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java
index 9937a9f37ae..8fc8efc7cd3 100644
--- 
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java
+++ 
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java
@@ -152,6 +152,9 @@ public class TopicBasedRemoteLogMetadataManagerTest {
         assertThrows(RemoteResourceNotFoundException.class, () -> 
topicBasedRlmm().listRemoteLogSegments(newLeaderTopicIdPartition));
         assertThrows(RemoteResourceNotFoundException.class, () -> 
topicBasedRlmm().listRemoteLogSegments(newFollowerTopicIdPartition));
 
+        assertFalse(topicBasedRlmm().isReady(newLeaderTopicIdPartition));
+        assertFalse(topicBasedRlmm().isReady(newFollowerTopicIdPartition));
+
         
topicBasedRlmm().onPartitionLeadershipChanges(Collections.singleton(newLeaderTopicIdPartition),
                                                       
Collections.singleton(newFollowerTopicIdPartition));
 
@@ -166,6 +169,9 @@ public class TopicBasedRemoteLogMetadataManagerTest {
         
verify(spyRemotePartitionMetadataEventHandler).handleRemoteLogSegmentMetadata(followerSegmentMetadata);
         
assertTrue(topicBasedRlmm().listRemoteLogSegments(newLeaderTopicIdPartition).hasNext());
         
assertTrue(topicBasedRlmm().listRemoteLogSegments(newFollowerTopicIdPartition).hasNext());
+
+        assertTrue(topicBasedRlmm().isReady(newLeaderTopicIdPartition));
+        assertTrue(topicBasedRlmm().isReady(newFollowerTopicIdPartition));
     }
 
     @ClusterTest

Reply via email to