[
https://issues.apache.org/jira/browse/FLINK-24850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17446571#comment-17446571
]
Arseniy Tashoyan commented on FLINK-24850:
------------------------------------------
It is linked to this issue: https://issues.apache.org/jira/browse/FLINK-24851
> KafkaSinkBuilder: NullPointerException if bootstrap.servers are defined via
> properties
> --------------------------------------------------------------------------------------
>
> Key: FLINK-24850
> URL: https://issues.apache.org/jira/browse/FLINK-24850
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Kafka
> Affects Versions: 1.14.0
> Reporter: Arseniy Tashoyan
> Priority: Major
>
> Let's create a KafkaSink:
> {code:scala}
> val props = new Properties()
> props.put("bootstrap.servers", "localhost:9092")
> val kafkaSink = KafkaSink.builder[String]()
>   .setKafkaProducerConfig(props)
>   ...// other settings
>   .build()
> {code}
> It fails:
> {code:none}
> java.lang.NullPointerException
>     at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:59)
>     at org.apache.flink.connector.kafka.sink.KafkaSinkBuilder.build(KafkaSinkBuilder.java:192)
> {code}
> _KafkaSinkBuilder.build()_ fails because it expects the bootstrap
> servers to be set via _KafkaSinkBuilder.setBootstrapServers()_. It ignores
> bootstrap servers supplied in the properties.
> Meanwhile _KafkaSourceBuilder_ allows setting bootstrap servers via
> properties - not necessarily by calling
> _KafkaSourceBuilder.setBootstrapServers()_. It would be good to enable the
> same contract for _KafkaSinkBuilder_.
> It is convenient to keep all Kafka-specific settings in a config file:
> {code:none}
> settings {
> "bootstrap.servers" = "broker1:9092,broker2:9092"
> ...
> }
> {code}
> Currently we need a workaround: extract the setting _"bootstrap.servers"_
> from other Kafka settings and propagate it to
> _KafkaSinkBuilder.setBootstrapServers()_:
> {code:scala}
> val kafkaSink = KafkaSink.builder[String]()
>   .setBootstrapServers(props.get("bootstrap.servers").asInstanceOf[String])
>   .setKafkaProducerConfig(props)
>   .build()
> {code}
> The expected behavior is to use _"bootstrap.servers"_ provided by
> _KafkaSinkBuilder.setKafkaProducerConfig()_ - unless overridden by
> _KafkaSinkBuilder.setBootstrapServers()_.
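
The precedence described in the issue (an explicitly set value wins, otherwise fall back to the producer properties) can be sketched as a small standalone helper. This is only an illustration under stated assumptions: the object _BootstrapServers_ and its method _resolve_ are hypothetical names, not part of Flink, and the sketch uses nothing beyond java.util.Properties. It also fails with a descriptive message instead of a bare NullPointerException when the setting is missing from both places.

```scala
import java.util.Properties

// Hypothetical helper for illustration only; it is not part of Flink.
object BootstrapServers {
  val Key = "bootstrap.servers"

  // Returns the explicitly supplied value if it is non-blank; otherwise the
  // value stored under "bootstrap.servers" in the properties. Throws an
  // IllegalArgumentException if neither source provides a non-blank value.
  def resolve(explicit: Option[String], props: Properties): String = {
    // Treat null and blank strings as "not set".
    def clean(s: String): Option[String] =
      Option(s).map(_.trim).filter(_.nonEmpty)

    explicit
      .flatMap(clean)
      .orElse(clean(props.getProperty(Key)))
      .getOrElse(
        throw new IllegalArgumentException(
          s"'$Key' must be set explicitly or in the producer properties"
        )
      )
  }
}
```

Until the builder behaves this way, the result of _BootstrapServers.resolve(None, props)_ could be passed to _KafkaSinkBuilder.setBootstrapServers()_ as a slightly safer form of the workaround quoted above.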