[ 
https://issues.apache.org/jira/browse/FLINK-13340?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16897160#comment-16897160
 ] 

Timo Walther commented on FLINK-13340:
--------------------------------------

Hi [~dubin555], your proposed changes sound reasonable to me. I will assign 
this issue to you.

> Add more Kafka topic option of flink-connector-kafka
> ----------------------------------------------------
>
>                 Key: FLINK-13340
>                 URL: https://issues.apache.org/jira/browse/FLINK-13340
>             Project: Flink
>          Issue Type: Improvement
>          Components: Connectors / Kafka, Table SQL / API
>    Affects Versions: 1.8.1
>            Reporter: DuBin
>            Priority: Major
>              Labels: features
>   Original Estimate: 48h
>  Remaining Estimate: 48h
>
> Currently, only 'topic' option implemented in the Kafka Connector Descriptor, 
> we can only use it like :
> {code:java}
>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>     val tableEnv = StreamTableEnvironment.create(env)
>     tableEnv
>       .connect(
>         new Kafka()
>           .version("0.11")
>           .topic("test-flink-1")
>           .startFromEarliest()
>           .property("zookeeper.connect", "localhost:2181")
>           .property("bootstrap.servers", "localhost:9092"))
>       .withFormat(
>         new Json()
>           .deriveSchema()
>       )
>       .withSchema(
>         new Schema()
>           .field("name", Types.STRING)
>           .field("age", Types.STRING)
>       ){code}
> but we cannot consume multiple topics or a topic regex pattern. 
> Here is my thoughts:
> {code:java}
>           .topic("test-flink-1") 
>           //.topics("test-flink-1,test-flink-2") or topics(List<String> 
> topics)
>           //.subscriptionPattern("test-flink-.*") or 
> subscriptionPattern(Pattern pattern)
> {code}
> I already implement the code on my local env with help of the 
> FlinkKafkaConsumer, and it works.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)

Reply via email to