Thank you, Rion, for working on this. I think it was missed, though I don’t recall why.
I’ll take a look at your PR.

Alexey

> On 12 Feb 2021, at 03:48, Rion Williams <[email protected]> wrote:
>
> Hi all,
>
> Recently, I encountered a bit of functionality for a pipeline that I was
> working on that seemed to be slightly lacking (specifically the recognition of
> explicitly defined partitioning in the KafkaIO.WriteRecords transform) so I
> put together a JIRA related to it [1] as well as a more detailed pull request
> [2] with an initial potential fix/change.
>
> I'll provide a bit more context from the pull request description below in
> case in-thread feedback would be easier for some, but any
> recommendations/reviewers/advice would be greatly appreciated!
>
> Cheers,
>
> Rion
>
> [1]: https://issues.apache.org/jira/browse/BEAM-11806
> [2]: https://github.com/apache/beam/pull/13975
>
> ----------------------------------
>
> At present, the WriteRecords transform for KafkaIO does not recognize the
> partition property defined on ProducerRecord instances consumed by the
> transform:
>
>     producer.send(
>         // The null argument in the following constructor is the partition
>         new ProducerRecord<>(
>             topicName, null, timestampMillis, record.key(), record.value(),
>             record.headers()),
>         new SendCallback());
>
> Because of this limitation, in a scenario where a user may desire an
> explicitly defined partitioning strategy as opposed to round-robin, they
> would have to create their own custom DoFn that defines a KafkaProducer
> (preferably within a @StartBundle) similar to the following approach (in
> Kotlin):
>
>     private class ExampleProducerDoFn(...): DoFn<...>() {
>         private lateinit var producer: KafkaProducer<...>
>
>         @StartBundle
>         fun startBundle(context: StartBundleContext) {
>             val options =
>                 context.pipelineOptions.`as`(YourPipelineOptions::class.java)
>             producer = getKafkaProducer(options)
>         }
>
>         @ProcessElement
>         fun processElement(context: ProcessContext) {
>             // Omitted for brevity
>
>             // Produce the record to a specific topic at a specific partition
>             producer.send(ProducerRecord(
>                 "your_topic_here",
>                 your_partition_here,
>                 context.element().kv.key,
>                 context.element().kv.value
>             ))
>         }
>     }
>
> The initial pull request that I threw in here simply replaces the existing
> null with record.partition() (i.e. the partition that was explicitly defined
> on the record initially), but it may require some other changes, which I'd
> need someone more familiar with the KafkaIO source to chime in on.
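
For anyone following along in-thread, the difference between the current and the proposed behaviour can be sketched without the Kafka dependency. This is only a rough illustration under stated assumptions: `Rec` is a hypothetical stand-in for ProducerRecord that keeps just the topic, partition, key and value, and both helper method names are invented for this sketch rather than taken from the KafkaIO source.

```java
public class PartitionPassthroughSketch {

    // Hypothetical stand-in for Kafka's ProducerRecord (not the real class).
    static final class Rec {
        final String topic;
        // A null partition means the producer's partitioner picks one.
        final Integer partition;
        final String key;
        final String value;

        Rec(String topic, Integer partition, String key, String value) {
            this.topic = topic;
            this.partition = partition;
            this.key = key;
            this.value = value;
        }
    }

    // Models the behaviour described in the quoted message: the record is
    // rebuilt with a null partition, so an explicit partition is lost.
    static Rec rebuildDroppingPartition(Rec in, String topicName) {
        return new Rec(topicName, null, in.key, in.value);
    }

    // Models the proposed change: the incoming record's partition is carried
    // over, and stays null only when the caller never set one.
    static Rec rebuildKeepingPartition(Rec in, String topicName) {
        return new Rec(topicName, in.partition, in.key, in.value);
    }

    public static void main(String[] args) {
        Rec explicit = new Rec("events", 3, "k", "v");
        System.out.println(rebuildDroppingPartition(explicit, "events").partition);
        System.out.println(rebuildKeepingPartition(explicit, "events").partition);
    }
}
```

Records that never set a partition behave the same under both helpers, which suggests the change would be backwards compatible for existing pipelines, though that would still need confirming against the real transform.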
