[ https://issues.apache.org/jira/browse/BEAM-11806?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Anonymous updated BEAM-11806:
-----------------------------
Status: Triage Needed (was: Resolved)
> KafkaIO - Partition Recognition in WriteRecords
> -----------------------------------------------
>
> Key: BEAM-11806
> URL: https://issues.apache.org/jira/browse/BEAM-11806
> Project: Beam
> Issue Type: Improvement
> Components: io-java-kafka
> Reporter: Rion Williams
> Assignee: Rion Williams
> Priority: P2
> Fix For: 2.29.0
>
> Time Spent: 1.5h
> Remaining Estimate: 0h
>
> At present, the `WriteRecords` transform in KafkaIO ignores the `partition`
> property of the `ProducerRecord` instances it consumes. This ticket would add
> support so that an explicitly set partition is honored, while the default
> partitioning behavior is kept when no partition is specified.
> The limitation is visible in the `KafkaWriter` class that backs the
> `WriteRecords` transform:
> {code:java}
> producer.send(
>     // The second argument (null) is the partition, so any partition set on
>     // the input record is discarded here
>     new ProducerRecord<>(
>         topicName, null, timestampMillis, record.key(), record.value(),
>         record.headers()),
>     new SendCallback());
> {code}
> Because of this limitation, a user who wants an explicitly defined
> partitioning strategy, rather than the producer's default partitioning,
> has to write a custom DoFn that creates its own KafkaProducer (for example,
> in a @StartBundle method), similar to the following approach (in Kotlin):
> {code:java}
> private class ExampleProducerDoFn(...): DoFn<...>() {
>     private lateinit var producer: KafkaProducer<...>
>
>     @StartBundle
>     fun startBundle(context: StartBundleContext) {
>         val options =
>             context.pipelineOptions.`as`(YourPipelineOptions::class.java)
>         producer = getKafkaProducer(options)
>     }
>
>     @ProcessElement
>     fun processElement(context: ProcessContext) {
>         // Omitted for brevity
>
>         // Produce the record to a specific topic at a specific partition
>         producer.send(ProducerRecord(
>             "your_topic_here",
>             your_partition_here,
>             context.element().kv.key,
>             context.element().kv.value
>         ))
>     }
>
>     @FinishBundle
>     fun finishBundle() {
>         // close() waits for in-flight sends to complete and releases the
>         // producer created in startBundle, so producers do not leak per bundle
>         producer.close()
>     }
> }
> {code}
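The change requested above can be modeled without Kafka or Beam on the classpath. The sketch below is a hypothetical, dependency-free illustration: `OutgoingRecord` is an assumed stand-in for the `ProducerRecord` fields that matter here, and the two `forward...` helpers are illustrative names, not Beam's `KafkaWriter` API. The only difference between them is the partition argument. The current behavior passes `null`, while the requested behavior passes the input record's own partition, which is still `null` when the user did not set one, so the producer's default partitioning applies as before.

```java
// Hypothetical model of the requested KafkaWriter change; not Beam's actual code.
public class PartitionForwarding {

    // Assumed stand-in for the ProducerRecord fields relevant to this ticket.
    // A null partition means "let the producer's partitioner choose".
    static final class OutgoingRecord {
        final String topic;
        final Integer partition;
        final Long timestamp;
        final String key;
        final String value;

        OutgoingRecord(String topic, Integer partition, Long timestamp, String key, String value) {
            this.topic = topic;
            this.partition = partition;
            this.timestamp = timestamp;
            this.key = key;
            this.value = value;
        }
    }

    // Behavior described in the ticket: the partition argument is always null.
    static OutgoingRecord forwardIgnoringPartition(
            String topicName, OutgoingRecord input, long timestampMillis) {
        return new OutgoingRecord(topicName, null, timestampMillis, input.key, input.value);
    }

    // Requested behavior: pass the input record's partition through unchanged,
    // which is still null when no partition was set explicitly.
    static OutgoingRecord forwardRespectingPartition(
            String topicName, OutgoingRecord input, long timestampMillis) {
        return new OutgoingRecord(topicName, input.partition, timestampMillis, input.key, input.value);
    }

    public static void main(String[] args) {
        OutgoingRecord explicit = new OutgoingRecord("events", 3, null, "user-42", "payload");
        OutgoingRecord implicit = new OutgoingRecord("events", null, null, "user-42", "payload");

        System.out.println(forwardIgnoringPartition("events", explicit, 1000L).partition);   // null
        System.out.println(forwardRespectingPartition("events", explicit, 1000L).partition); // 3
        System.out.println(forwardRespectingPartition("events", implicit, 1000L).partition); // null
    }
}
```

Kafka's own `ProducerRecord` accepts a `null` partition in the same constructor position, so forwarding the input record's partition is enough to cover both the explicit and the default case.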
--
This message was sent by Atlassian Jira
(v8.20.10#820010)