[ 
https://issues.apache.org/jira/browse/BEAM-5798?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16677083#comment-16677083
 ] 

Alexey Romanenko commented on BEAM-5798:
----------------------------------------

Following a 
[discussion|https://lists.apache.org/thread.html/a89d1d32ecdb50c42271e805cc01a651ee3623b4df97c39baf4f2053@%3Cuser.beam.apache.org%3E]
 on the user@ list, I'd like to summarise the final implementation of this feature:
 - Do not use a user-provided function (as I did in the first PR), since the topic 
name for the current key/value could be defined earlier in the pipeline (for example, 
the name may depend on the window when the window duration is 1 day). Instead, we will 
use {{org.apache.kafka.clients.producer.ProducerRecord}} as the input record 
format. In fact, we can continue the work that was started in BEAM-4038.
 - The implementation involves the following steps:
 -- Add a new transform {{ProducerRecordWrite}} which will receive a 
{{ProducerRecord}} as the input value, create a new KV whose key is null and whose 
value is the {{ProducerRecord}}, and pass it to the general {{KafkaIO.Write<K, V>}} 
transform.
 -- Add a new public method {{PTransform<PCollection<ProducerRecord>, PDone> 
KafkaIO.Write.writeRecords()}} which will create a new {{ProducerRecordWrite}} 
transform and should be used if the user wants to use {{ProducerRecord}} with 
KafkaIO.
 -- Add {{ProducerRecordCoder}} and use it by default for {{ProducerRecord}}.
 -- Inside {{KafkaWriter}}, check whether the value is an instance of 
{{ProducerRecord}} and, if so, use the topic name encoded there, or the 
default topic if it is null.
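
To make the last step concrete, here is a minimal sketch of the topic-resolution check, not the actual {{KafkaWriter}} code. {{SketchRecord}} is a hypothetical stand-in for {{ProducerRecord}} (so the example runs without the Kafka client on the classpath), and {{TopicResolutionSketch}} and {{resolveTopic}} are illustrative names only. The sketch also assumes a record may carry a null topic, as the step above describes.

```java
// Hypothetical stand-in for org.apache.kafka.clients.producer.ProducerRecord,
// reduced to the three fields this sketch needs. Not the real Kafka class.
final class SketchRecord<K, V> {
    final String topic; // assumed nullable for this sketch
    final K key;
    final V value;

    SketchRecord(String topic, K key, V value) {
        this.topic = topic;
        this.key = key;
        this.value = value;
    }
}

// Illustrative helper, not part of KafkaIO.
final class TopicResolutionSketch {
    // Returns the topic carried by the record when the value is a record with
    // a non-null topic; otherwise falls back to the configured default topic.
    static String resolveTopic(Object value, String defaultTopic) {
        if (value instanceof SketchRecord) {
            String topic = ((SketchRecord<?, ?>) value).topic;
            if (topic != null) {
                return topic;
            }
        }
        return defaultTopic;
    }

    public static void main(String[] args) {
        // A record that names its own destination (e.g. derived from a daily window).
        SketchRecord<String, String> perDay =
                new SketchRecord<String, String>("events-2018-11-06", "k1", "v1");
        // A record without a topic, which should go to the default.
        SketchRecord<String, String> noTopic =
                new SketchRecord<String, String>(null, "k2", "v2");

        System.out.println(resolveTopic(perDay, "default-topic"));
        System.out.println(resolveTopic(noTopic, "default-topic"));
        // A plain value (not a record) also goes to the default topic.
        System.out.println(resolveTopic("plain-value", "default-topic"));
    }
}
```

With this shape, the per-record destination is decided upstream in the pipeline, and the writer only needs the fallback rule.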

[~rangadi] what do you think? Is it going to work for you?

> Add support for dynamic destinations when writing to Kafka
> ----------------------------------------------------------
>
>                 Key: BEAM-5798
>                 URL: https://issues.apache.org/jira/browse/BEAM-5798
>             Project: Beam
>          Issue Type: New Feature
>          Components: io-java-kafka
>            Reporter: Luke Cwik
>            Assignee: Alexey Romanenko
>            Priority: Major
>              Labels: newbie, starter
>          Time Spent: 1h 50m
>  Remaining Estimate: 0h
>
> Add support for writing to Kafka based upon contents of the data. This is 
> similar to the dynamic destination approach for file IO and other sinks.
>  
> Source of request: 
> https://lists.apache.org/thread.html/a89d1d32ecdb50c42271e805cc01a651ee3623b4df97c39baf4f2053@%3Cuser.beam.apache.org%3E



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)