This is an automated email from the ASF dual-hosted git repository.
kamalcph pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 5914013219a KAFKA-17980: Introduce `isReady` API in
RemoteLogMetadataManager (#17737)
5914013219a is described below
commit 5914013219a3769df8e28408d5e99bf3e500b9c4
Author: Kamal Chandraprakash <[email protected]>
AuthorDate: Tue Nov 12 22:47:48 2024 +0530
KAFKA-17980: Introduce `isReady` API in RemoteLogMetadataManager (#17737)
- The isReady API in RemoteLogMetadataManager (RLMM) denotes whether the
partition metadata is ready for remote storage operations. Plugin
implementors can use this API to report the partition status while the RLMM
is bootstrapping.
- Using this API, the remote log components are started gracefully: the
segment copy, delete, and other operations that hit remote storage are
invoked only once the metadata is ready for a given partition.
- See KIP-1105
(https://cwiki.apache.org/confluence/display/KAFKA/KIP-1105%3A+Make+remote+log+manager+thread-pool+configs+dynamic)
for more details.
Reviewers: Federico Valeri <[email protected]>, Satish Duggana
<[email protected]>
---
.../java/kafka/log/remote/RemoteLogManager.java | 24 ++++++-----
.../kafka/log/remote/RemoteLogManagerTest.java | 46 +++++++++++++++++++---
.../remote/storage/RemoteLogMetadataManager.java | 10 +++++
.../ClassLoaderAwareRemoteLogMetadataManager.java | 5 +++
.../TopicBasedRemoteLogMetadataManager.java | 5 +++
.../TopicBasedRemoteLogMetadataManagerTest.java | 6 +++
6 files changed, 82 insertions(+), 14 deletions(-)
diff --git a/core/src/main/java/kafka/log/remote/RemoteLogManager.java
b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
index 9cd0515ad35..237619bceea 100644
--- a/core/src/main/java/kafka/log/remote/RemoteLogManager.java
+++ b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
@@ -474,9 +474,9 @@ public class RemoteLogManager implements Closeable {
if (topicIdByPartitionMap.containsKey(tp)) {
TopicIdPartition tpId = new
TopicIdPartition(topicIdByPartitionMap.get(tp), tp);
leaderCopyRLMTasks.computeIfPresent(tpId, (topicIdPartition,
task) -> {
- LOGGER.info("Cancelling the copy RLM task for tpId: {}",
tpId);
+ LOGGER.info("Cancelling the copy RLM task for partition:
{}", tpId);
task.cancel();
- LOGGER.info("Resetting remote copy lag metrics for tpId:
{}", tpId);
+ LOGGER.info("Resetting remote copy lag metrics for
partition: {}", tpId);
((RLMCopyTask) task.rlmTask).resetLagStats();
return null;
});
@@ -501,17 +501,17 @@ public class RemoteLogManager implements Closeable {
if (topicIdByPartitionMap.containsKey(tp)) {
TopicIdPartition tpId = new
TopicIdPartition(topicIdByPartitionMap.get(tp), tp);
leaderCopyRLMTasks.computeIfPresent(tpId,
(topicIdPartition, task) -> {
- LOGGER.info("Cancelling the copy RLM task for tpId:
{}", tpId);
+ LOGGER.info("Cancelling the copy RLM task for
partition: {}", tpId);
task.cancel();
return null;
});
leaderExpirationRLMTasks.computeIfPresent(tpId,
(topicIdPartition, task) -> {
- LOGGER.info("Cancelling the expiration RLM task for
tpId: {}", tpId);
+ LOGGER.info("Cancelling the expiration RLM task for
partition: {}", tpId);
task.cancel();
return null;
});
followerRLMTasks.computeIfPresent(tpId, (topicIdPartition,
task) -> {
- LOGGER.info("Cancelling the follower RLM task for
tpId: {}", tpId);
+ LOGGER.info("Cancelling the follower RLM task for
partition: {}", tpId);
task.cancel();
return null;
});
@@ -790,8 +790,14 @@ public class RemoteLogManager implements Closeable {
}
public void run() {
- if (isCancelled())
+ if (isCancelled()) {
+ logger.debug("Skipping the current run for partition {} as it
is cancelled", topicIdPartition);
return;
+ }
+ if (!remoteLogMetadataManager.isReady(topicIdPartition)) {
+ logger.debug("Skipping the current run for partition {} as the
remote-log metadata is not ready", topicIdPartition);
+ return;
+ }
try {
Optional<UnifiedLog> unifiedLogOptional =
fetchLog.apply(topicIdPartition.topicPartition());
@@ -803,13 +809,13 @@ public class RemoteLogManager implements Closeable {
execute(unifiedLogOptional.get());
} catch (InterruptedException ex) {
if (!isCancelled()) {
- logger.warn("Current thread for topic-partition-id {} is
interrupted", topicIdPartition, ex);
+ logger.warn("Current thread for partition {} is
interrupted", topicIdPartition, ex);
}
} catch (RetriableException ex) {
- logger.debug("Encountered a retryable error while executing
current task for topic-partition {}", topicIdPartition, ex);
+ logger.debug("Encountered a retryable error while executing
current task for partition {}", topicIdPartition, ex);
} catch (Exception ex) {
if (!isCancelled()) {
- logger.warn("Current task for topic-partition {} received
error but it will be scheduled", topicIdPartition, ex);
+ logger.warn("Current task for partition {} received error
but it will be scheduled", topicIdPartition, ex);
}
}
}
diff --git a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
index 7bd77ba1052..881da2b32c9 100644
--- a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
+++ b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
@@ -165,12 +165,16 @@ import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyMap;
+import static org.mockito.ArgumentMatchers.anySet;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.atLeast;
+import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.clearInvocations;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
@@ -259,6 +263,7 @@ public class RemoteLogManagerTest {
return 0L;
}
};
+
doReturn(true).when(remoteLogMetadataManager).isReady(any(TopicIdPartition.class));
}
@AfterEach
@@ -2144,11 +2149,6 @@ public class RemoteLogManagerTest {
Set<StopPartition> partitions = new HashSet<>();
partitions.add(new
StopPartition(leaderTopicIdPartition.topicPartition(), true, true, true));
partitions.add(new
StopPartition(followerTopicIdPartition.topicPartition(), true, true, true));
-
remoteLogManager.onLeadershipChange(Collections.singleton(mockPartition(leaderTopicIdPartition)),
-
Collections.singleton(mockPartition(followerTopicIdPartition)), topicIds);
- assertNotNull(remoteLogManager.leaderCopyTask(leaderTopicIdPartition));
-
assertNotNull(remoteLogManager.leaderExpirationTask(leaderTopicIdPartition));
- assertNotNull(remoteLogManager.followerTask(followerTopicIdPartition));
when(remoteLogMetadataManager.listRemoteLogSegments(eq(leaderTopicIdPartition)))
.thenReturn(listRemoteLogSegmentMetadata(leaderTopicIdPartition, 5, 100, 1024,
RemoteLogSegmentState.DELETE_SEGMENT_FINISHED).iterator());
@@ -2159,6 +2159,12 @@ public class RemoteLogManagerTest {
when(remoteLogMetadataManager.updateRemoteLogSegmentMetadata(any()))
.thenReturn(dummyFuture);
+
remoteLogManager.onLeadershipChange(Collections.singleton(mockPartition(leaderTopicIdPartition)),
+
Collections.singleton(mockPartition(followerTopicIdPartition)), topicIds);
+ assertNotNull(remoteLogManager.leaderCopyTask(leaderTopicIdPartition));
+
assertNotNull(remoteLogManager.leaderExpirationTask(leaderTopicIdPartition));
+ assertNotNull(remoteLogManager.followerTask(followerTopicIdPartition));
+
remoteLogManager.stopPartitions(partitions, errorHandler);
assertNull(remoteLogManager.leaderCopyTask(leaderTopicIdPartition));
assertNull(remoteLogManager.leaderExpirationTask(leaderTopicIdPartition));
@@ -3643,6 +3649,36 @@ public class RemoteLogManagerTest {
assertEquals(273,
fetchDataInfo.fetchOffsetMetadata.relativePositionInSegment);
}
+ @Test
+ public void testRLMOpsWhenMetadataIsNotReady() throws InterruptedException
{
+ CountDownLatch latch = new CountDownLatch(2);
+ when(remoteLogMetadataManager.isReady(any(TopicIdPartition.class)))
+ .thenAnswer(ans -> {
+ latch.countDown();
+ return false;
+ });
+ remoteLogManager.startup();
+ remoteLogManager.onLeadershipChange(
+ Collections.singleton(mockPartition(leaderTopicIdPartition)),
+ Collections.singleton(mockPartition(followerTopicIdPartition)),
+ topicIds
+ );
+ assertNotNull(remoteLogManager.rlmCopyTask(leaderTopicIdPartition));
+
assertNotNull(remoteLogManager.leaderExpirationTask(leaderTopicIdPartition));
+ assertNotNull(remoteLogManager.followerTask(followerTopicIdPartition));
+
+ // Once the partitions are assigned to the broker either as leader
(or) follower in RLM#onLeadershipChange,
+ // then it should have called the `isReady` method for each of the
partitions. Otherwise, the test will fail.
+ latch.await(5, TimeUnit.SECONDS);
+ verify(remoteLogMetadataManager).configure(anyMap());
+
verify(remoteLogMetadataManager).onPartitionLeadershipChanges(anySet(),
anySet());
+ verify(remoteLogMetadataManager,
atLeastOnce()).isReady(eq(leaderTopicIdPartition));
+ verify(remoteLogMetadataManager,
atLeastOnce()).isReady(eq(followerTopicIdPartition));
+ verifyNoMoreInteractions(remoteLogMetadataManager);
+ verify(remoteStorageManager).configure(anyMap());
+ verifyNoMoreInteractions(remoteStorageManager);
+ }
+
private void appendRecordsToFile(File file, int nRecords, int
nRecordsPerBatch) throws IOException {
byte magic = RecordBatch.CURRENT_MAGIC_VALUE;
Compression compression = Compression.NONE;
diff --git
a/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataManager.java
b/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataManager.java
index 2280aa51132..efc37128ab2 100644
---
a/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataManager.java
+++
b/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataManager.java
@@ -230,4 +230,14 @@ public interface RemoteLogMetadataManager extends
Configurable, Closeable {
long
offset) throws RemoteStorageException {
return remoteLogSegmentMetadata(topicIdPartition, epoch, offset);
}
+
+ /**
+ * Denotes whether the partition metadata is ready to serve.
+ *
+ * @param topicIdPartition topic partition
+ * @return True if the partition is ready to serve for remote storage
operations.
+ */
+ default boolean isReady(TopicIdPartition topicIdPartition) {
+ return true;
+ }
}
\ No newline at end of file
diff --git
a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ClassLoaderAwareRemoteLogMetadataManager.java
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ClassLoaderAwareRemoteLogMetadataManager.java
index 1abcbbc20ce..5d5cba2ca11 100644
---
a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ClassLoaderAwareRemoteLogMetadataManager.java
+++
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ClassLoaderAwareRemoteLogMetadataManager.java
@@ -111,6 +111,11 @@ public class ClassLoaderAwareRemoteLogMetadataManager
implements RemoteLogMetada
return withClassLoader(() ->
delegate.nextSegmentWithTxnIndex(topicIdPartition, epoch, offset));
}
+ @Override
+ public boolean isReady(TopicIdPartition topicIdPartition) {
+ return withClassLoader(() -> delegate.isReady(topicIdPartition));
+ }
+
@Override
public void configure(Map<String, ?> configs) {
withClassLoader(() -> {
diff --git
a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java
index a5db1ea38ef..58d571630d2 100644
---
a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java
+++
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java
@@ -396,6 +396,11 @@ public class TopicBasedRemoteLogMetadataManager implements
RemoteLogMetadataMana
}
}
+ @Override
+ public boolean isReady(TopicIdPartition topicIdPartition) {
+ return remotePartitionMetadataStore.isInitialized(topicIdPartition);
+ }
+
private void initializeResources() {
log.info("Initializing topic-based RLMM resources");
final NewTopic remoteLogMetadataTopicRequest =
createRemoteLogMetadataTopicRequest();
diff --git
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java
index 9937a9f37ae..8fc8efc7cd3 100644
---
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java
+++
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java
@@ -152,6 +152,9 @@ public class TopicBasedRemoteLogMetadataManagerTest {
assertThrows(RemoteResourceNotFoundException.class, () ->
topicBasedRlmm().listRemoteLogSegments(newLeaderTopicIdPartition));
assertThrows(RemoteResourceNotFoundException.class, () ->
topicBasedRlmm().listRemoteLogSegments(newFollowerTopicIdPartition));
+ assertFalse(topicBasedRlmm().isReady(newLeaderTopicIdPartition));
+ assertFalse(topicBasedRlmm().isReady(newFollowerTopicIdPartition));
+
topicBasedRlmm().onPartitionLeadershipChanges(Collections.singleton(newLeaderTopicIdPartition),
Collections.singleton(newFollowerTopicIdPartition));
@@ -166,6 +169,9 @@ public class TopicBasedRemoteLogMetadataManagerTest {
verify(spyRemotePartitionMetadataEventHandler).handleRemoteLogSegmentMetadata(followerSegmentMetadata);
assertTrue(topicBasedRlmm().listRemoteLogSegments(newLeaderTopicIdPartition).hasNext());
assertTrue(topicBasedRlmm().listRemoteLogSegments(newFollowerTopicIdPartition).hasNext());
+
+ assertTrue(topicBasedRlmm().isReady(newLeaderTopicIdPartition));
+ assertTrue(topicBasedRlmm().isReady(newFollowerTopicIdPartition));
}
@ClusterTest