Hi Chen,

So, is your goal to improve the throughput to the changelog topic or to reduce the size of the changelog topic? If you are targeting the latter and your KV-store truly is the size of the input log, I don't see how that is possible. In a lot of use cases, users only need to retain a *recent* time period of the input log. In that case, you can periodically purge the expired records in the KV-store to reduce the size (both of the KV-store and of the changelog).
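To make the purge idea concrete, here is a minimal sketch in plain Java. It deliberately does not use the Samza API: `ExpiringStore`, its time index, and the `changelogMessages` counter are hypothetical stand-ins for a changelog-backed KV-store, and the retention period is an assumed parameter. The point is only to show that a secondary index on write time lets the purge touch just the expired keys, and that every purge is itself a delete that goes to the changelog.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical stand-in for a changelog-backed KV-store with time-based retention. */
class ExpiringStore {
    private final long retentionMs;
    private final Map<String, String> values = new HashMap<>();
    private final Map<String, Long> lastWrite = new HashMap<>();
    // Secondary index: write time -> keys last written at that time.
    private final TreeMap<Long, List<String>> byTime = new TreeMap<>();
    // Counts every put/delete, mimicking one changelog message per store write.
    int changelogMessages = 0;

    ExpiringStore(long retentionMs) {
        this.retentionMs = retentionMs;
    }

    void put(String key, String value, long nowMs) {
        Long previous = lastWrite.put(key, nowMs);
        if (previous != null) {
            // Drop the key from its old time bucket so it is not purged too early.
            List<String> bucket = byTime.get(previous);
            bucket.remove(key);
            if (bucket.isEmpty()) {
                byTime.remove(previous);
            }
        }
        byTime.computeIfAbsent(nowMs, t -> new ArrayList<>()).add(key);
        values.put(key, value);
        changelogMessages++;
    }

    String get(String key) {
        return values.get(key);
    }

    int size() {
        return values.size();
    }

    /** Deletes every record last written before (nowMs - retentionMs); returns the count. */
    int purge(long nowMs) {
        Map<Long, List<String>> expired = byTime.headMap(nowMs - retentionMs, false);
        int purged = 0;
        for (List<String> bucket : expired.values()) {
            for (String key : bucket) {
                values.remove(key);
                lastWrite.remove(key);
                changelogMessages++; // a delete is also written to the changelog
                purged++;
            }
        }
        expired.clear(); // headMap is a view, so this removes the buckets from byTime
        return purged;
    }
}
```

In a real job the purge would be called periodically rather than per message, for example from a windowed callback. Note that the deletes add changelog traffic in the short term; the size reduction comes once compaction drops the purged keys.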
Regards,

-Yi

On Tue, Aug 4, 2015 at 7:25 AM, Chen Song <chen.song...@gmail.com> wrote:

> Thanks Yan.
>
> Very good explanation on 1).
>
> For 2), I understand that users can tune the size of the batch for Kafka
> producer. However, that doesn't change the number of messages sent to the
> changelog topic. In our case, we process a high volume log (1.5MM
> records/second) will update kv store for each message and this will result
> the changelog to grow to the same size of input log. Even with compaction
> turned on changelog, it is not very scalable. I am wondering if there is a
> way to mitigate this problem.
>
>
> On Wed, Jul 22, 2015 at 2:12 PM, Yan Fang <yanfang...@gmail.com> wrote:
>
> > Hi Chen Song,
> >
> > There are two different concepts: *checkpoint* and *changelog*. Checkpoint
> > is for the offset of the messages, while the changelog is for the kv-store.
> > The code snippet you show is for the checkpoint, not for the changelog.
> >
> > {quote}
> > 1. When implementing our Samza task, does each call of process method
> > triggers a call to TaskInstance.commit?
> > {quote}
> >
> > TaskInstance.commit triggers the *checkpoint*. It is triggered every
> > task.commit.ms, (default is 60000ms). The code is here
> > <https://github.com/apache/samza/blob/master/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala#L149-166>.
> > Basically, the RunLoop class calls the commit method, but only trigger
> > the commit behavior every configured time.
> >
> > If you are talking about the *changelog*, it's not controlled by the commit
> > method. Instead, every put/delete calls the "send
> > <https://github.com/apache/samza/blob/master/samza-api/src/main/java/org/apache/samza/system/SystemProducer.java#L51>"
> > of the system Producer. (code is here
> > <https://github.com/apache/samza/blob/master/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStore.scala#L62-L66>).
> >
> > In terms of how often the "send" really *send* to the broker (e.g. kafka),
> > it depends on your producer's configuration. For example, in Kafka, you can
> > have the producer send a batch (setting async), or send one msg a time
> > (setting sync). What it means is that, it leaves the System to decide how
> > to deal with the "send" method.
> >
> >
> > {quote}
> > 2. Is there a way to buffer these commit activities in memory and flush
> > periodically? Our job is joining >1mm messages per second using a KV store
> > and we have a lot of concern for the changelog size, as in the worst case,
> > the change log will grow as fast as the input log.
> > {quote}
> >
> > If you are talking about the checkpoint, you can change the task.commit.ms.
> >
> > If you are thinking of the changelog (kv-store), you can change the
> > producer's config to batch a few changes and send to the broker.
> >
> > I think the guys in the community with more operational experience are able
> > to tell you what is the best practice.
> >
> > Thanks,
> >
> > Fang, Yan
> > yanfang...@gmail.com
> >
> > On Wed, Jul 22, 2015 at 9:00 AM, Chen Song <chen.song...@gmail.com> wrote:
> >
> > > We are trying to understand the order of commits when processing each
> > > message in a Samza job.
> > >
> > > T1: input offset commit
> > > T2: changelog commit
> > > T3: output commit
> > >
> > > By looking at the code snippet in
> > > https://github.com/apache/samza/blob/master/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala#L155-L171,
> > > my understanding is that for each input message, Samza always send update
> > > message on changelog, send the output message and then commit the input
> > > offset. It makes sense to me at the high level in terms of at least once
> > > processing.
> > >
> > > Specifically, we have two dumb questions:
> > >
> > > 1. When implementing our Samza task, does each call of process method
> > > triggers a call to TaskInstance.commit?
> > > 2. Is there a way to buffer these commit activities in memory and flush
> > > periodically? Our job is joining >1mm messages per second using a KV store
> > > and we have a lot of concern for the changelog size, as in the worst case,
> > > the change log will grow as fast as the input log.
> > >
> > > Chen
> > >
> > > --
> > > Chen Song
> > >
> >
>
>
> --
> Chen Song
>