Thank you, Rion, for working on this. I think it was missed for some unknown 
reason (I don’t recall why...).

I’ll take a look at your PR.

Alexey

> On 12 Feb 2021, at 03:48, Rion Williams <[email protected]> wrote:
> 
> Hi all,
> 
> Recently, I encountered a bit of functionality for a pipeline that I was 
> working on that seemed to be slightly lacking (specifically, the recognition of 
> explicitly defined partitioning in the KafkaIO.WriteRecords transform), so I 
> put together a JIRA for it [1] as well as a more detailed pull request [2] 
> with an initial potential fix/change.
> 
> I'll provide a bit more context from the pull request description below in 
> case in-thread feedback would be easier for some, but any 
> recommendations/reviewers/advice would be greatly appreciated!
> 
> Cheers,
> 
> Rion
> 
> [1]: https://issues.apache.org/jira/browse/BEAM-11806
> [2]: https://github.com/apache/beam/pull/13975
> 
> ----------------------------------
> 
> At present, the WriteRecords transform for KafkaIO does not recognize the 
> partition property defined on ProducerRecord instances consumed by the 
> transform:
> 
> producer.send(
>         // The null argument in the following constructor represents the partition
>         new ProducerRecord<>(
>             topicName, null, timestampMillis, record.key(), record.value(),
>             record.headers()),
>         new SendCallback());
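> 
> In practice, that means a write pipeline along these lines silently loses the 
> explicitly chosen partition (keyedInput stands in for an upstream 
> PCollection<KV<String, String>>; the topic name, partition number, and 
> bootstrap servers are just placeholders, and coder setup and other details are 
> omitted for brevity):
> 
> // Each record carries an explicitly chosen partition (2 here, as a placeholder)...
> PCollection<ProducerRecord<String, String>> records =
>     keyedInput.apply(
>         "ToProducerRecords",
>         MapElements.into(new TypeDescriptor<ProducerRecord<String, String>>() {})
>             .via((KV<String, String> kv) ->
>                 new ProducerRecord<>("your_topic_here", 2, kv.getKey(), kv.getValue())));
> 
> // ...but the sink builds its own ProducerRecord with a null partition, so Kafka's
> // default partitioner is applied instead of the partition set above.
> records.apply(
>     "WriteToKafka",
>     KafkaIO.<String, String>writeRecords()
>         .withBootstrapServers("broker:9092")
>         .withTopic("your_topic_here")
>         .withKeySerializer(StringSerializer.class)
>         .withValueSerializer(StringSerializer.class));
> 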
> Because of this limitation, a user who wants an explicitly defined 
> partitioning strategy, as opposed to round-robin, has to create their own 
> custom DoFn that manages a KafkaProducer (preferably created within a 
> @StartBundle method), similar to the following approach (in Kotlin):
> 
> private class ExampleProducerDoFn(...): DoFn<...>() {
>         private lateinit var producer: KafkaProducer<...>
> 
>         @StartBundle
>         fun startBundle(context: StartBundleContext) {
>             val options = context.pipelineOptions.`as`(YourPipelineOptions::class.java)
>             producer = getKafkaProducer(options)
>         }
> 
>         @ProcessElement
>         fun processElement(context: ProcessContext) {
>             // Omitted for brevity
> 
>             // Produce the record to a specific topic at a specific partition
>             producer.send(ProducerRecord(
>                 "your_topic_here",
>                 your_partition_here,
>                 context.element().kv.key,
>                 context.element().kv.value
>             ))
>         }
> }
> The initial pull request that I put up here simply replaces the existing 
> null with record.partition() (i.e. the partition that was explicitly defined 
> on the record initially), but it may require some other changes, which I'd 
> need someone more familiar with the KafkaIO source to chime in on.
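> 
> Roughly, the change amounts to something like this (just a sketch based on the 
> send call shown above; the exact shape in the KafkaIO writer code may differ 
> slightly):
> 
> producer.send(
>     new ProducerRecord<>(
>         topicName,
>         record.partition(),   // previously hard-coded to null
>         timestampMillis,
>         record.key(),
>         record.value(),
>         record.headers()),
>     new SendCallback());
> 
> Since a null partition falls back to Kafka's default partitioner, records that 
> don't set a partition should keep the existing behaviour.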
> 
> 
