Comments inline

On Tue, Nov 28, 2023 at 3:50 PM Luke Chen <show...@gmail.com> wrote:
>
> Hi Abhijeet,
>
> Thanks for the KIP!
> This is an important feature for tiered storage.
>
> Some comments:
> 1. Will we introduce new metrics for this tiered storage quotas?
> This is important because the admin can know the throttling status by
> checking the metrics while the remote write/read are slow, like the rate of
> uploading/reading byte rate, the throttled time for upload/read... etc.
>

Good point. When fetches/copies on a broker are being throttled, the broker's
fetch and copy rates will level off at the configured limit. We plan to add
the following metrics to indicate the throttle time for fetches/copies,
similar to the local fetch throttle metrics.

*RemoteFetchThrottleTime* - The amount of time needed to bring the observed
remote fetch rate within the read quota.
*RemoteCopyThrottleTime* - The amount of time needed to bring the observed
remote copy rate within the copy quota.
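To make these definitions concrete, the sketch below assumes the throttle time is computed the way Kafka's existing client quotas compute it: the amount by which the observed rate exceeds the quota, relative to the quota, multiplied by the length of the measurement window. The class and method names are hypothetical, and the KIP may end up defining the computation differently.

```java
// Hypothetical sketch: time needed to bring an observed rate back within a quota,
// assuming the same formula shape Kafka's client quotas use.
final class ThrottleTimeSketch {

    /**
     * @param observedRate bytes/sec measured over the current window
     * @param quotaBound   bytes/sec allowed by the quota (must be positive)
     * @param windowMs     length of the window the rate was measured over
     * @return milliseconds to delay so the average rate drops back to the quota (0 if within quota)
     */
    static long throttleTimeMs(double observedRate, double quotaBound, long windowMs) {
        if (quotaBound <= 0) {
            throw new IllegalArgumentException("quotaBound must be positive");
        }
        if (observedRate <= quotaBound) {
            return 0L; // within quota, no throttling needed
        }
        double difference = observedRate - quotaBound;
        return Math.round(difference / quotaBound * windowMs);
    }

    public static void main(String[] args) {
        // 150 MB/s observed against a 100 MB/s quota over a 10 s window
        System.out.println(throttleTimeMs(150.0, 100.0, 10_000L));
    }
}
```

In that example, 1500 MB were transferred in the 10 s window; spreading them over 15 s brings the average back to 100 MB/s, hence a 5000 ms throttle.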

> 2. Could you give some examples for the throttling algorithm in the KIP to
> explain it? That will make it much clearer.
>

The quota managers would look something like this:

// Assuming Kafka's metrics Quota class
import org.apache.kafka.common.metrics.Quota;

class RlmWriteQuotaManager {
   public void updateQuota(Quota newQuota) { /* Implementation */ }
   public boolean isQuotaExceeded() { /* Implementation */ return false; }
   public void record(double value) { /* Implementation */ }
}

class RlmReadQuotaManager {
   public void updateQuota(Quota newQuota) { /* Implementation */ }
   public boolean isQuotaExceeded() { /* Implementation */ return false; }
   public void record(double value) { /* Implementation */ }
}
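One way to picture what these managers do is a byte-rate check over a sliding time window. The sketch below is an assumption, not the KIP's implementation: it takes the quota as a plain bytes-per-second double rather than a Quota object, uses an injectable clock so it can be tested, and the class name and window length are hypothetical. A real implementation could instead reuse Kafka's existing metrics classes.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.LongSupplier;

// Hypothetical sketch of a byte-rate quota manager exposing the same three operations
// as RlmWriteQuotaManager / RlmReadQuotaManager (quota expressed in bytes per second).
final class SlidingWindowQuotaManager {
    private static final class Sample {
        final long timeMs;
        final double bytes;
        Sample(long timeMs, double bytes) { this.timeMs = timeMs; this.bytes = bytes; }
    }

    private final long windowMs;
    private final LongSupplier clockMs;
    private final Deque<Sample> samples = new ArrayDeque<>();
    private double windowTotal = 0.0;
    private double quotaBytesPerSec;

    SlidingWindowQuotaManager(double quotaBytesPerSec, long windowMs, LongSupplier clockMs) {
        this.quotaBytesPerSec = quotaBytesPerSec;
        this.windowMs = windowMs;
        this.clockMs = clockMs;
    }

    synchronized void updateQuota(double newQuotaBytesPerSec) {
        this.quotaBytesPerSec = newQuotaBytesPerSec;
    }

    // Exceeded when the average byte rate over the window is above the quota.
    synchronized boolean isQuotaExceeded() {
        expireOldSamples();
        double observedRate = windowTotal / (windowMs / 1000.0);
        return observedRate > quotaBytesPerSec;
    }

    synchronized void record(double bytes) {
        expireOldSamples();
        samples.addLast(new Sample(clockMs.getAsLong(), bytes));
        windowTotal += bytes;
    }

    // Drop samples that have fallen out of the sliding window.
    private void expireOldSamples() {
        long cutoff = clockMs.getAsLong() - windowMs;
        while (!samples.isEmpty() && samples.peekFirst().timeMs <= cutoff) {
            windowTotal -= samples.removeFirst().bytes;
        }
    }

    public static void main(String[] args) {
        long[] now = {0L}; // manually advanced clock for the demo
        SlidingWindowQuotaManager quota = new SlidingWindowQuotaManager(100.0, 1000L, () -> now[0]);
        quota.record(50);
        System.out.println(quota.isQuotaExceeded()); // false: 50 B/s
        quota.record(60);
        System.out.println(quota.isQuotaExceeded()); // true: 110 B/s > 100 B/s
        now[0] = 1001L;
        System.out.println(quota.isQuotaExceeded()); // false: both samples expired
    }
}
```

Because the rate is averaged over the whole window, a burst is tolerated until the window average crosses the bound; after that, isQuotaExceeded() stays true until enough old samples age out.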

*Read Throttling*

RemoteLogReader will record the number of bytes read against the read quota
once it has fetched data from the remote log segment to serve the fetch
request. The changes would look something like this:

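import java.util.concurrent.Callable;

class RemoteLogReader implements Callable<Void> {
   @Override
   public Void call() {
      // ...
      // Record the bytes read from remote storage against the read quota
      quotaManager.record(result.fetchDataInfo
            .map(info -> (double) info.records.sizeInBytes())
            .orElse(0.0));
      callback.accept(result);
      return null;
   }
}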

Read throttling will be carried out by the ReplicaManager. When it gets an
offset-out-of-range error, it will check whether the read quota is exhausted
before returning the delayedRemoteStorageFetch info.

def handleOffsetOutOfRangeError() = {
  // ...
  if (params.isFromFollower) {
    // ...
  } else {
    // ...
    val fetchDataInfo = if (isRemoteLogReadQuotaExceeded) {
      info("Read quota exceeded. Returning empty remote storage fetch info")
      // No error is set, so other partitions can still be served via delayed fetch
      FetchDataInfo(
        LogOffsetMetadata.UnknownOffsetMetadata,
        MemoryRecords.EMPTY,
        delayedRemoteStorageFetch = None
      )
    } else {
      // Regular case when quota is not exceeded
      FetchDataInfo(
        LogOffsetMetadata(fetchInfo.fetchOffset),
        MemoryRecords.EMPTY,
        delayedRemoteStorageFetch = Some(
          RemoteStorageFetchInfo(adjustedMaxBytes, minOneMessage, tp, fetchInfo, fetchIsolation)))
    }
    // ...
  }
}

When the read quota is exceeded, we return an empty remote storage fetch
info. We do not want to return an exception in the LogReadResult (as we do
in other cases where we return UnknownOffsetMetadata), because then it is
classified as an error in reading data, and a response is sent back to the
client immediately. Instead, we want to be able to serve data for the other
topic partitions in the fetch request via delayed fetch if required (when an
immediate response is sent, delayed fetch is skipped). Also, responding
immediately would make the consumer retry immediately, which may hit the
quota-exceeded condition again and get it into a loop.
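The reasoning above can be illustrated with a toy model of how per-partition read results decide between an immediate response and a delayed remote fetch. All types and the decide() function are hypothetical simplifications of ReplicaManager's behaviour as described in this thread, not its actual code.

```java
import java.util.List;
import java.util.Optional;

// Toy model (hypothetical types): per-partition read results decide whether a fetch
// is answered immediately or parked as a delayed (remote) fetch.
final class FetchDecisionSketch {

    enum Outcome { RESPOND_IMMEDIATELY, DELAYED_REMOTE_FETCH, DELAYED_FETCH }

    // One partition's read result: an optional error and whether a remote read is pending.
    static final class PartitionRead {
        final Optional<String> error;
        final boolean needsRemoteFetch;
        PartitionRead(Optional<String> error, boolean needsRemoteFetch) {
            this.error = error;
            this.needsRemoteFetch = needsRemoteFetch;
        }
    }

    static Outcome decide(List<PartitionRead> reads, long bytesReadable, long minBytes) {
        boolean errorReadingData = reads.stream().anyMatch(r -> r.error.isPresent());
        if (errorReadingData || bytesReadable >= minBytes) {
            // Any error short-circuits: the delayed fetch is skipped for every partition.
            return Outcome.RESPOND_IMMEDIATELY;
        }
        boolean anyRemote = reads.stream().anyMatch(r -> r.needsRemoteFetch);
        return anyRemote ? Outcome.DELAYED_REMOTE_FETCH : Outcome.DELAYED_FETCH;
    }

    public static void main(String[] args) {
        PartitionRead otherPartitionNeedsRemote = new PartitionRead(Optional.empty(), true);

        // Quota exceeded reported as an error: the whole fetch returns at once.
        PartitionRead throttledAsError = new PartitionRead(Optional.of("quota exceeded"), false);
        System.out.println(decide(List.of(throttledAsError, otherPartitionNeedsRemote), 0, 1));

        // Quota exceeded reported as empty fetch info: the other partition can still be
        // served through the delayed remote fetch.
        PartitionRead throttledAsEmpty = new PartitionRead(Optional.empty(), false);
        System.out.println(decide(List.of(throttledAsEmpty, otherPartitionNeedsRemote), 0, 1));
    }
}
```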

*Write Throttling*

RemoteLogManager.java

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

private final ReentrantLock writeQuotaManagerLock = new ReentrantLock(true);
private final Condition lockCondition = writeQuotaManagerLock.newCondition();

private void copyLogSegment(/* ... */) throws InterruptedException {
  // ...
  writeQuotaManagerLock.lock();
  try {
    while (rlmWriteQuotaManager.isQuotaExceeded()) {
      // Quota exceeded: wait (up to timeout ms, or until signalled) before re-checking
      lockCondition.await(timeout, TimeUnit.MILLISECONDS);
    }
    // Reserve quota for this segment before the upload starts
    rlmWriteQuotaManager.record((double) segment.log.sizeInBytes());
    // Signal waiting threads to check the quota
    lockCondition.signalAll();
  } finally {
    writeQuotaManagerLock.unlock();
  }

  // Actual copy operation
}

Before beginning to copy the segment to remote storage, the RLMTask checks
whether the quota is exceeded. If it is, the task waits for a specified
period before rechecking the quota status. Once the RLMTask confirms that
quota is available, it records the size of the segment it plans to upload
with the quota manager before proceeding with the upload. This step is
crucial to ensure that the quota is accurately updated and to prevent other
RLMTasks from mistakenly assuming that quota is also available for them.
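The locking scheme above can be exercised end to end with a self-contained sketch. It assumes a simple fixed-window quota (a byte budget that resets every windowMs) standing in for RlmWriteQuotaManager; the class name, the budget, and the timeouts are hypothetical. Two simulated RLMTasks share the gate, and because each reserves quota before uploading, a task that arrives after the budget is used waits for the next window instead of assuming the budget is still free.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical sketch of the write-throttling gate: copy tasks reserve quota under a
// fair lock before uploading. The fixed-window budget is an assumed stand-in quota.
final class WriteQuotaGateSketch {
    private final ReentrantLock writeQuotaManagerLock = new ReentrantLock(true);
    private final Condition lockCondition = writeQuotaManagerLock.newCondition();

    private final long bytesPerWindow; // hypothetical quota: bytes allowed per window
    private final long windowMs;
    private long windowStartNanos = System.nanoTime();
    private long bytesInWindow = 0;

    WriteQuotaGateSketch(long bytesPerWindow, long windowMs) {
        this.bytesPerWindow = bytesPerWindow;
        this.windowMs = windowMs;
    }

    // Must be called with the lock held. Starts a new window once the current one has elapsed.
    private boolean isQuotaExceeded() {
        long now = System.nanoTime();
        if (TimeUnit.NANOSECONDS.toMillis(now - windowStartNanos) >= windowMs) {
            windowStartNanos = now;
            bytesInWindow = 0;
        }
        return bytesInWindow >= bytesPerWindow;
    }

    // Blocks until quota is available, then records the segment size before returning,
    // so other tasks see the reservation before this task starts its upload.
    void reserve(long segmentBytes, long timeoutMs) throws InterruptedException {
        writeQuotaManagerLock.lock();
        try {
            while (isQuotaExceeded()) {
                // Wake up periodically (or when signalled) to re-check the quota.
                lockCondition.await(timeoutMs, TimeUnit.MILLISECONDS);
            }
            bytesInWindow += segmentBytes;
            // Let waiting tasks re-check the quota.
            lockCondition.signalAll();
        } finally {
            writeQuotaManagerLock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        WriteQuotaGateSketch gate = new WriteQuotaGateSketch(100, 200);
        long start = System.nanoTime();
        Runnable copyTask = () -> {
            try {
                for (int i = 0; i < 2; i++) {
                    gate.reserve(100, 20);
                    long ms = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
                    System.out.println(Thread.currentThread().getName() + " starts upload at ~" + ms + " ms");
                    // The actual (simulated) upload would run here, outside the lock.
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        Thread t1 = new Thread(copyTask, "rlm-task-1");
        Thread t2 = new Thread(copyTask, "rlm-task-2");
        t1.start();
        t2.start();
        t1.join();
        t2.join();
    }
}
```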

> 3. To solve this problem, we can break down the RLMTask into two smaller
> tasks - one for segment upload and the other for handling expired segments.
> How do we handle the situation when a segment is still waiting for
> offloading while this segment is expired and eligible to be deleted?
> Maybe it'll be easier to not block the RLMTask when quota exceeded, and
> just check it each time the RLMTask runs?
>

The concern here is that this may cause starvation of some topic partitions.
RLMTasks are added to the thread pool executor to run on a fixed schedule
(every 30 secs). If we do not block the RLMTask when the quota is exceeded
and instead skip the run, the RLMTask for certain topic partitions may never
run, because every time it is scheduled to run, the broker-level upload quota
at that instant may have already been exhausted.

Breaking down the RLMTask into smaller tasks is already being proposed in
KIP-950. The two tasks will need some coordination at the topic-partition
level to handle cases like the one you pointed out. For this particular case,
the upload task could skip uploading the segment if it is already eligible
for deletion.
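For the specific case raised in question 3, the upload task's check could be as simple as the sketch below. It models only time-based retention (retention.ms, with a negative value meaning unlimited), using roughly the comparison Kafka applies when deleting segments by time; size-based retention is ignored, and the method names are hypothetical.

```java
// Hypothetical sketch: before uploading, the copy task skips a segment that retention
// would already delete. Only time-based retention (retention.ms) is modelled here.
final class SkipExpiredUploadSketch {

    /**
     * @param segmentLargestTimestampMs largest record timestamp in the segment
     * @param retentionMs               topic retention.ms; a negative value means unlimited
     * @param nowMs                     current time
     */
    static boolean eligibleForDeletion(long segmentLargestTimestampMs, long retentionMs, long nowMs) {
        return retentionMs >= 0 && nowMs - segmentLargestTimestampMs > retentionMs;
    }

    static boolean shouldUpload(long segmentLargestTimestampMs, long retentionMs, long nowMs) {
        return !eligibleForDeletion(segmentLargestTimestampMs, retentionMs, nowMs);
    }

    public static void main(String[] args) {
        long day = 24 * 3_600_000L;  // one day in ms
        long now = 10 * day;
        long retention = 7 * day;    // retention.ms = 7 days
        System.out.println(shouldUpload(1 * day, retention, now)); // false: already past retention
        System.out.println(shouldUpload(9 * day, retention, now)); // true: still within retention
        System.out.println(shouldUpload(1 * day, -1L, now));       // true: unlimited retention
    }
}
```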


> Thank you.
> Luke
>
> On Wed, Nov 22, 2023 at 6:27 PM Abhijeet Kumar <abhijeet.cse....@gmail.com> wrote:
>
> > Hi All,
> >
> > I have created KIP-956 for defining read and write quota for tiered
> > storage.
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-956+Tiered+Storage+Quotas
> >
> > Feedback and suggestions are welcome.
> >
> > Regards,
> > Abhijeet.
> >



-- 
Abhijeet.
