Hey Gaurav,

Samza automatically keeps track of the offsets your job has successfully
processed for each SSP. When your task requests a checkpoint, Samza will
write the offset of the latest successfully-processed message for each SSP
that task consumes.

So if task0 consumes partition 0 of two topics (configuration, metrics) and
has successfully processed:
* offset 3 in the configuration topic, partition 0
* offset 7 in the metrics topic, partition 0

Then Samza will write a checkpoint that looks something like this
task0 -> {"SystemStreamPartition [kafka, configuration,
0]":{"system":"kafka","partition":"0","offset":"3","stream":"configuration"},"SystemStreamPartition
[kafka, metrics,
0]":{"system":"kafka","partition":"0","offset":"7","stream":"metrics"}}

Where "kafka" was an arbitrarily-chosen system name.

So while you can't explicitly set the checkpoint offset on a per SSP basis,
the checkpoints are actually recorded that way. And if your job consumes
multiple topics, the offsets will be granular enough, because they're
per-SSP.

More details on checkpointing here:
http://samza.apache.org/learn/documentation/0.13/container/checkpointing.html

-Jake


On Wed, Nov 1, 2017 at 5:48 AM, Gaurav Agarwal <gauravagarw...@gmail.com>
wrote:

> Thanks, I'll check it out.
>
> I have a samza application that is consuming a lot of different types of
> messages (these messages are related to each other but do not require join
> - think of these like different configuration and metric information of
> virtual machines that modify some central sates like databases, timeseries
> stores etc). We have used a single KafkaTopic so far with partitions for
> parallelism.
>
> Now, there is a message type (metrics) for which I want to perform larger
> "batching" for cost reasons.
>
> Hence I was exploring ways in which I can put those messages on a separate
> Kafka Topic but use the same samza application that we have been using so
> far, instead of creating a new one. There is some state (caches etc) that
> are shared between messages and hence it will be wasteful to launch an
> independent application.
>
> If I could control the checkpointing per topic independently, this approach
> could work.
>
> Please let me know if this sounds like a reasonable approach for this?
>
> On Sat, Oct 28, 2017 at 8:41 PM, Jagadish Venkatraman <
> jagadish1...@gmail.com> wrote:
>
> > In Samza, the logical unit of processing (and hence, checkpointing) is a
> > task. Hence, you cannot selectively checkpoint SSPs within a task.
> >
> > However, you can configure how you group your SSPs into tasks by choosing
> > a Grouper. If you want to control checkpointing at the granularity of an
> > SSP, then you can choose the org.apache.samza.container.grouper.stream.
> > GroupBySystemStreamPartitionFactory.
> >
> > Config reference: https://samza.apache.org/
> learn/documentation/0.10/jobs/
> > configuration-table.html
> >
> > What are you trying to do? Maybe, there's a simpler way to achieve it?
> >
> >
> >
> > On Sat, Oct 28, 2017 at 4:09 AM, Gaurav Agarwal <
> gauravagarw...@gmail.com>
> > wrote:
> >
> >> Hi All,
> >>
> >> If I had Samza Tasks that were consuming message from multiple topics,
> >> how would checkpoint/commit work in that case? On calling
> >> taskCordinator.commit(), would current offset of all topics be saved for
> >> the caller task  (only the partitions assigned to the caller task)? Is
> >> there a way to control this behavior more granularly where I can request
> >> samza to commit the offset for only a given task/topic combination only?
> >>
> >> --
> >> thanks,
> >> gaurav
> >>
> >
> >
> >
> > --
> > Jagadish V,
> > Graduate Student,
> > Department of Computer Science,
> > Stanford University
> >
>

Reply via email to