[ https://issues.apache.org/jira/browse/SAMZA-179?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Martin Kleppmann updated SAMZA-179:
-----------------------------------

    Attachment: SAMZA-179-v1.patch

Here is a first attempt -- I expect it will need some refinement: 
https://reviews.apache.org/r/19384/

You can use it like this:

{code:java}
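import org.apache.samza.config.Config;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.SystemStream;
import org.apache.samza.task.InitableTask;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamTask;
import org.apache.samza.task.TaskContext;
import org.apache.samza.task.TaskCoordinator;

public class FooTask implements StreamTask, InitableTask {
  private String newestOffset;

  public void init(Config config, TaskContext context) {
    // Remember the newest offset of the input stream as seen at startup.
    this.newestOffset = context
        .getStreamMetadata(new SystemStream("kafka", "input-topic"))
        .getNewestOffset();
  }

  public void process(IncomingMessageEnvelope envelope,
      MessageCollector collector, TaskCoordinator coordinator) {
    // Once that offset is reached, request a non-forced shutdown.
    if (envelope.getOffset().equals(newestOffset)) {
      coordinator.shutdown(false);
    }
  }
}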
{code}

I thought the approach of exposing stream metadata to the task was more 
flexible than just a notification when it has caught up, and doesn't really 
make the task any more complex.

Some discussion points:

* Should TaskContext.getStreamMetadata return the current metadata from the 
broker, or just the version of the metadata that was fetched when the job was 
initialized? BootstrappingChooser just uses the version from job startup, but I 
thought that people might use the stream metadata for other things too, and it 
would be really confusing if it's not up-to-date.
* Is it ok for TaskContext.getStreamMetadata to just call straight through to 
SystemAdmin.getSystemStreamMetadata? It's not ideal, as 
TaskContext.getStreamMetadata gets called for each task. However, thanks to 
TopicMetadataCache, we don't actually contact the broker for each task.
* I agree that shutdown should wait for all tasks to be caught up, so I've 
added a boolean parameter to the shutdown method. force=true keeps the current 
semantics, force=false waits until all tasks have requested shutdown before 
actually doing it. A task may continue processing messages, even after it has 
requested shutdown (can't really do anything else -- can't throw them away!).
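
To make the force=true / force=false distinction concrete, here is a rough, self-contained sketch of the bookkeeping involved. The class ShutdownTracker and its method names are made up for illustration and are not taken from the patch; the real container logic may well look different.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical helper (not part of the patch): records which tasks have
// requested shutdown and decides when the container should actually stop.
class ShutdownTracker {
  private final Set<String> allTasks;
  private final Set<String> requested = new HashSet<>();
  private boolean shutdownNow = false;

  ShutdownTracker(Set<String> allTasks) {
    this.allTasks = new HashSet<>(allTasks);
  }

  // force=true: stop right away (the current semantics).
  // force=false: note the request; stop only once every task has asked.
  void requestShutdown(String taskName, boolean force) {
    if (force) {
      shutdownNow = true;
      return;
    }
    requested.add(taskName);
    if (requested.containsAll(allTasks)) {
      shutdownNow = true;
    }
  }

  // The run loop would check this between messages. Until it returns true,
  // tasks that already requested shutdown keep receiving messages.
  boolean shouldShutdown() {
    return shutdownNow;
  }
}
```

With two tasks, a non-forced request from only one of them leaves shouldShutdown() false; it becomes true once the second task also asks, or as soon as any task asks with force=true.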

Happy to make changes if you disagree with those choices. What do you think?

> Support a way to shutdown when a task is caught up
> --------------------------------------------------
>
>                 Key: SAMZA-179
>                 URL: https://issues.apache.org/jira/browse/SAMZA-179
>             Project: Samza
>          Issue Type: Bug
>          Components: container
>    Affects Versions: 0.6.0
>            Reporter: Chris Riccomini
>            Assignee: Martin Kleppmann
>         Attachments: SAMZA-179-v1.patch
>
>
> There is currently no easy way for a task to process all messages from some 
> offset all the way to the head of a stream, and then shut down. This behavior 
> is useful in cases where a Samza job is re-processing old data, or for Samza 
> jobs that only run periodically.
> Now that we have SAMZA-147 merged in, SamzaContainer has access to the stream 
> metadata required to do these operations. We should expose this to the tasks 
> in some way, so they can decide when to shut down.
> Two ways that I can think of doing this are:
> 1. Add some sort of mix-in interface (a la InitableTask, ClosableTask, etc) 
> that has a callback that's triggered whenever the last message processed by a 
> task is equal to metadata.newest for the SystemStreamPartition.
> 2. Expose the SystemStreamMetadata information through the 
> InitableTask.init's TaskContext object.
> I favor (2) right now, but we should dig into the code and think through 
> other potential solutions.
> Also, something else to consider is that TaskCoordinator.shutdown currently 
> immediately shuts down the task. There is an argument to be made that it 
> should just mean "cease to process any messages for this task instance", and 
> SamzaContainer should only be shut down when ALL task instances have called 
> coordinator.shutdown (or there is an error). This would be useful in 
> situations where a StreamTask wishes to shut down when it's fully caught up to 
> head across all partitions. Right now, the first task instance to call 
> shutdown (the first one caught up) would shut down the entire SamzaContainer. 
> This would shut down partitions that might not be caught up yet.


