[
https://issues.apache.org/jira/browse/BEAM-11806?focusedWorklogId=551852&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-551852
]
ASF GitHub Bot logged work on BEAM-11806:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 12/Feb/21 15:12
Start Date: 12/Feb/21 15:12
Worklog Time Spent: 10m
Work Description: rionmonster commented on pull request #13975:
URL: https://github.com/apache/beam/pull/13975#issuecomment-778252803
Thanks Alexey,
I've updated the original commit message to include the JIRA issue as
recommended. Do you know of a more appropriate place within the repository to
add the unit test? I see quite a few Kafka-related tests spread across the
codebase, but wasn't sure where something like this would best fit.
Additionally, I do feel that the recommended refactoring would be most
appropriate in this case (i.e. sending the `ProducerRecord` instance itself
instead of creating a new instance). IMO, that change makes more sense, but
I'll leave it up to you whether that's the preferred fix here.
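For reference, a rough, untested sketch of the forwarding approach I have in
mind (assuming `SendCallback` and the surrounding `KafkaWriter` code stay as
they are today; if a publish timestamp is being applied, the record would
presumably still need to be rebuilt):
```java
// Untested sketch: forward the consumed record as-is so its partition (and
// the rest of its fields) are preserved. Covers only the simple pass-through
// case with no publish timestamp function configured.
producer.send(record, new SendCallback());
```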
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
Issue Time Tracking
-------------------
Worklog Id: (was: 551852)
Time Spent: 0.5h (was: 20m)
> KafkaIO - Partition Recognition in WriteRecords
> -----------------------------------------------
>
> Key: BEAM-11806
> URL: https://issues.apache.org/jira/browse/BEAM-11806
> Project: Beam
> Issue Type: Improvement
> Components: io-java-kafka
> Reporter: Rion Williams
> Assignee: Rion Williams
> Priority: P2
> Time Spent: 0.5h
> Remaining Estimate: 0h
>
> At present, the `WriteRecords` transform in KafkaIO does not recognize the
> `partition` property defined on the `ProducerRecord` instances it consumes.
> This ticket would add support so that any explicitly defined partition is
> honored, while still falling back to the default behavior when no partition
> is specified.
> This can be easily identified within the `KafkaWriter` class used behind the
> scenes in the `WriteRecords` transform:
> {code:java}
> producer.send(
>     // The null argument in the following constructor is the partition
>     new ProducerRecord<>(
>         topicName, null, timestampMillis, record.key(), record.value(),
>         record.headers()),
>     new SendCallback());
> {code}
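> To make the limitation concrete, a record built with an explicit partition
> (the values below are placeholders) currently has that partition silently
> dropped when it reaches the producer:
> {code:java}
> // Hypothetical example: the explicit partition (1 here) is ignored today,
> // because KafkaWriter rebuilds the record with a null partition.
> ProducerRecord<String, String> record =
>     new ProducerRecord<>("your_topic_here", 1, "some-key", "some-value");
> {code}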
> Because of this limitation, a user who wants an explicitly defined
> partitioning strategy (as opposed to round-robin) has to write their own
> custom DoFn that manages a KafkaProducer (preferably created within a
> @StartBundle method), similar to the following approach (in Kotlin):
> {code:java}
> private class ExampleProducerDoFn(...) : DoFn<...>() {
>     private lateinit var producer: KafkaProducer<...>
>
>     @StartBundle
>     fun startBundle(context: StartBundleContext) {
>         val options =
>             context.pipelineOptions.`as`(YourPipelineOptions::class.java)
>         producer = getKafkaProducer(options)
>     }
>
>     @ProcessElement
>     fun processElement(context: ProcessContext) {
>         // Omitted for brevity
>
>         // Produce the record to a specific topic at a specific partition
>         producer.send(ProducerRecord(
>             "your_topic_here",
>             your_partition_here,
>             context.element().kv.key,
>             context.element().kv.value
>         ))
>     }
> }
> {code}
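> For illustration, a rough sketch of the kind of change this ticket is asking
> for inside `KafkaWriter` (the exact shape of the fix is open for discussion;
> this is not a final implementation) would be to propagate the incoming
> record's partition instead of hard-coding null:
> {code:java}
> producer.send(
>     // Sketch only: pass the incoming record's partition through; a null
>     // partition preserves the producer's default partitioner behavior.
>     new ProducerRecord<>(
>         topicName,
>         record.partition(),
>         timestampMillis,
>         record.key(),
>         record.value(),
>         record.headers()),
>     new SendCallback());
> {code}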
--
This message was sent by Atlassian Jira
(v8.3.4#803005)