Bobby,

That's great to hear. When it's released, will there be a notification here, or in some other venue?

Thanks for the information,
-Ryan

On Fri, May 29, 2015 at 9:39 AM, Bobby Evans <[email protected]> wrote:

> We do have a team member working on that right now. I'm not totally sure
> of the time frame for getting it out to open source, but I would suspect a
> week or two more at the most.
> - Bobby
>
> On Friday, May 29, 2015 7:56 AM, Ryan Persaud <[email protected]> wrote:
>
> Hello Raghavendra,
>
> Thanks for your reply and code sample. I see that you are using the List
> version of the send function in kafka.javaapi.producer to implement
> batching. My Scala is not so good, but it looks like the List version of
> send() is just iterating through the list and calling the same function as
> the single-message version (see below), so it's not clear to me that any
> batching is occurring in the Producer itself. Am I reading this code
> incorrectly?
>
> From:
>
> https://github.com/kafka-dev/kafka/blob/master/core/src/main/scala/kafka/javaapi/producer/Producer.scala
>
> /**
>  * Sends the data to a single topic, partitioned by key, using either the
>  * synchronous or the asynchronous producer
>  * @param producerData the producer data object that encapsulates the topic, key and message data
>  */
> def send(producerData: kafka.javaapi.producer.ProducerData[K,V]) {
>   import collection.JavaConversions._
>   underlying.send(new kafka.producer.ProducerData[K,V](producerData.getTopic, producerData.getKey,
>                                                        asBuffer(producerData.getData)))
> }
>
> /**
>  * Use this API to send data to multiple topics
>  * @param producerData list of producer data objects that encapsulate the topic, key and message data
>  */
> def send(producerData: java.util.List[kafka.javaapi.producer.ProducerData[K,V]]) {
>   import collection.JavaConversions._
>   underlying.send(asBuffer(producerData).map(pd => new kafka.producer.ProducerData[K,V](pd.getTopic, pd.getKey,
>                                                        asBuffer(pd.getData))): _*)
> }
>
> -Ryan
>
> On Fri, May 29, 2015 at 1:29 AM, Raghavendra Nandagopal <[email protected]> wrote:
>
> > Hi Ryan,
> >
> > I have attached a sample implementation of KafkaBolt that does micro
> > batching on the tuples. It also acks tuples once they have been
> > successfully written to Kafka and fails them in case of any exception.
> > A batch is flushed either when a tick tuple arrives (the tick frequency is
> > configured in seconds) or when the batch size, a configurable number, is
> > reached. For example, to set a frequency of 10 seconds, use the
> > following configuration:
> >
> > conf.put("topology.tick.tuple.freq.secs", 10);
> >
> > The batch size is hardcoded in the attached file.
> >
> > Thanks,
> > Raghavendra Nandagopal
> >
> > On Thu, May 28, 2015 at 7:43 PM, Ryan Persaud <[email protected]> wrote:
> >
> >> Hello,
> >>
> >> I was wondering if anyone is aware of an implementation of the KafkaBolt
> >> that uses the new Java Kafka Producer instead of the legacy Scala one. I
> >> have a topology with a high volume of tuples that need to be written to
> >> Kafka, so I would like to take advantage of the batching that the
> >> asynchronous mode provides. However, I also want to ack tuples only after
> >> I have confirmation that they have been successfully written to at least
> >> one Kafka broker. Since the Scala producer does not support callbacks, I
> >> think I would need to use the new Java producer to get the desired
> >> functionality.
> >>
> >> Thoughts?
> >>
> >> Thanks,
> >> -Ryan
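For readers following the thread, here is a rough sketch of how the micro-batching Raghavendra describes (flush on a tick tuple or when the batch is full) can be combined with Ryan's requirement to ack a tuple only after the write is confirmed. It is not the attached bolt. `Collector`, `AsyncSender` and `MicroBatcher` are hypothetical stand-ins invented for this illustration, not Storm or Kafka types. The callback shape (null on success, the exception on failure) is modeled loosely on `Callback.onCompletion(RecordMetadata, Exception)` in the new Java producer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/**
 * Sketch of a micro-batching bolt core. Collector and AsyncSender are
 * hypothetical stand-ins for Storm's OutputCollector and the new Kafka
 * producer; they are NOT the real Storm or Kafka interfaces.
 */
public class MicroBatchSketch {

    /** Hypothetical stand-in for the ack/fail part of Storm's OutputCollector. */
    interface Collector<T> {
        void ack(T tuple);
        void fail(T tuple);
    }

    /**
     * Hypothetical asynchronous send. The completion callback receives null on
     * success or the exception on failure, similar in spirit to
     * Callback.onCompletion(RecordMetadata, Exception) in the new Java producer.
     */
    interface AsyncSender<T> {
        void send(T record, Consumer<Exception> onCompletion);
    }

    /** Buffers tuples and hands them to the sender on a full batch or a tick. */
    static final class MicroBatcher<T> {
        private final int batchSize;
        private final AsyncSender<T> sender;
        private final Collector<T> collector;
        private final List<T> buffer = new ArrayList<>();

        MicroBatcher(int batchSize, AsyncSender<T> sender, Collector<T> collector) {
            if (batchSize <= 0) {
                throw new IllegalArgumentException("batchSize must be positive");
            }
            this.batchSize = batchSize;
            this.sender = sender;
            this.collector = collector;
        }

        /** Called for every regular (non-tick) tuple. */
        void onTuple(T tuple) {
            buffer.add(tuple);
            if (buffer.size() >= batchSize) {
                flush();
            }
        }

        /** Called for each tick tuple (frequency set by topology.tick.tuple.freq.secs). */
        void onTick() {
            flush();
        }

        /** Number of tuples buffered but not yet handed to the sender. */
        int pending() {
            return buffer.size();
        }

        private void flush() {
            // Copy and clear first so the buffer can refill while callbacks
            // for this batch are still outstanding.
            List<T> batch = new ArrayList<>(buffer);
            buffer.clear();
            for (T tuple : batch) {
                try {
                    sender.send(tuple, error -> {
                        // Ack only once the write is confirmed; fail otherwise.
                        if (error == null) {
                            collector.ack(tuple);
                        } else {
                            collector.fail(tuple);
                        }
                    });
                } catch (RuntimeException e) {
                    // send() itself threw before accepting the record: fail right away.
                    collector.fail(tuple);
                }
            }
        }
    }
}
```

In a real bolt, `execute()` would route tick tuples (typically identified by `Constants.SYSTEM_COMPONENT_ID` and `Constants.SYSTEM_TICK_STREAM_ID`) to `onTick()` and everything else to `onTuple()`. Two caveats apply to the design. First, the new Java producer's `send()` already buffers records internally (governed by settings such as `batch.size` and `linger.ms`), so the explicit buffer mainly controls when tuples are handed off. Second, the producer runs callbacks on its own I/O thread. Depending on the Storm version, acking from that thread may not be safe, so a production bolt might need to queue results back to the executor thread.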
