[
https://issues.apache.org/jira/browse/BEAM-5798?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16677083#comment-16677083
]
Alexey Romanenko commented on BEAM-5798:
----------------------------------------
Following a
[discussion|https://lists.apache.org/thread.html/a89d1d32ecdb50c42271e805cc01a651ee3623b4df97c39baf4f2053@%3Cuser.beam.apache.org%3E]
on the user@ list, I'd like to summarise the final implementation of this feature:
- Do not use a user-provided function (as I did in the first PR), since the topic
name for the current key/value pair may already have been determined earlier in
the pipeline (for example, when the name depends on a window whose duration is
1 day). Instead, we will use {{org.apache.kafka.clients.producer.ProducerRecord}}
as the input record format. In fact, we can continue the work that was started
in BEAM-4038.
- The implementation involves the following steps:
-- Add a new transform, {{ProducerRecordWrite}}, which receives a
{{ProducerRecord}} as its input value, creates a new KV whose key is null and
whose value is the {{ProducerRecord}}, and passes it to the general
{{KafkaIO.Write<K, V>}} transform.
-- Add a new public method, {{PTransform<PCollection<ProducerRecord>, PDone>
KafkaIO.Write.writeRecords()}}, which creates a new {{ProducerRecordWrite}}
transform and should be used when the user wants to use {{ProducerRecord}} with
KafkaIO.
-- Add {{ProducerRecordCoder}} and use it by default for {{ProducerRecord}}.
-- Inside {{KafkaWriter}}, check whether the value is an instance of
{{ProducerRecord}}; if so, use the topic name encoded in it, or the default
topic if that name is null.
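To make the topic selection concrete, here is a minimal sketch in plain Java with no Beam or Kafka dependencies. Everything in it is an assumption made for illustration: {{RecordStub}} is a hypothetical stand-in for {{ProducerRecord}} that allows a null topic, {{dailyTopic}} shows one way an upstream step might derive the name from a 1-day window, and {{resolveTopic}} mimics the fallback described for {{KafkaWriter}}. None of these names belong to KafkaIO.

```java
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneOffset;

public class DynamicTopicSketch {

  /** Hypothetical stand-in for ProducerRecord; only the fields used here. */
  static final class RecordStub<K, V> {
    final String topic; // may be null, meaning "use the default topic"
    final K key;
    final V value;

    RecordStub(String topic, K key, V value) {
      this.topic = topic;
      this.key = key;
      this.value = value;
    }
  }

  /**
   * Upstream step (assumed): derive the topic from the UTC day that contains
   * the element timestamp, i.e. a window with a duration of 1 day.
   */
  static String dailyTopic(String prefix, Instant timestamp) {
    LocalDate day = timestamp.atZone(ZoneOffset.UTC).toLocalDate();
    return prefix + "-" + day; // LocalDate prints as yyyy-MM-dd
  }

  /**
   * Writer-side step (assumed): prefer the topic carried by the record and
   * fall back to the configured default topic when the record has none.
   */
  static String resolveTopic(RecordStub<?, ?> record, String defaultTopic) {
    if (record.topic != null) {
      return record.topic;
    }
    if (defaultTopic == null) {
      throw new IllegalStateException("Neither record topic nor default topic is set");
    }
    return defaultTopic;
  }

  public static void main(String[] args) {
    Instant ts = Instant.parse("2018-11-06T15:30:00Z");
    RecordStub<String, String> routed =
        new RecordStub<>(dailyTopic("events", ts), "k", "v");
    RecordStub<String, String> unrouted = new RecordStub<>(null, "k", "v");

    System.out.println(resolveTopic(routed, "fallback"));   // events-2018-11-06
    System.out.println(resolveTopic(unrouted, "fallback")); // fallback
  }
}
```

Because the topic travels inside the record, the writer needs no user-provided function: the upstream step has already decided the destination, and the writer only applies the default when no decision was made.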
[~rangadi] what do you think? Is it going to work for you?
> Add support for dynamic destinations when writing to Kafka
> ----------------------------------------------------------
>
> Key: BEAM-5798
> URL: https://issues.apache.org/jira/browse/BEAM-5798
> Project: Beam
> Issue Type: New Feature
> Components: io-java-kafka
> Reporter: Luke Cwik
> Assignee: Alexey Romanenko
> Priority: Major
> Labels: newbie, starter
> Time Spent: 1h 50m
> Remaining Estimate: 0h
>
> Add support for writing to Kafka based upon contents of the data. This is
> similar to the dynamic destination approach for file IO and other sinks.
>
> Source of request:
> https://lists.apache.org/thread.html/a89d1d32ecdb50c42271e805cc01a651ee3623b4df97c39baf4f2053@%3Cuser.beam.apache.org%3E