[ 
https://issues.apache.org/jira/browse/BEAM-11806?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rion Williams updated BEAM-11806:
---------------------------------
    Description: 
At present, the `WriteRecords` support in KafkaIO does not recognize the 
`partition` property defined on the `ProducerRecord` instances consumed by the 
transform. This ticket would add support so that an explicitly defined 
partition is honored, while the existing default behavior is preserved whenever 
no partition is set.
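
For context, the Kafka client API already carries this information on the record itself: 
`ProducerRecord` accepts an optional `Integer` partition and exposes it via `partition()`, 
which returns null when nothing was set. A minimal illustration (topic, key, and value are 
placeholders):
{code:java}
// A record explicitly targeted at partition 2 of the topic.
ProducerRecord<String, String> explicit =
    new ProducerRecord<>("your_topic_here", 2, "some-key", "some-value");

// A record with no explicit partition; partition() returns null and the
// producer's default partitioner decides where it lands.
ProducerRecord<String, String> implicit =
    new ProducerRecord<>("your_topic_here", "some-key", "some-value");

// explicit.partition() -> 2, implicit.partition() -> null
{code}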

The limitation is easy to spot in the `KafkaWriter` class used behind the 
scenes by the `WriteRecords` transform, where the partition argument is always 
passed as null:
{code:java}
producer.send(
        // The null argument in the following constructor is the partition
        new ProducerRecord<>(
            topicName, null, timestampMillis, record.key(), record.value(), record.headers()),
        new SendCallback());
{code}
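
One possible shape of the fix, offered here only as a sketch rather than the final 
implementation, would be to forward the incoming record's own partition. Since 
`partition()` returns null whenever the caller did not set one, the producer's default 
partitioning would still apply in that case:
{code:java}
producer.send(
        new ProducerRecord<>(
            topicName,
            record.partition(), // explicit partition if one was set, otherwise null
            timestampMillis,
            record.key(),
            record.value(),
            record.headers()),
        new SendCallback());
{code}
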
Because of this limitation, a user who wants an explicitly defined partitioning 
strategy, as opposed to round-robin assignment, currently has to write a custom 
DoFn that manages its own KafkaProducer (preferably created within a 
@StartBundle method), similar to the following approach (in Kotlin):


{code:java}
private class ExampleProducerDoFn(...) : DoFn<...>() {
    private lateinit var producer: KafkaProducer<...>

    @StartBundle
    fun startBundle(context: StartBundleContext) {
        val options = context.pipelineOptions.`as`(YourPipelineOptions::class.java)
        producer = getKafkaProducer(options)
    }

    @ProcessElement
    fun processElement(context: ProcessContext) {
        // Omitted for brevity

        // Produce the record to a specific topic at a specific partition
        producer.send(ProducerRecord(
            "your_topic_here",
            your_partition_here,
            context.element().kv.key,
            context.element().kv.value
        ))
    }
}
{code}
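
With the partition passed through, the workaround above could be replaced by a plain 
`writeRecords()` pipeline. A hedged sketch of what that usage might look like, where the 
topic name, bootstrap servers, and the `partitionFor` helper are placeholders chosen for 
illustration:
{code:java}
pipeline
    .apply(Create.of(KV.of("key-1", "value-1"), KV.of("key-2", "value-2")))
    .apply(MapElements
        .into(new TypeDescriptor<ProducerRecord<String, String>>() {})
        // partitionFor(...) stands in for whatever partitioning logic the caller wants.
        .via((KV<String, String> kv) -> new ProducerRecord<>(
            "your_topic_here", partitionFor(kv.getKey()), kv.getKey(), kv.getValue())))
    .setCoder(ProducerRecordCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))
    .apply(KafkaIO.<String, String>writeRecords()
        .withBootstrapServers("localhost:9092")
        .withTopic("your_topic_here")
        .withKeySerializer(StringSerializer.class)
        .withValueSerializer(StringSerializer.class));
{code}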

  was:At present, the `WriteRecords` support for the KafkaIO does not recognize 
the `partition` property defined on `ProducerRecord` instances consumed by the 
transform. This ticket would add support so that any explicit partitioning 
that was defined would be acknowledged accordingly while still respecting the 
default behavior if it was not explicitly included.


> KafkaIO - Partition Recognition in WriteRecords
> -----------------------------------------------
>
>                 Key: BEAM-11806
>                 URL: https://issues.apache.org/jira/browse/BEAM-11806
>             Project: Beam
>          Issue Type: Improvement
>          Components: io-java-kafka
>            Reporter: Rion Williams
>            Assignee: Rion Williams
>            Priority: P2
>          Time Spent: 10m
>  Remaining Estimate: 0h
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
