[ 
https://issues.apache.org/jira/browse/FLINK-22748?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17351327#comment-17351327
 ] 

Benoît Paris commented on FLINK-22748:
--------------------------------------

Regarding the API: the topic configuration currently has `topic`, which can be 
replaced by `topic-pattern` for sources. I see three separate considerations:

1.
Following that pattern, sinks could accept a `topic-target` option that takes 
the name of a String column and replaces `topic`, the same way `topic-pattern` 
replaces it for sources.

Would that be a sensible convention, or would this new case leave the `topic` 
option too special-cased/overloaded? The Required column in the docs would end 
up with something convoluted like 'Required, only if `topic-pattern` is not 
provided for sources, and `topic-target` is not provided for sinks'; would 
that be ok?

2.
On another note, the names `topic` and `topic-target` both suggest an actual 
topic name, whereas the new option would take a column name. 
`topic-target-field` should fit better, right?

3.
[~twalthr], you mentioned a "target topic". I followed the convention of 
beginning with `topic-`, but we could have `target-topic`. 

You also mention a "persisted metadata column". Is that something technically 
special, like Kafka's metadata, or would we be using a regular SQL column?
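
To make the naming question concrete, here is a rough sketch of what a sink 
definition might look like if we went with `topic-target-field` and a regular 
SQL column. The option name is only the proposal from point 2 and does not 
exist in Flink today; the table and column names are made up for illustration.

```sql
-- Hypothetical sketch: 'topic-target-field' is the option proposed in this
-- comment, not an existing Flink option. Table and column names are invented.
CREATE TABLE kafka_sink (
  user_id      BIGINT,
  payload      STRING,
  target_topic STRING  -- regular String column naming the destination topic per row
) WITH (
  'connector' = 'kafka',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'json',
  -- 'topic' is omitted here; the proposed option would replace it for sinks
  'topic-target-field' = 'target_topic'
);
```

With `target-topic` naming instead (point 3), only the option key would 
change; whether the column would also be written into the record value is 
part of the open question about "persisted metadata columns".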

> Allow dynamic target topic selection in SQL Kafka sinks
> -------------------------------------------------------
>
>                 Key: FLINK-22748
>                 URL: https://issues.apache.org/jira/browse/FLINK-22748
>             Project: Flink
>          Issue Type: Improvement
>          Components: Connectors / Kafka, Table SQL / Ecosystem
>            Reporter: Timo Walther
>            Priority: Major
>              Labels: starter
>
> We should allow to write to different Kafka topics based on some column value 
> in SQL.
> The existing implementation can be easily adapted for that. The "target 
> topic" would be an additional persisted metadata column in SQL terms. All one 
> need to do is to adapt
> DynamicKafkaSerializationSchema
> KafkaDynamicSink
> We should guard this dynamic behavior via a config option and make the topic 
> option optional in this case.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)