Colt McNealy created KAFKA-15448:
------------------------------------

             Summary: Streams StandbyTaskUpdateListener
                 Key: KAFKA-15448
                 URL: https://issues.apache.org/jira/browse/KAFKA-15448
             Project: Kafka
          Issue Type: Improvement
          Components: streams
            Reporter: Colt McNealy


In addition to the new metrics in KIP-869, it would be great to have a callback
that allows monitoring of Standby Task status. The StateRestoreListener is
currently not called for Standby Tasks, for good reason (its API wouldn't make
sense for a Standby Task). I've attached an interface that would be nice to have:

 

```
import org.apache.kafka.common.TopicPartition;

public interface StandbyTaskUpdateListener {

    public enum SuspendReason {
        MIGRATED,
        PROMOTED;
    }

    /**
     * Method called upon the creation of the Standby Task.
     *
     * @param topicPartition the TopicPartition of the Standby Task.
     * @param storeName the name of the store being watched by this Standby Task.
     * @param earliestOffset the earliest offset available on the Changelog topic.
     * @param startingOffset the offset from which the Standby Task starts watching.
     * @param currentEndOffset the current latest offset on the associated changelog partition.
     */
    void onTaskCreated(final TopicPartition topicPartition,
                       final String storeName,
                       final long earliestOffset,
                       final long startingOffset,
                       final long currentEndOffset);

    /**
     * Method called after restoring a batch of records. In this case the maximum size of the
     * batch is whatever the value of the MAX_POLL_RECORDS is set to.
     *
     * This method is called after restoring each batch and it is advised to keep processing to
     * a minimum. Any heavy processing will hold up recovering the next batch, hence slowing down
     * the restore process as a whole.
     *
     * If you need to do any extended processing or connecting to an external service, consider
     * doing so asynchronously.
     *
     * @param topicPartition the TopicPartition containing the values to restore
     * @param storeName the name of the store undergoing restoration
     * @param batchEndOffset the inclusive ending offset for the current restored batch for this TopicPartition
     * @param numRestored the total number of records restored in this batch for this TopicPartition
     * @param currentEndOffset the current end offset of the changelog topic partition.
     */
    void onBatchRestored(final TopicPartition topicPartition,
                         final String storeName,
                         final long batchEndOffset,
                         final long numRestored,
                         final long currentEndOffset);

    /**
     * Method called after a Standby Task is closed, either because the task migrated to a new
     * instance or because the task was promoted to an Active task.
     *
     * @param reason whether the task migrated to a new instance or was promoted to an Active task
     */
    void onTaskSuspended(final TopicPartition topicPartition,
                         final String storeName,
                         final long storeOffset,
                         final long currentEndOffset,
                         final SuspendReason reason);
}
```
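
To illustrate how such a callback might be used for monitoring, here is a minimal sketch of a listener that tracks an approximate changelog lag per standby store. It assumes the interface exactly as proposed above, which is not an existing Kafka Streams API. `TopicPartition` is replaced with a small stand-in so the example compiles without Kafka on the classpath, and `LagTrackingListener`, `lagOf`, and the topic and store names are hypothetical. The ticket does not pin down whether `currentEndOffset` is inclusive, so the lag computed here may be off by one.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class StandbyLagExample {

    // Stand-in for org.apache.kafka.common.TopicPartition (assumption, keeps the sketch self-contained).
    static final class TopicPartition {
        final String topic;
        final int partition;

        TopicPartition(final String topic, final int partition) {
            this.topic = topic;
            this.partition = partition;
        }

        @Override
        public String toString() {
            return topic + "-" + partition;
        }
    }

    // Condensed copy of the interface proposed in this ticket (not an existing Kafka API).
    interface StandbyTaskUpdateListener {
        enum SuspendReason { MIGRATED, PROMOTED }

        void onTaskCreated(TopicPartition topicPartition, String storeName,
                           long earliestOffset, long startingOffset, long currentEndOffset);

        void onBatchRestored(TopicPartition topicPartition, String storeName,
                             long batchEndOffset, long numRestored, long currentEndOffset);

        void onTaskSuspended(TopicPartition topicPartition, String storeName,
                             long storeOffset, long currentEndOffset, SuspendReason reason);
    }

    // Hypothetical listener: remembers roughly how far each standby store trails its changelog.
    static final class LagTrackingListener implements StandbyTaskUpdateListener {
        private final Map<String, Long> lagByStore = new ConcurrentHashMap<>();

        private static String key(final TopicPartition tp, final String storeName) {
            return storeName + "@" + tp;
        }

        @Override
        public void onTaskCreated(final TopicPartition tp, final String storeName,
                                  final long earliestOffset, final long startingOffset,
                                  final long currentEndOffset) {
            lagByStore.put(key(tp, storeName), Math.max(0L, currentEndOffset - startingOffset));
        }

        @Override
        public void onBatchRestored(final TopicPartition tp, final String storeName,
                                    final long batchEndOffset, final long numRestored,
                                    final long currentEndOffset) {
            // Only a cheap map update here, following the advice to keep this callback light.
            lagByStore.put(key(tp, storeName), Math.max(0L, currentEndOffset - batchEndOffset));
        }

        @Override
        public void onTaskSuspended(final TopicPartition tp, final String storeName,
                                    final long storeOffset, final long currentEndOffset,
                                    final SuspendReason reason) {
            // The standby is gone from this instance (migrated or promoted), so stop tracking it.
            lagByStore.remove(key(tp, storeName));
        }

        // Returns the last recorded lag, or -1 if the store is not tracked on this instance.
        long lagOf(final TopicPartition tp, final String storeName) {
            return lagByStore.getOrDefault(key(tp, storeName), -1L);
        }
    }

    public static void main(final String[] args) {
        final LagTrackingListener listener = new LagTrackingListener();
        final TopicPartition tp = new TopicPartition("my-app-counts-changelog", 0);

        listener.onTaskCreated(tp, "counts", 0L, 40L, 100L);
        System.out.println("lag after creation: " + listener.lagOf(tp, "counts"));

        listener.onBatchRestored(tp, "counts", 90L, 50L, 120L);
        System.out.println("lag after batch: " + listener.lagOf(tp, "counts"));

        listener.onTaskSuspended(tp, "counts", 90L, 120L,
                StandbyTaskUpdateListener.SuspendReason.PROMOTED);
        System.out.println("still tracked: " + (listener.lagOf(tp, "counts") >= 0));
    }
}
```

A value like this could be exported as a gauge or checked by a health endpoint; anything heavier than a map update would be better handed off to another thread.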



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
