Hey Gaurav,

Samza automatically keeps track of the offsets your job has successfully processed for each SSP (SystemStreamPartition). When your task requests a checkpoint, Samza writes the offset of the latest successfully processed message for every SSP that the task consumes.
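The bookkeeping described above can be pictured with a minimal plain-Java model. It deliberately avoids Samza's API: `Ssp` and `TaskModel` are hypothetical stand-ins, not Samza classes. The task remembers the last processed offset for each SSP. A commit snapshots every SSP the task consumes, which is why a single commit cannot target just one topic. The offsets 3 and 7 match the task0 example that follows.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;

/** Hypothetical model of per-SSP checkpointing; none of these classes are Samza APIs. */
public class CheckpointSketch {

    /** Hypothetical stand-in for a SystemStreamPartition (system, stream, partition). */
    static final class Ssp {
        final String system;
        final String stream;
        final int partition;

        Ssp(String system, String stream, int partition) {
            this.system = system;
            this.stream = stream;
            this.partition = partition;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof Ssp)) return false;
            Ssp other = (Ssp) o;
            return partition == other.partition
                && system.equals(other.system)
                && stream.equals(other.stream);
        }

        @Override
        public int hashCode() {
            return Objects.hash(system, stream, partition);
        }

        @Override
        public String toString() {
            return "SystemStreamPartition [" + system + ", " + stream + ", " + partition + "]";
        }
    }

    /** Hypothetical task: remembers the last successfully processed offset of each SSP. */
    static final class TaskModel {
        final String name;
        private final Map<Ssp, Long> lastProcessed = new LinkedHashMap<>();

        TaskModel(String name) {
            this.name = name;
        }

        /** Record that the message at this offset was processed successfully. */
        void markProcessed(Ssp ssp, long offset) {
            lastProcessed.put(ssp, offset);
        }

        /** A commit snapshots the offsets of ALL SSPs in this task, not just one. */
        Map<Ssp, Long> commit() {
            return new LinkedHashMap<>(lastProcessed);
        }
    }

    public static void main(String[] args) {
        Ssp config = new Ssp("kafka", "configuration", 0);
        Ssp metrics = new Ssp("kafka", "metrics", 0);

        TaskModel task0 = new TaskModel("task0");
        for (long offset = 0; offset <= 3; offset++) task0.markProcessed(config, offset);
        for (long offset = 0; offset <= 7; offset++) task0.markProcessed(metrics, offset);

        // Prints: task0 -> {SystemStreamPartition [kafka, configuration, 0]=3, SystemStreamPartition [kafka, metrics, 0]=7}
        System.out.println(task0.name + " -> " + task0.commit());
    }
}
```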
So if task0 consumes partition 0 of two topics (configuration and metrics) and has successfully processed:

* offset 3 in the configuration topic, partition 0
* offset 7 in the metrics topic, partition 0

then Samza will write a checkpoint that looks something like this:

task0 -> {
  "SystemStreamPartition [kafka, configuration, 0]": {"system": "kafka", "partition": "0", "offset": "3", "stream": "configuration"},
  "SystemStreamPartition [kafka, metrics, 0]": {"system": "kafka", "partition": "0", "offset": "7", "stream": "metrics"}
}

where "kafka" is an arbitrarily chosen system name.

So although you can't explicitly set the checkpoint offset on a per-SSP basis, checkpoints are recorded per SSP. If your job consumes multiple topics, each topic-partition gets its own offset entry, so the checkpoint is as granular as your inputs.

More details on checkpointing here: http://samza.apache.org/learn/documentation/0.13/container/checkpointing.html

-Jake

On Wed, Nov 1, 2017 at 5:48 AM, Gaurav Agarwal <gauravagarw...@gmail.com> wrote:
> Thanks, I'll check it out.
>
> I have a Samza application that is consuming a lot of different types of
> messages (these messages are related to each other but do not require a join;
> think of them as different configuration and metric information of
> virtual machines that modify some central states like databases, time-series
> stores etc.). We have used a single Kafka topic so far, with partitions for
> parallelism.
>
> Now, there is a message type (metrics) for which I want to perform larger
> "batching" for cost reasons.
>
> Hence I was exploring ways in which I can put those messages on a separate
> Kafka topic but use the same Samza application that we have been using so
> far, instead of creating a new one. There is some state (caches etc.) that
> is shared between messages, so it would be wasteful to launch an
> independent application.
>
> If I could control the checkpointing per topic independently, this approach
> could work.
>
> Please let me know if this sounds like a reasonable approach.
>
> On Sat, Oct 28, 2017 at 8:41 PM, Jagadish Venkatraman <jagadish1...@gmail.com> wrote:
>
> > In Samza, the logical unit of processing (and hence, checkpointing) is a
> > task. Hence, you cannot selectively checkpoint SSPs within a task.
> >
> > However, you can configure how your SSPs are grouped into tasks by choosing
> > a Grouper. If you want to control checkpointing at the granularity of an
> > SSP, then you can choose
> > org.apache.samza.container.grouper.stream.GroupBySystemStreamPartitionFactory.
> >
> > Config reference:
> > https://samza.apache.org/learn/documentation/0.10/jobs/configuration-table.html
> >
> > What are you trying to do? Maybe there's a simpler way to achieve it?
> >
> > On Sat, Oct 28, 2017 at 4:09 AM, Gaurav Agarwal <gauravagarw...@gmail.com> wrote:
> >
> >> Hi All,
> >>
> >> If I had Samza tasks that were consuming messages from multiple topics,
> >> how would checkpoint/commit work in that case? On calling
> >> taskCoordinator.commit(), would the current offsets of all topics be saved
> >> for the calling task (only the partitions assigned to that task)? Is
> >> there a way to control this behavior more granularly, so that I can
> >> request Samza to commit the offset for only a given task/topic combination?
> >>
> >> --
> >> thanks,
> >> gaurav
> >
> > --
> > Jagadish V,
> > Graduate Student,
> > Department of Computer Science,
> > Stanford University
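The grouper suggestion in the earliest reply can be illustrated with a small, hypothetical Java model. The grouper is selected with the `job.systemstreampartition.grouper.factory` setting, whose default is `org.apache.samza.container.grouper.stream.GroupByPartitionFactory`. Grouping by partition puts partition N of every input stream into the same task, so one commit covers both topics. Grouping by SystemStreamPartition gives each SSP its own task and therefore its own checkpoint. The methods, task names, and "stream/partition" strings below are illustrative assumptions, not Samza's grouper implementation. The model also assumes every stream has the same partition count.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * Hypothetical model of two SSP-to-task grouping strategies. NOT Samza's grouper
 * classes: SSPs are plain "stream/partition" strings, and every stream is assumed
 * to have the same number of partitions.
 */
public class GrouperSketch {

    /** Group by partition: partition N of every stream shares one task (and one commit). */
    static Map<String, List<String>> groupByPartition(List<String> streams, int partitions) {
        Map<String, List<String>> tasks = new TreeMap<>();
        for (int p = 0; p < partitions; p++) {
            List<String> ssps = new ArrayList<>();
            for (String stream : streams) {
                ssps.add(stream + "/" + p);
            }
            tasks.put("Partition " + p, ssps);
        }
        return tasks;
    }

    /** Group by SSP: every SSP gets its own task, hence its own checkpoint. */
    static Map<String, List<String>> groupBySsp(List<String> streams, int partitions) {
        Map<String, List<String>> tasks = new TreeMap<>();
        for (String stream : streams) {
            for (int p = 0; p < partitions; p++) {
                String ssp = stream + "/" + p;
                tasks.put("Task for " + ssp, Collections.singletonList(ssp));
            }
        }
        return tasks;
    }

    public static void main(String[] args) {
        List<String> streams = Arrays.asList("configuration", "metrics");
        // Prints: {Partition 0=[configuration/0, metrics/0], Partition 1=[configuration/1, metrics/1]}
        System.out.println(groupByPartition(streams, 2));
        // Prints four single-SSP tasks, one per stream/partition pair
        System.out.println(groupBySsp(streams, 2));
    }
}
```

The trade-off is that grouping by SSP gives up co-locating a configuration partition and its matching metrics partition in one task instance. Any task-local state would then be split across more tasks.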