Hey Richard, > It’s sort of ‘by convention’ that they are documented in samza as matching exactly what kafka expects, no?
Samza's KafkaSystem has three prefixes: systems.<system name>.samza.* systems.<system name>.consumer.* systems.<system name>.producer.* The producer.* configs are passed directly to the underlying Kafka producer (stripped of their prefixes). Therefore, you can directly use any Kafka producer setting (with the prefix above), and it will be given to the Kafka producer. This lets you set things like batch size, compression, etc. Samza just acts as a pass-through. The consumer is slightly more complicated because all of the consumer configs listed in the Kafka docs are for their "high-level" (ZK-based) consumer, which we don't use. We still try to honor the ones we can, though. See KafkaConfig and KafkaSystemFactory for details there. Cheers, Chris On Thu, Feb 19, 2015 at 4:44 PM, Richard Lee <rd...@tivo.com> wrote: > Ah. Yes, I read the KafkaSystemProducer code, but did not dive deeply > enough down into the actual Kafka library implementation itself to see > where these parameters are used. It’s sort of ‘by convention’ that they > are documented in samza as matching exactly what kafka expects, no? > > Richard > > > On Feb 19, 2015, at 4:13 PM, Navina Ramesh <nram...@linkedin.com.INVALID> > wrote: > > > > Hi Richard, > > SystemProducer is an abstraction for producers. If you are specifically > > looking for Kafka as a system, you should be looking into > > KafkaSystemProducer. > > Depending on the Samza version you are using, you should be able to see > > the code related to buffering. > > > > If you check the RunLoop class, you will see that the run method calls in > > order - process, window, commit - methods for every task instance. The > > commit method for a TaskInstance performs the flush operation for the > > stores and producers used in the Task and then, finally commits the > > checkpoint to Kafka (in this case). > > > > Checkpointing is synchronous, if you use KafkaCheckpointManager. In that > > implementation, you can take a look at the ³writeLog² method that makes a > > synchronous call to the KafkaProducer. > > > > Hope this helps! > > > > Cheers, > > Navina > > > > On 2/19/15, 3:59 PM, "Richard Lee" <rd...@tivo.com> wrote: > > > >> > >> > >>> On Feb 19, 2015, at 3:46 PM, Richard Lee <rd...@tivo.com> wrote: > >>> > >>> I¹m looking at the samza-core source codeŠ in particular RunLoop, > >>> TaskInstance, TaskInstanceCollector, and SystemProducers, and I¹m > having > >>> a hard time seeing where this batched sends of output messages happens. > >>> It seems from RunLoop that: > >>> > >>> - one input message envelope is read from the consumer multiplexer > >>> - it is sent to the task for processing > >>> - if the task writes a new output message envelope to the collector, > >>> the TaskInstanceCollector immediately looks up the SystemProducer from > >>> SystemProducers and immediately sends it onwards. > >>> > >>> I don¹t see any consultation of either producer.type or batch size. > >>> > >>> However, I am also new to scala, so I¹m likely not understanding > >>> something fairly obvious. > >> > >> I should add, that the rest of the RunLoop, after processing the one > >> input message envelope, then calls > >> > >> - window (which is a no-op if not doing windowing, AFAICT) > >> - commit, which, if the commit timer has expired > >> - tells all the tasks to commit > >> - which flushes all their collectors > >> - which flushes the SystemProducers > >> - which flushes each SystemProducer > >> - which guarantees that everything is written out. > >> > >> So, it naively appears that the core loop is > >> > >> - read one message > >> - flush all outputs > >> > >> I know there must be some magic somewhere that is buffering & batching, > >> but I¹m having trouble finding it. > >> > >> Richard > >> > >>> > >>> Richard > >>> > >>> > >>>> On Feb 19, 2015, at 8:39 AM, Chris Riccomini <criccom...@apache.org> > >>>> wrote: > >>>> > >>>> Hey Tom, > >>>> > >>>> It seems that most of your questions are concerned with durability and > >>>> messaging guarantees. Samza is designed to not lose data, but > >>>> duplicates > >>>> can occur. Samza reads messages, and feeds them to your process() > >>>> method. > >>>> When you send messages, either via a changelog, or via collector.send, > >>>> Samza will batch those messages up, and send them at some point BEFORE > >>>> your > >>>> input offsets are committed. This looks like: > >>>> > >>>> <start>, ... process and send a lot ..., <commit> > >>>> > >>>> Samza only guarantees that everything will be flushed to Kafka (or > >>>> whatever > >>>> output system you're sending to) *before* committing offsets. Once > >>>> offsets > >>>> are committed, you'll never see any prior messages again. If a failure > >>>> occurs somewhere *before* the offsets are committed, you'll simply > fall > >>>> back to the last checkpointed offsets (<start>) and restart the > >>>> processing > >>>> again. > >>>> > >>>> In between, for performance reasons, Samza batches output, delays > >>>> sends, > >>>> etc. This is safe because we always flush before committing. > >>>> > >>>>> a) If using RockDB kv implementation, is there a way to guarantee > >>>>> that a > >>>> put is committed (at least on that instance disc), I notice that > RockDB > >>>> implementation does nothing for kv.flush(). > >>>> > >>>> The RocksDB store in Samza is basically used as a durable cache. The > >>>> only > >>>> guarantee that Samza really cares about is whether it can get the data > >>>> after it's been put (whether the data is still in memory, or on disk). > >>>> The > >>>> guarantee you, as a user, probably care about is whether your write > has > >>>> been sent to your changelog. > >>>> > >>>>> b) When is it guaranteed that the kv put is in the change log (I am > >>>>> using > >>>> kafka implementation). > >>>> > >>>> It will be guaranteed to be written to the changelog when commit() is > >>>> called, before your offsets are committed. The exact order of commit > >>>> is: > >>>> flush storage changelogs, flush producers, commit offsets. You can see > >>>> this > >>>> in RunLoop.scala. This guarantees that your changelogs will be fully > >>>> flushed to Kafka before you commit your offsets. If a failure occurs > >>>> before > >>>> the offset commit, you'd see duplicate messages, but you'd never lose > >>>> messages. > >>>> > >>>>> When using messageCollector.send and > >>>>> systems.kafka.producer.producer.type=sync > >>>> does that guarantee that the message is in kafka log when the send > >>>> returns. > >>>> > >>>> Note quite. Samza batches messages to increase throughput. 'sync' > tells > >>>> Samza to block when a *batch* of messages is being sent. If you wanted > >>>> to > >>>> synchronously write each message, and block, you'd have to set the > >>>> batch > >>>> size to 1. > >>>> > >>>>> If my Samza job fails while processing a message, I fix it and deploy > >>>> again, will the message offset still point to a value <= the message I > >>>> failed on. > >>>> > >>>> Yes. It should never be higher until the commit() message is called > >>>> (after > >>>> process()). The guarantee Samza provides is that you might see > >>>> duplicates, > >>>> but you'll not lose data. > >>>> > >>>> Cheers, > >>>> Chris > >>>> > >>>> On Thu, Feb 19, 2015 at 8:23 AM, Tom Dearman <tom.dear...@gmail.com> > >>>> wrote: > >>>> > >>>>> Hi, > >>>>> Can someone help with the following questions please: > >>>>> > >>>>> a) If using RockDB kv implementation, is there a way to guarantee > >>>>> that a > >>>>> put is committed (at least on that instance disc), I notice that > >>>>> RockDB > >>>>> implementation does nothing for kv.flush(). > >>>>> > >>>>> b) When is it guaranteed that the kv put is in the change log (I am > >>>>> using > >>>>> kafka implementation). > >>>>> > >>>>> c) When using messageCollector.send and > >>>>> systems.kafka.producer.producer.type=sync does that guarantee that > the > >>>>> message is in kafka log when the send returns. I am new to kafka, > >>>>> but it > >>>>> seems to me that if you have type=sync set, you still need to wait > >>>>> for the > >>>>> future objects get to return, is this what Samza does? > >>>>> > >>>>> d) If my Samza job fails while processing a message, I fix it and > >>>>> deploy > >>>>> again, will the message offset still point to a value <= the message > I > >>>>> failed on. ie I understand it can be earlier, but is it possible the > >>>>> offset will now point to one higher. > >>> > >>> > >>> ________________________________ > >>> > >>> This email and any attachments may contain confidential and privileged > >>> material for the sole use of the intended recipient. Any review, > >>> copying, or distribution of this email (or any attachments) by others > is > >>> prohibited. If you are not the intended recipient, please contact the > >>> sender immediately and permanently delete this email and any > >>> attachments. No employee or agent of TiVo Inc. is authorized to > conclude > >>> any binding agreement on behalf of TiVo Inc. by email. Binding > >>> agreements with TiVo Inc. may only be made by a signed written > agreement. > >> > >> > >> ________________________________ > >> > >> This email and any attachments may contain confidential and privileged > >> material for the sole use of the intended recipient. Any review, > copying, > >> or distribution of this email (or any attachments) by others is > >> prohibited. If you are not the intended recipient, please contact the > >> sender immediately and permanently delete this email and any > attachments. > >> No employee or agent of TiVo Inc. is authorized to conclude any binding > >> agreement on behalf of TiVo Inc. by email. Binding agreements with TiVo > >> Inc. may only be made by a signed written agreement. > > > > > ________________________________ > > This email and any attachments may contain confidential and privileged > material for the sole use of the intended recipient. Any review, copying, > or distribution of this email (or any attachments) by others is prohibited. > If you are not the intended recipient, please contact the sender > immediately and permanently delete this email and any attachments. No > employee or agent of TiVo Inc. is authorized to conclude any binding > agreement on behalf of TiVo Inc. by email. Binding agreements with TiVo > Inc. may only be made by a signed written agreement. >