We do have a team member working on that right now. I'm not totally sure of the
time frame for releasing it as open source, but I would expect a week or two
more at most.
- Bobby
On Friday, May 29, 2015 7:56 AM, Ryan Persaud wrote:
Hello Raghavendra,
Thanks for your reply and code sample. I see that you are using the List
version of the send function in kafka.javaapi.producer to implement
batching. My Scala is not so good, but it looks like the List version of
send() is just iterating through the list and calling the same function as
the single-message version (see below), so it's not clear to me that any
batching is occurring in the Producer itself. Am I reading this code
incorrectly?
From:
https://github.com/kafka-dev/kafka/blob/master/core/src/main/scala/kafka/javaapi/producer/Producer.scala
/**
 * Sends the data to a single topic, partitioned by key, using either the
 * synchronous or the asynchronous producer
 * @param producerData the producer data object that encapsulates the topic, key and message data
 */
def send(producerData: kafka.javaapi.producer.ProducerData[K,V]) {
  import collection.JavaConversions._
  underlying.send(new kafka.producer.ProducerData[K,V](producerData.getTopic, producerData.getKey,
                                                       asBuffer(producerData.getData)))
}

/**
 * Use this API to send data to multiple topics
 * @param producerData list of producer data objects that encapsulate the topic, key and message data
 */
def send(producerData: java.util.List[kafka.javaapi.producer.ProducerData[K,V]]) {
  import collection.JavaConversions._
  underlying.send(asBuffer(producerData).map(pd =>
    new kafka.producer.ProducerData[K,V](pd.getTopic, pd.getKey, asBuffer(pd.getData))): _*)
}
-Ryan
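To make the question concrete, here is a rough Java model of the two overloads quoted above. `Msg`, `Underlying`, and `LegacyProducerModel` are hypothetical stand-ins, not Kafka classes. The model only mirrors the structure of the Scala code: the single-message overload makes one underlying call carrying one message, while the List overload converts every element and hands all of them to a single underlying call (the Scala `: _*` expands the mapped buffer into varargs). Whether that underlying call actually batches anything is not visible from this file.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for kafka.producer.ProducerData: topic, key, payloads.
final class Msg {
    final String topic;
    final String key;
    final List<String> data;

    Msg(String topic, String key, List<String> data) {
        this.topic = topic;
        this.key = key;
        this.data = data;
    }
}

// Hypothetical stand-in for the wrapped Scala producer ("underlying").
// Like the Scala method, it accepts a variable number of messages per call.
interface Underlying {
    void send(Msg... msgs);
}

// Models the two javaapi overloads from Producer.scala.
final class LegacyProducerModel {
    private final Underlying underlying;

    LegacyProducerModel(Underlying underlying) {
        this.underlying = underlying;
    }

    // Single-message overload: one underlying call carrying one message.
    void send(Msg msg) {
        underlying.send(new Msg(msg.topic, msg.key, new ArrayList<>(msg.data)));
    }

    // List overload: convert every element, then make ONE underlying call
    // that receives all of them as varargs.
    void send(List<Msg> msgs) {
        Msg[] converted = msgs.stream()
                .map(m -> new Msg(m.topic, m.key, new ArrayList<>(m.data)))
                .toArray(Msg[]::new);
        underlying.send(converted);
    }
}
```

Recording the number of messages per underlying call shows one call of size 1 for the single-message overload and one call of size 3 for a three-element list, rather than three separate calls.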
On Fri, May 29, 2015 at 1:29 AM, Raghavendra Nandagopal wrote:
> Hi Ryan,
> I have attached a sample implementation of KafkaBolt that does micro-
> batching on the tuples. It also acks tuples once they have been
> successfully written to Kafka and fails them if any exception occurs.
> A batch is flushed either when a tick tuple arrives (the tick frequency is
> configured in seconds) or when it reaches the batch size, which is a
> configurable number. For example, to flush every 10 seconds, use the
> following configuration:
>
> conf.put("topology.tick.tuple.freq.secs", 10);
>
> The batch size is hardcoded in the attached file.
>
> Thanks,
> Raghavendra Nandagopal
>
>
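The batching policy Raghavendra describes (flush on a tick tuple or when the buffer reaches a size limit, ack on success, fail on any exception) can be sketched without Storm or Kafka on the classpath. `MicroBatcher`, `BatchWriter`, and `AckSink` are hypothetical names, and tuples are modeled as plain string ids. In a real bolt, `onTick` would be driven by a tick tuple (enabled with `topology.tick.tuple.freq.secs`), and `ack`/`fail` would go to the bolt's `OutputCollector`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical: writes a whole batch to Kafka, throwing on any failure.
interface BatchWriter {
    void write(List<String> batch) throws Exception;
}

// Hypothetical: stands in for the ack/fail methods of Storm's OutputCollector.
interface AckSink {
    void ack(String tupleId);
    void fail(String tupleId);
}

final class MicroBatcher {
    private final int maxBatchSize;
    private final BatchWriter writer;
    private final AckSink acks;
    private final List<String> pending = new ArrayList<>();

    MicroBatcher(int maxBatchSize, BatchWriter writer, AckSink acks) {
        this.maxBatchSize = maxBatchSize;
        this.writer = writer;
        this.acks = acks;
    }

    // Called for each ordinary tuple: buffer it, flush once the size limit is hit.
    void onTuple(String tupleId) {
        pending.add(tupleId);
        if (pending.size() >= maxBatchSize) {
            flush();
        }
    }

    // Called when a tick tuple arrives: flush whatever has accumulated.
    void onTick() {
        flush();
    }

    private void flush() {
        if (pending.isEmpty()) {
            return;
        }
        List<String> batch = new ArrayList<>(pending);
        pending.clear();
        try {
            writer.write(batch);
            batch.forEach(acks::ack);   // the whole batch was written
        } catch (Exception e) {
            batch.forEach(acks::fail);  // fail every tuple so Storm can replay it
        }
    }
}
```

Failing the whole batch on any exception is the simplest policy; it means tuples that were in fact written may be replayed, so downstream consumers should tolerate duplicates.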
> On Thu, May 28, 2015 at 7:43 PM, Ryan Persaud wrote:
>
>> Hello,
>>
>> I was wondering if anyone is aware of an implementation of the KafkaBolt
>> that uses the new Java Kafka Producer instead of the legacy Scala one. I
>> have a topology with a high volume of tuples that need to be written to
>> Kafka, so I would like to take advantage of the batching that the
>> asynchronous mode provides. However, I also want to ack tuples only after
>> I have confirmation that they have been successfully written to at least
>> one Kafka broker. Since the Scala producer does not support callbacks, I
>> think I would need to use the new Java producer to realize the desired
>> functionality.
>>
>> Thoughts?
>>
>> Thanks,
>> -Ryan
>>
>
>
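Ryan's approach, acking a tuple only from the send callback, might look roughly like the sketch below. In the new Java client the relevant call is `KafkaProducer.send(ProducerRecord, Callback)`, whose `Callback.onCompletion(RecordMetadata, Exception)` receives a non-null exception when the send fails; with `acks=1` the request completes once the partition leader has written the record. To keep the example runnable without `kafka-clients`, `AsyncSender`, `TupleAcker`, and `CallbackAckingWriter` are hypothetical stand-ins that use `CompletableFuture` to simulate the asynchronous send.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for an asynchronous producer: completes the future
// with an offset on success, or exceptionally on failure.
interface AsyncSender {
    CompletableFuture<Long> send(String topic, String value);
}

// Hypothetical stand-in for the bolt's ack/fail calls.
interface TupleAcker {
    void ack(String tupleId);
    void fail(String tupleId);
}

final class CallbackAckingWriter {
    private final AsyncSender sender;
    private final TupleAcker acker;

    CallbackAckingWriter(AsyncSender sender, TupleAcker acker) {
        this.sender = sender;
        this.acker = acker;
    }

    // Send without blocking; decide ack vs. fail only when the send completes,
    // mirroring Callback.onCompletion(metadata, exception) in the real client.
    CompletableFuture<Void> write(String tupleId, String topic, String value) {
        return sender.send(topic, value).handle((offset, error) -> {
            if (error == null) {
                acker.ack(tupleId);
            } else {
                acker.fail(tupleId);
            }
            return null;
        });
    }
}
```

Acking from the completion handler, rather than right after calling send, is what ties the ack to broker confirmation; the send call itself returns immediately, so the producer remains free to batch records internally.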