Comments inline.

On Tue, Nov 28, 2023 at 3:50 PM Luke Chen <show...@gmail.com> wrote:
>
> Hi Abhijeet,
>
> Thanks for the KIP!
> This is an important feature for tiered storage.
>
> Some comments:
> 1. Will we introduce new metrics for these tiered storage quotas?
> This is important because the admin can know the throttling status by
> checking the metrics while the remote writes/reads are slow, like the
> uploading/reading byte rate, the throttled time for upload/read... etc.
>

Good point. When the fetches/copies on a broker are being throttled, we will
see the broker's fetch and copy rates stabilize around the predefined limit.
We plan to add the following metrics to report the throttled time for
fetches/copies, similar to the local fetch throttle-time metrics:

*RemoteFetchThrottleTime* - the amount of time needed to bring the observed
remote fetch rate within the read quota.
*RemoteCopyThrottleTime* - the amount of time needed to bring the observed
remote copy rate within the copy quota.
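To make this concrete, here is a minimal sketch of how such throttle-time
sensors could be registered with Kafka's Metrics API. The sensor/metric
names, descriptions, and the "RemoteLogManager" group below are placeholders
I picked for illustration, not final names from the KIP:

import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.Max;

public class RemoteStorageThrottleMetrics {
    private final Sensor fetchThrottleTime;
    private final Sensor copyThrottleTime;

    public RemoteStorageThrottleMetrics(Metrics metrics) {
        // Track average and maximum time (ms) that remote fetches waited for quota.
        fetchThrottleTime = metrics.sensor("RemoteFetchThrottleTime");
        fetchThrottleTime.add(metrics.metricName("remote-fetch-throttle-time-avg",
            "RemoteLogManager", "Average remote fetch throttle time in ms"), new Avg());
        fetchThrottleTime.add(metrics.metricName("remote-fetch-throttle-time-max",
            "RemoteLogManager", "Maximum remote fetch throttle time in ms"), new Max());

        // Same for segment copies to remote storage.
        copyThrottleTime = metrics.sensor("RemoteCopyThrottleTime");
        copyThrottleTime.add(metrics.metricName("remote-copy-throttle-time-avg",
            "RemoteLogManager", "Average remote copy throttle time in ms"), new Avg());
        copyThrottleTime.add(metrics.metricName("remote-copy-throttle-time-max",
            "RemoteLogManager", "Maximum remote copy throttle time in ms"), new Max());
    }

    // Record how long (ms) a fetch/copy had to wait for quota to become available.
    public void recordFetchThrottleTime(double timeMs) { fetchThrottleTime.record(timeMs); }

    public void recordCopyThrottleTime(double timeMs) { copyThrottleTime.record(timeMs); }
}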
> 2. Could you give some examples for the throttling algorithm in the KIP to
> explain it? That will make it much clearer.
>

The QuotaManagers look something like:

class RlmWriteQuotaManager {
    public void updateQuota(Quota newQuota) { /* implementation */ }
    public boolean isQuotaExceeded() { /* implementation */ }
    public void record(double value) { /* implementation */ }
}

class RlmReadQuotaManager {
    public void updateQuota(Quota newQuota) { /* implementation */ }
    public boolean isQuotaExceeded() { /* implementation */ }
    public void record(double value) { /* implementation */ }
}
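For a more concrete picture, here is a minimal sketch of one possible
implementation, assuming each manager is backed by a quota-bounded Rate
sensor from Kafka's metrics library. The sensor/metric names, group, window,
and sample count are illustrative, and updateQuota is omitted for brevity:

import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Quota;
import org.apache.kafka.common.metrics.QuotaViolationException;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Rate;

class RlmReadQuotaManager {
    private final Sensor sensor;

    RlmReadQuotaManager(Metrics metrics, double bytesPerSecond) {
        // Byte rate measured over a sliding window of samples; the metrics
        // library enforces the upper bound when quotas are checked.
        MetricConfig config = new MetricConfig()
                .timeWindow(1, TimeUnit.SECONDS)
                .samples(11)
                .quota(Quota.upperBound(bytesPerSecond));
        this.sensor = metrics.sensor("remote-fetch-rate", config);
        sensor.add(metrics.metricName("remote-fetch-byte-rate", "RemoteLogManager"), new Rate());
    }

    public boolean isQuotaExceeded() {
        try {
            sensor.checkQuotas(); // throws if the measured rate exceeds the bound
            return false;
        } catch (QuotaViolationException e) {
            return true;
        }
    }

    public void record(double bytes) {
        // Record without enforcing here; callers poll isQuotaExceeded() instead.
        sensor.record(bytes, System.currentTimeMillis(), false);
    }
}

With this shape, RemoteLogReader would simply call record(...) after each
remote read, and ReplicaManager would consult isQuotaExceeded(), as shown in
the sketches below.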
*Read Throttling*

RemoteLogReader will record the read rate once it has fetched the remote log
segment to serve the fetch request. The changes would look something like:

class RemoteLogReader {
    @Override
    public Void call() {
        // ...
        // Record the number of bytes read from remote storage (0 if nothing was fetched)
        quotaManager.record(result.fetchDataInfo
                .map(info -> (double) info.records.sizeInBytes())
                .orElse(0.0));
        callback.accept(result);
        return null;
    }
}

Read throttling will be carried out by the ReplicaManager. When it gets an
offset-out-of-range error, it will check whether the quota is exhausted
before returning the delayedRemoteStorageFetch info:

def handleOffsetOutOfRangeError() = {
  // ...
  if (params.isFromFollower) {
    // ...
  } else {
    // ...
    val fetchDataInfo = if (isRemoteLogReadQuotaExceeded) {
      info("Read quota exceeded. Returning empty remote storage fetch info")
      FetchDataInfo(
        LogOffsetMetadata.UnknownOffsetMetadata,
        MemoryRecords.EMPTY,
        delayedRemoteStorageFetch = None)
    } else {
      // Regular case when the quota is not exceeded
      FetchDataInfo(
        LogOffsetMetadata(fetchInfo.fetchOffset),
        MemoryRecords.EMPTY,
        delayedRemoteStorageFetch = Some(
          RemoteStorageFetchInfo(adjustedMaxBytes, minOneMessage, tp, fetchInfo, fetchIsolation)))
    }
    // ...
  }
}

When the read quota is exceeded, we return empty remote storage fetch info.
We do not want to send an exception in the LogReadResult response (as we do
in other cases where we send UnknownOffsetMetadata), because that would be
classified as an error in reading data, and a response would be sent back to
the client immediately. Instead, we want to be able to serve data for the
other topic-partitions in the fetch request via delayed fetch if required
(when an immediate response is sent, the delayed fetch is skipped). Also, an
immediate response would make the consumer retry right away, which may run
into the quota-exceeded situation again and get it into a loop.

*Write Throttling*

RemoteLogManager.java:

private final ReentrantLock writeQuotaManagerLock = new ReentrantLock(true);
private final Condition lockCondition = writeQuotaManagerLock.newCondition();

private void copyLogSegment() throws InterruptedException {
    // ...
    writeQuotaManagerLock.lock();
    try {
        while (rlmWriteQuotaManager.isQuotaExceeded()) {
            // Quota exceeded; wait for quota to become available
            lockCondition.await(timeout, TimeUnit.MILLISECONDS);
        }
        // Reserve the segment's size against the quota before uploading
        rlmWriteQuotaManager.record(segment.log().sizeInBytes());
        // Signal waiting threads to re-check the quota
        lockCondition.signalAll();
    } finally {
        writeQuotaManagerLock.unlock();
    }
    // Actual copy operation
}

Before beginning to copy a segment to remote storage, the RLMTask checks
whether the quota is exceeded. If it is, the task waits for a specified
period before re-checking the quota status. Once the RLMTask confirms that
quota is available, it records the size of the segment it is about to upload
with the quota manager before proceeding with the upload. This step is
crucial to ensure the quota is updated up front, and it prevents other
RLMTasks from mistakenly assuming that quota headroom is still available for
them as well.

> 3. To solve this problem, we can break down the RLMTask into two smaller
> tasks - one for segment upload and the other for handling expired segments.
> How do we handle the situation when a segment is still waiting for
> offloading while this segment is expired and eligible to be deleted?
> Maybe it'll be easier to not block the RLMTask when quota exceeded, and
> just check it each time the RLMTask runs?
>

The concern here is that this may cause starvation for some topic-partitions.
RLMTasks are added to the thread pool executor to run on a fixed schedule
(every 30 secs). If we do not block the RLMTask when the quota is exceeded
and instead skip the run, the RLMTask for certain topic-partitions may never
run, because every time it is scheduled, the broker-level upload quota at
that instant may have already been exhausted.

Breaking down the RLMTask into smaller tasks is already being proposed in
KIP-950. The two tasks will need some coordination at the topic-partition
level to handle cases like the one you pointed out. For this particular
case, the upload task could skip uploading a segment that is already
eligible for deletion.

> Thank you.
> Luke
>
> On Wed, Nov 22, 2023 at 6:27 PM Abhijeet Kumar <abhijeet.cse....@gmail.com>
> wrote:
>
> > Hi All,
> >
> > I have created KIP-956 for defining read and write quota for tiered
> > storage.
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-956+Tiered+Storage+Quotas
> >
> > Feedback and suggestions are welcome.
> >
> > Regards,
> > Abhijeet.
> >

--
Abhijeet.