[ https://issues.apache.org/jira/browse/BEAM-11806?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Anonymous updated BEAM-11806:
-----------------------------
    Status: Triage Needed  (was: Resolved)

> KafkaIO - Partition Recognition in WriteRecords
> -----------------------------------------------
>
>                 Key: BEAM-11806
>                 URL: https://issues.apache.org/jira/browse/BEAM-11806
>             Project: Beam
>          Issue Type: Improvement
>          Components: io-java-kafka
>            Reporter: Rion Williams
>            Assignee: Rion Williams
>            Priority: P2
>             Fix For: 2.29.0
>
>          Time Spent: 1.5h
>  Remaining Estimate: 0h
>
> At present, the `WriteRecords` support in KafkaIO ignores the `partition`
> property defined on the `ProducerRecord` instances consumed by the transform.
> This ticket would add support so that an explicitly defined partition is
> honored, while the default behavior is preserved when no partition is set.
> The cause is visible in the `KafkaWriter` class that backs the `WriteRecords`
> transform, which always passes `null` as the partition:
> {code:java}
> producer.send(
>     // The null argument is the partition, so any partition set on the
>     // incoming record is discarded
>     new ProducerRecord<>(
>         topicName, null, timestampMillis, record.key(), record.value(), record.headers()),
>     new SendCallback());
> {code}
> Because of this limitation, a user who wants an explicitly defined
> partitioning strategy, rather than the producer's default partitioner (which
> hashes the key when one is present), has to write a custom DoFn that manages
> its own KafkaProducer (created in a lifecycle method such as @StartBundle),
> similar to the following Kotlin approach:
> {code:java}
> private class ExampleProducerDoFn(...) : DoFn<...>() {
>     private lateinit var producer: KafkaProducer<...>
>
>     @StartBundle
>     fun startBundle(context: StartBundleContext) {
>         val options = context.pipelineOptions.`as`(YourPipelineOptions::class.java)
>         producer = getKafkaProducer(options)
>     }
>
>     @ProcessElement
>     fun processElement(context: ProcessContext) {
>         // Omitted for brevity
>
>         // Produce the record to a specific topic at a specific partition
>         producer.send(ProducerRecord(
>             "your_topic_here",
>             your_partition_here,
>             context.element().kv.key,
>             context.element().kv.value
>         ))
>     }
>
>     @FinishBundle
>     fun finishBundle() {
>         // close() waits for previously sent records to complete, so the
>         // producer created for this bundle is flushed and not leaked
>         producer.close()
>     }
> }
> {code}
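A minimal, stdlib-only Java sketch of the behavior the ticket asks for: use the record's partition when one is set, and otherwise leave the choice to a fallback. `OutgoingRecord`, `PartitionResolver`, and `resolvePartition` are hypothetical stand-ins for `ProducerRecord` and the producer's partitioner, not KafkaIO or Kafka APIs; the fallback is a caller-supplied function rather than Kafka's real default partitioner. In `KafkaWriter` itself, the corresponding change would presumably be passing `record.partition()` instead of `null` to the `ProducerRecord` constructor.

```java
import java.util.function.ToIntFunction;

// Hypothetical stand-in for Kafka's ProducerRecord, reduced to the fields used here.
final class OutgoingRecord<K> {
    final String topic;
    final Integer partition; // null means "no explicit partition was requested"
    final K key;

    OutgoingRecord(String topic, Integer partition, K key) {
        this.topic = topic;
        this.partition = partition;
        this.key = key;
    }
}

final class PartitionResolver {
    private PartitionResolver() {}

    // Honors an explicit partition when present; otherwise defers to the fallback,
    // which stands in for the producer's default partitioning behavior.
    static <K> int resolvePartition(
            OutgoingRecord<K> record, int numPartitions, ToIntFunction<K> fallback) {
        if (record.partition != null) {
            // This sketch rejects out-of-range partitions up front.
            if (record.partition < 0 || record.partition >= numPartitions) {
                throw new IllegalArgumentException(
                        "Partition " + record.partition + " is outside [0, " + numPartitions + ")");
            }
            return record.partition;
        }
        return fallback.applyAsInt(record.key);
    }
}
```

For example, `resolvePartition(new OutgoingRecord<>("orders", 3, "k"), 6, k -> 0)` returns the explicit partition `3`, while a record built with a `null` partition returns whatever the fallback computes. Keeping the fallback as a parameter mirrors the ticket's requirement that the default behavior stay unchanged when no partition is supplied.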



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
