[
https://issues.apache.org/jira/browse/FLINK-14528?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Peng updated FLINK-14528:
-------------------------
Summary: Add a constructor for FlinkKafkaProducer (was: Add a Constructor
for FlinkKafkaProducer)
> Add a constructor for FlinkKafkaProducer
> ----------------------------------------
>
> Key: FLINK-14528
> URL: https://issues.apache.org/jira/browse/FLINK-14528
> Project: Flink
> Issue Type: Improvement
> Components: Connectors / Kafka
> Affects Versions: 1.9.0
> Reporter: Peng
> Priority: Minor
>
> In flink 1.9.0, defaultTopic param is required for FlinkKafkaProducer
> constructor.
> In fact, if I use the below constructor, it is not necessary. Furthermore, it
> is confused for developer to specify different topic name.
> {code:java}
> public FlinkKafkaProducer( String defaultTopic,
> KafkaSerializationSchema<IN> serializationSchema,
> Properties producerConfig,
> FlinkKafkaProducer.Semantic semantic)
> {code}
> For example, set topic name to bar in the constructor.
> {code:java}
> input.addSink( new FlinkKafkaProducer<>(
> "bar",
> new KafkaSerializationSchemaImpl(),
> properties,
>
> FlinkKafkaProducer.Semantic.AT_LEAST_ONCE));
> {code}
> But actually send records to topic baz from KafkaSerializationSchema.
> {code:java}
> public class KafkaSerializationSchemaImpl implements
> KafkaSerializationSchema<KafkaEvent>
> {
> @Override
> public ProducerRecord<byte[], byte[]> serialize(KafkaEvent event,
> @Nullable Long timestamp) {
> return new ProducerRecord<>("baz", event.toString().getBytes());
> }
> }
> {code}
> So I suggest add a new constructor like below.
> {code:java}
> public FlinkKafkaProducer( KafkaSerializationSchema<IN> serializationSchema,
> Properties producerConfig,
> FlinkKafkaProducer.Semantic semantic)
> {code}
> It is my humble opinion, please correct me, thanks in advance.
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)