Hi, Chen,

So, is your goal to improve the throughput to the changelog topic or to reduce
the size of the changelog topic? If you are targeting the latter and your
KV-store truly is the same size as the input log, I don't see how that is
possible. In a lot of use cases, though, users only need to retain a *recent*
time window of the input log. In that case, you can periodically purge the
expired records from the KV-store to reduce the size of both the KV-store and
the changelog.
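
For illustration, here is a minimal sketch of that periodic purge. It is plain
Python with a toy in-memory store, not Samza's KeyValueStore API: `TtlStore`,
`purge_expired`, and `compact` are hypothetical names, and `compact` only
approximates what log compaction would eventually retain (the latest value per
key, with deleted keys dropped). In a real job the purge would presumably run
on a timer rather than on every message.

```python
from typing import Dict, List, Optional, Tuple


class TtlStore:
    """Toy stand-in for a changelog-backed KV store (hypothetical, not Samza's API)."""

    def __init__(self, retention_seconds: float) -> None:
        self.retention_seconds = retention_seconds
        # key -> (event_time, value)
        self.data: Dict[str, Tuple[float, str]] = {}
        # Every write is mirrored here; a value of None is a delete (tombstone).
        self.changelog: List[Tuple[str, Optional[str]]] = []

    def put(self, key: str, value: str, event_time: float) -> None:
        self.data[key] = (event_time, value)
        self.changelog.append((key, value))

    def delete(self, key: str) -> None:
        # Only log a tombstone if the key was actually present.
        if self.data.pop(key, None) is not None:
            self.changelog.append((key, None))

    def purge_expired(self, now: float) -> int:
        """Delete every record older than the retention window; return the count."""
        cutoff = now - self.retention_seconds
        expired = [k for k, (ts, _) in self.data.items() if ts < cutoff]
        for k in expired:
            self.delete(k)
        return len(expired)


def compact(changelog: List[Tuple[str, Optional[str]]]) -> Dict[str, str]:
    """Approximate a compacted log: keep the latest entry per key, drop tombstones."""
    latest: Dict[str, Optional[str]] = {}
    for key, value in changelog:
        latest[key] = value
    return {k: v for k, v in latest.items() if v is not None}
```

The point of the sketch is that the purge writes deletes to the changelog too,
so after compaction the changelog is bounded by the number of live keys in the
retention window rather than by the total number of input records.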

Regards,
-Yi

On Tue, Aug 4, 2015 at 7:25 AM, Chen Song <chen.song...@gmail.com> wrote:

> Thanks Yan.
>
> Very good explanation on 1).
>
> For 2), I understand that users can tune the batch size of the Kafka
> producer. However, that doesn't change the number of messages sent to the
> changelog topic. In our case, we process a high-volume log (1.5MM
> records/second) and update the kv store for each message, which causes
> the changelog to grow to the same size as the input log. Even with compaction
> turned on for the changelog, this is not very scalable. I am wondering if
> there is a way to mitigate this problem.
>
>
> On Wed, Jul 22, 2015 at 2:12 PM, Yan Fang <yanfang...@gmail.com> wrote:
>
> > Hi Chen Song,
> >
> > There are two different concepts: *checkpoint* and *changelog*. The
> > checkpoint is for the offset of the messages, while the changelog is for
> > the kv-store. The code snippet you show is for the checkpoint, not for the
> > changelog.
> >
> > {quote}
> > 1. When implementing our Samza task, does each call of the process method
> > trigger a call to TaskInstance.commit?
> > {quote}
> >
> > TaskInstance.commit triggers the *checkpoint*. It is triggered every
> > task.commit.ms (default is 60000 ms). The code is here
> > <https://github.com/apache/samza/blob/master/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala#L149-166>.
> > Basically, the RunLoop class calls the commit method, but only triggers
> > the commit behavior once per configured interval.
> >
> > If you are talking about the *changelog*, it's not controlled by the commit
> > method. Instead, every put/delete calls the "send"
> > <https://github.com/apache/samza/blob/master/samza-api/src/main/java/org/apache/samza/system/SystemProducer.java#L51>
> > method of the SystemProducer (code is here
> > <https://github.com/apache/samza/blob/master/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStore.scala#L62-L66>).
> > How often "send" really *sends* to the broker (e.g. Kafka) depends on your
> > producer's configuration. For example, in Kafka, you can have the producer
> > send a batch (setting async) or send one message at a time (setting sync).
> > In other words, Samza leaves it to the system to decide how to handle the
> > "send" call.
> >
> >
> > {quote}
> > 2. Is there a way to buffer these commit activities in memory and flush
> > periodically? Our job is joining >1mm messages per second using a KV store
> > and we have a lot of concern about the changelog size, as in the worst case
> > the changelog will grow as fast as the input log.
> > {quote}
> >
> > If you are talking about the checkpoint, you can change task.commit.ms.
> >
> > If you are thinking of the changelog (kv-store), you can change the
> > producer's config to batch a few changes before sending them to the broker.
> >
> > I think people in the community with more operational experience will be
> > able to tell you what the best practice is.
> >
> > Thanks,
> >
> > Fang, Yan
> > yanfang...@gmail.com
> >
> > On Wed, Jul 22, 2015 at 9:00 AM, Chen Song <chen.song...@gmail.com> wrote:
> >
> > > We are trying to understand the order of commits when processing each
> > > message in a Samza job.
> > >
> > > T1: input offset commit
> > > T2: changelog commit
> > > T3: output commit
> > >
> > > By looking at the code snippet in
> > > https://github.com/apache/samza/blob/master/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala#L155-L171,
> > > my understanding is that for each input message, Samza always sends the
> > > update message to the changelog, sends the output message, and then
> > > commits the input offset. That makes sense to me at a high level in terms
> > > of at-least-once processing.
> > >
> > > Specifically, we have two dumb questions:
> > >
> > > 1. When implementing our Samza task, does each call of the process method
> > > trigger a call to TaskInstance.commit?
> > > 2. Is there a way to buffer these commit activities in memory and flush
> > > periodically? Our job is joining >1mm messages per second using a KV store
> > > and we have a lot of concern about the changelog size, as in the worst case
> > > the changelog will grow as fast as the input log.
> > >
> > > Chen
> > >
> > > --
> > > Chen Song
> > >
> >
>
>
>
> --
> Chen Song
>
