This is an automated email from the ASF dual-hosted git repository.
satishd pushed a commit to branch 3.8
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.8 by this push:
new 0b4fcbb16d0 KAFKA-15265: Integrate RLMQuotaManager for throttling
copies to remote storage (#15820)
0b4fcbb16d0 is described below
commit 0b4fcbb16d0fbab67df906bdfe0bb7b880503e22
Author: Abhijeet Kumar <[email protected]>
AuthorDate: Wed Jun 12 06:27:02 2024 +0530
KAFKA-15265: Integrate RLMQuotaManager for throttling copies to remote
storage (#15820)
- Added the integration of the quota manager to throttle copy requests to
the remote storage. Reference KIP-956
- Added unit-tests for the copy throttling logic.
Reviewers: Satish Duggana <[email protected]>, Luke Chen
<[email protected]>, Kamal Chandraprakash <[email protected]>
---
checkstyle/import-control-core.xml | 1 +
.../java/kafka/log/remote/RemoteLogManager.java | 29 +++
.../kafka/log/remote/RemoteLogManagerTest.java | 210 +++++++++++++++++++++
3 files changed, 240 insertions(+)
diff --git a/checkstyle/import-control-core.xml
b/checkstyle/import-control-core.xml
index ed6c53a322b..a30de55e415 100644
--- a/checkstyle/import-control-core.xml
+++ b/checkstyle/import-control-core.xml
@@ -38,6 +38,7 @@
<allow pkg="org.apache.kafka.common" />
<allow pkg="org.mockito" class="AssignmentsManagerTest"/>
<allow pkg="org.apache.kafka.server"/>
+ <allow pkg="org.opentest4j" class="RemoteLogManagerTest"/>
<!-- see KIP-544 for why KafkaYammerMetrics should be used instead of the
global default yammer metrics registry
https://cwiki.apache.org/confluence/display/KAFKA/KIP-544%3A+Make+metrics+exposed+via+JMX+configurable
-->
<disallow class="com.yammer.metrics.Metrics" />
diff --git a/core/src/main/java/kafka/log/remote/RemoteLogManager.java
b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
index 43c03190767..b920a962afc 100644
--- a/core/src/main/java/kafka/log/remote/RemoteLogManager.java
+++ b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
@@ -97,6 +97,7 @@ import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.security.PrivilegedAction;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -123,6 +124,8 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
@@ -160,6 +163,8 @@ public class RemoteLogManager implements Closeable {
private final RemoteLogMetadataManager remoteLogMetadataManager;
+ private final ReentrantLock copyQuotaManagerLock = new ReentrantLock(true);
+ private final Condition copyQuotaManagerLockCondition =
copyQuotaManagerLock.newCondition();
private final RLMQuotaManager rlmCopyQuotaManager;
private final RLMQuotaManager rlmFetchQuotaManager;
@@ -250,6 +255,13 @@ public class RemoteLogManager implements Closeable {
remoteStorageReaderThreadPool.removeMetrics();
}
+ /**
+ * Returns the timeout for the RLM Tasks to wait for the quota to be
available
+ */
+ Duration quotaTimeout() {
+ return Duration.ofSeconds(1);
+ }
+
RLMQuotaManager createRLMCopyQuotaManager() {
return new RLMQuotaManager(copyQuotaManagerConfig(rlmConfig), metrics,
QuotaType.RLMCopy$.MODULE$,
"Tracking copy byte-rate for Remote Log Manager", time);
@@ -763,6 +775,23 @@ public class RemoteLogManager implements Closeable {
isCancelled(), isLeader());
return;
}
+
+ copyQuotaManagerLock.lock();
+ try {
+ while (rlmCopyQuotaManager.isQuotaExceeded()) {
+ logger.debug("Quota exceeded for copying
log segments, waiting for the quota to be available.");
+ // If the thread gets interrupted while
waiting, the InterruptedException is thrown
+ // back to the caller. It's important to
note that the task being executed is already
+ // cancelled before the executing thread
is interrupted. The caller is responsible
+ // for handling the exception gracefully
by checking if the task is already cancelled.
+ boolean ignored =
copyQuotaManagerLockCondition.await(quotaTimeout().toMillis(),
TimeUnit.MILLISECONDS);
+ }
+
rlmCopyQuotaManager.record(candidateLogSegment.logSegment.log().sizeInBytes());
+ // Signal waiting threads to check the quota
again
+ copyQuotaManagerLockCondition.signalAll();
+ } finally {
+ copyQuotaManagerLock.unlock();
+ }
copyLogSegment(log,
candidateLogSegment.logSegment, candidateLogSegment.nextSegmentOffset);
}
}
diff --git a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
index 83fc5966b49..4c4976f060d 100644
--- a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
+++ b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
@@ -21,6 +21,7 @@ import com.yammer.metrics.core.MetricName;
import kafka.cluster.EndPoint;
import kafka.cluster.Partition;
import kafka.log.UnifiedLog;
+import kafka.log.remote.quota.RLMQuotaManager;
import kafka.log.remote.quota.RLMQuotaManagerConfig;
import kafka.server.BrokerTopicStats;
import kafka.server.KafkaConfig;
@@ -87,6 +88,7 @@ import org.mockito.ArgumentCaptor;
import org.mockito.InOrder;
import org.mockito.MockedConstruction;
import org.mockito.Mockito;
+import org.opentest4j.AssertionFailedError;
import scala.Option;
import scala.collection.JavaConverters;
@@ -101,6 +103,7 @@ import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -146,6 +149,7 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTimeoutPreemptively;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
@@ -153,6 +157,7 @@ import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doThrow;
@@ -187,6 +192,7 @@ public class RemoteLogManagerTest {
private final RemoteStorageManager remoteStorageManager =
mock(RemoteStorageManager.class);
private final RemoteLogMetadataManager remoteLogMetadataManager =
mock(RemoteLogMetadataManager.class);
+ private final RLMQuotaManager rlmCopyQuotaManager =
mock(RLMQuotaManager.class);
private RemoteLogManagerConfig remoteLogManagerConfig = null;
private BrokerTopicStats brokerTopicStats = null;
@@ -230,6 +236,12 @@ public class RemoteLogManagerTest {
public RemoteLogMetadataManager createRemoteLogMetadataManager() {
return remoteLogMetadataManager;
}
+ public RLMQuotaManager createRLMCopyQuotaManager() {
+ return rlmCopyQuotaManager;
+ }
+ public Duration quotaTimeout() {
+ return Duration.ofMillis(100);
+ }
@Override
long findLogStartOffset(TopicIdPartition topicIdPartition,
UnifiedLog log) {
return 0L;
@@ -2735,6 +2747,204 @@ public class RemoteLogManagerTest {
}
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ public void testCopyQuota(boolean quotaExceeded) throws Exception {
+ RemoteLogManager.RLMTask task = setupRLMTask(quotaExceeded);
+
+ if (quotaExceeded) {
+ // Verify that the copy operation times out, since no segments can
be copied due to quota being exceeded
+ assertThrows(AssertionFailedError.class, () ->
assertTimeoutPreemptively(Duration.ofMillis(200), () ->
task.copyLogSegmentsToRemote(mockLog)));
+
+ // Verify the highest offset in remote storage is updated only once
+ ArgumentCaptor<Long> capture = ArgumentCaptor.forClass(Long.class);
+ verify(mockLog,
times(1)).updateHighestOffsetInRemoteStorage(capture.capture());
+ // Verify the highest offset in remote storage was -1L before the
copy started
+ assertEquals(-1L, capture.getValue());
+ } else {
+ // Verify the copy operation completes within the timeout, since
it does not need to wait for quota availability
+ assertTimeoutPreemptively(Duration.ofMillis(100), () ->
task.copyLogSegmentsToRemote(mockLog));
+
+ // Verify quota check was performed
+ verify(rlmCopyQuotaManager, times(1)).isQuotaExceeded();
+ // Verify bytes to copy was recorded with the quota manager
+ verify(rlmCopyQuotaManager, times(1)).record(10);
+
+ // Verify the highest offset in remote storage is updated
+ ArgumentCaptor<Long> capture = ArgumentCaptor.forClass(Long.class);
+ verify(mockLog,
times(2)).updateHighestOffsetInRemoteStorage(capture.capture());
+ List<Long> capturedValues = capture.getAllValues();
+ // Verify the highest offset in remote storage was -1L before the
copy
+ assertEquals(-1L, capturedValues.get(0).longValue());
+ // Verify it was updated to 149L after the copy
+ assertEquals(149L, capturedValues.get(1).longValue());
+ }
+ }
+
+ @Test
+ public void testRLMShutdownDuringQuotaExceededScenario() throws Exception {
+ remoteLogManager.startup();
+ setupRLMTask(true);
+ remoteLogManager.onLeadershipChange(
+ Collections.singleton(mockPartition(leaderTopicIdPartition)),
Collections.emptySet(), topicIds);
+ // Ensure the copy operation is waiting for quota to be available
+ TestUtils.waitForCondition(() -> {
+ verify(rlmCopyQuotaManager, atLeast(1)).isQuotaExceeded();
+ return true;
+ }, "Quota exceeded check did not happen");
+ // Verify RLM is able to shut down
+ assertTimeoutPreemptively(Duration.ofMillis(100), () ->
remoteLogManager.close());
+ }
+
+ // helper method to set up a RemoteLogManager.RLMTask for testing copy
quota behaviour
+ private RemoteLogManager.RLMTask setupRLMTask(boolean quotaExceeded)
throws RemoteStorageException, IOException {
+ long oldSegmentStartOffset = 0L;
+ long nextSegmentStartOffset = 150L;
+
+
when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition());
+
+ // leader epoch preparation
+ checkpoint.write(totalEpochEntries);
+ LeaderEpochFileCache cache = new
LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint,
scheduler);
+ when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
+ when(mockLog.parentDir()).thenReturn("dir1");
+
when(remoteLogMetadataManager.highestOffsetForEpoch(any(TopicIdPartition.class),
anyInt())).thenReturn(Optional.of(0L));
+
+ // create 2 log segments, with 0 and 150 as log start offset
+ LogSegment oldSegment = mock(LogSegment.class);
+ LogSegment activeSegment = mock(LogSegment.class);
+
+ File tempFile = TestUtils.tempFile();
+ FileRecords fileRecords = mock(FileRecords.class);
+ when(fileRecords.file()).thenReturn(tempFile);
+ when(fileRecords.sizeInBytes()).thenReturn(10);
+
+ // Set up the segment that is eligible for copy
+ when(oldSegment.log()).thenReturn(fileRecords);
+ when(oldSegment.baseOffset()).thenReturn(oldSegmentStartOffset);
+ when(oldSegment.readNextOffset()).thenReturn(nextSegmentStartOffset);
+
+ // set up the active segment
+ when(activeSegment.baseOffset()).thenReturn(nextSegmentStartOffset);
+
+ when(mockLog.activeSegment()).thenReturn(activeSegment);
+ when(mockLog.logStartOffset()).thenReturn(oldSegmentStartOffset);
+ when(mockLog.logSegments(anyLong(),
anyLong())).thenReturn(JavaConverters.collectionAsScalaIterable(Arrays.asList(oldSegment,
activeSegment)));
+
+ File mockProducerSnapshotIndex = TestUtils.tempFile();
+ ProducerStateManager mockStateManager =
mock(ProducerStateManager.class);
+
when(mockStateManager.fetchSnapshot(anyLong())).thenReturn(Optional.of(mockProducerSnapshotIndex));
+
+ when(mockLog.producerStateManager()).thenReturn(mockStateManager);
+ when(mockLog.lastStableOffset()).thenReturn(250L);
+
+ File tempDir = TestUtils.tempDirectory();
+ OffsetIndex idx =
LazyIndex.forOffset(LogFileUtils.offsetIndexFile(tempDir,
oldSegmentStartOffset, ""), oldSegmentStartOffset, 1000).get();
+ TimeIndex timeIdx =
LazyIndex.forTime(LogFileUtils.timeIndexFile(tempDir, oldSegmentStartOffset,
""), oldSegmentStartOffset, 1500).get();
+ File txnFile = UnifiedLog.transactionIndexFile(tempDir,
oldSegmentStartOffset, "");
+ txnFile.createNewFile();
+ TransactionIndex txnIndex = new
TransactionIndex(oldSegmentStartOffset, txnFile);
+ when(oldSegment.timeIndex()).thenReturn(timeIdx);
+ when(oldSegment.offsetIndex()).thenReturn(idx);
+ when(oldSegment.txnIndex()).thenReturn(txnIndex);
+
+ CompletableFuture<Void> dummyFuture = new CompletableFuture<>();
+ dummyFuture.complete(null);
+
when(remoteLogMetadataManager.addRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadata.class))).thenReturn(dummyFuture);
+
when(remoteLogMetadataManager.updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class))).thenReturn(dummyFuture);
+
when(remoteStorageManager.copyLogSegmentData(any(RemoteLogSegmentMetadata.class),
any(LogSegmentData.class))).thenReturn(Optional.empty());
+
+ when(rlmCopyQuotaManager.isQuotaExceeded()).thenReturn(quotaExceeded);
+ doNothing().when(rlmCopyQuotaManager).record(anyInt());
+
+ RemoteLogManager.RLMTask task = remoteLogManager.new
RLMTask(leaderTopicIdPartition, 128);
+ task.convertToLeader(2);
+ return task;
+ }
+
+ @Test
+ public void testCopyThrottling() throws Exception {
+ long oldestSegmentStartOffset = 0L;
+
+
when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition());
+
+ // leader epoch preparation
+ checkpoint.write(totalEpochEntries);
+ LeaderEpochFileCache cache = new
LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint,
scheduler);
+ when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
+
when(remoteLogMetadataManager.highestOffsetForEpoch(any(TopicIdPartition.class),
anyInt())).thenReturn(Optional.of(0L));
+
+ // create 3 log segments
+ LogSegment segmentToCopy = mock(LogSegment.class);
+ LogSegment segmentToThrottle = mock(LogSegment.class);
+ LogSegment activeSegment = mock(LogSegment.class);
+
+ File tempFile = TestUtils.tempFile();
+ FileRecords fileRecords = mock(FileRecords.class);
+ when(fileRecords.file()).thenReturn(tempFile);
+ when(fileRecords.sizeInBytes()).thenReturn(10);
+
+ // set up the segment that will be copied
+ when(segmentToCopy.log()).thenReturn(fileRecords);
+ when(segmentToCopy.baseOffset()).thenReturn(oldestSegmentStartOffset);
+ when(segmentToCopy.readNextOffset()).thenReturn(100L);
+
+ // set up the segment that will not be copied because of hitting quota
+ when(segmentToThrottle.log()).thenReturn(fileRecords);
+ when(segmentToThrottle.baseOffset()).thenReturn(100L);
+ when(segmentToThrottle.readNextOffset()).thenReturn(150L);
+
+ // set up the active segment
+ when(activeSegment.log()).thenReturn(fileRecords);
+ when(activeSegment.baseOffset()).thenReturn(150L);
+
+ when(mockLog.activeSegment()).thenReturn(activeSegment);
+ when(mockLog.logStartOffset()).thenReturn(oldestSegmentStartOffset);
+ when(mockLog.logSegments(anyLong(),
anyLong())).thenReturn(JavaConverters.collectionAsScalaIterable(Arrays.asList(segmentToCopy,
segmentToThrottle, activeSegment)));
+
+ File mockProducerSnapshotIndex = TestUtils.tempFile();
+ ProducerStateManager mockStateManager =
mock(ProducerStateManager.class);
+
when(mockStateManager.fetchSnapshot(anyLong())).thenReturn(Optional.of(mockProducerSnapshotIndex));
+
+ when(mockLog.producerStateManager()).thenReturn(mockStateManager);
+ when(mockLog.lastStableOffset()).thenReturn(250L);
+
+ File tempDir = TestUtils.tempDirectory();
+ OffsetIndex idx =
LazyIndex.forOffset(LogFileUtils.offsetIndexFile(tempDir,
oldestSegmentStartOffset, ""), oldestSegmentStartOffset, 1000).get();
+ TimeIndex timeIdx =
LazyIndex.forTime(LogFileUtils.timeIndexFile(tempDir, oldestSegmentStartOffset,
""), oldestSegmentStartOffset, 1500).get();
+ File txnFile = UnifiedLog.transactionIndexFile(tempDir,
oldestSegmentStartOffset, "");
+ txnFile.createNewFile();
+ TransactionIndex txnIndex = new
TransactionIndex(oldestSegmentStartOffset, txnFile);
+ when(segmentToCopy.timeIndex()).thenReturn(timeIdx);
+ when(segmentToCopy.offsetIndex()).thenReturn(idx);
+ when(segmentToCopy.txnIndex()).thenReturn(txnIndex);
+
+ CompletableFuture<Void> dummyFuture = new CompletableFuture<>();
+ dummyFuture.complete(null);
+
when(remoteLogMetadataManager.addRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadata.class))).thenReturn(dummyFuture);
+
when(remoteLogMetadataManager.updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class))).thenReturn(dummyFuture);
+
when(remoteStorageManager.copyLogSegmentData(any(RemoteLogSegmentMetadata.class),
any(LogSegmentData.class))).thenReturn(Optional.empty());
+
+ // After the first call, isQuotaExceeded should return true
+ when(rlmCopyQuotaManager.isQuotaExceeded()).thenReturn(false, true);
+ doNothing().when(rlmCopyQuotaManager).record(anyInt());
+
+ RemoteLogManager.RLMTask task = remoteLogManager.new
RLMTask(leaderTopicIdPartition, 128);
+ task.convertToLeader(2);
+
+ // Verify that the copy operation times out, since the second segment
cannot be copied due to quota being exceeded
+ assertThrows(AssertionFailedError.class, () ->
assertTimeoutPreemptively(Duration.ofMillis(200), () ->
task.copyLogSegmentsToRemote(mockLog)));
+
+ // Verify the highest offset in remote storage is updated
corresponding to the only segment that was copied
+ ArgumentCaptor<Long> capture = ArgumentCaptor.forClass(Long.class);
+ verify(mockLog,
times(2)).updateHighestOffsetInRemoteStorage(capture.capture());
+ List<Long> capturedValues = capture.getAllValues();
+ // Verify the highest offset in remote storage was -1L before the copy
+ assertEquals(-1L, capturedValues.get(0).longValue());
+ // Verify it was updated to 99L after the copy
+ assertEquals(99L, capturedValues.get(1).longValue());
+ }
+
private Partition mockPartition(TopicIdPartition topicIdPartition) {
TopicPartition tp = topicIdPartition.topicPartition();
Partition partition = mock(Partition.class);