[ 
https://issues.apache.org/jira/browse/FLINK-24850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17446225#comment-17446225
 ] 

liwei li commented on FLINK-24850:
----------------------------------

This is similar to the ticket we discussed about kafka.auto.reset. If possible, 
please assign this ticket to me as well. 

> KafkaSinkBuilder: NullPointerException if bootstrap.servers are defined via 
> properties
> --------------------------------------------------------------------------------------
>
>                 Key: FLINK-24850
>                 URL: https://issues.apache.org/jira/browse/FLINK-24850
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.14.0
>            Reporter: Arseniy Tashoyan
>            Priority: Major
>
> Let's create KafkaSink:
> {code:scala}
> val props = new Properties()
> props.put("bootstrap.servers", "localhost:9092")
> val kafkaSink = KafkaSink.builder[String]()
>       .setKafkaProducerConfig(props)
>       ...// other settings
>       .build()
> {code}
> It fails:
> {code:none}
> java.lang.NullPointerException
>       at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:59)
>       at org.apache.flink.connector.kafka.sink.KafkaSinkBuilder.build(KafkaSinkBuilder.java:192)
> {code}
> _KafkaSinkBuilder.build()_ fails, because it expects that the bootstrap 
> servers are set via _KafkaSinkBuilder.setBootstrapServers()_. It ignores 
> bootstrap servers in the properties.
> Meanwhile, _KafkaSourceBuilder_ allows setting bootstrap servers via 
> properties - not necessarily by calling 
> _KafkaSourceBuilder.setBootstrapServers()_. It would be good to enable the 
> same contract for _KafkaSinkBuilder_.
> It is convenient to keep all Kafka-specific settings in a config file:
> {code:none}
>     settings {
>       "bootstrap.servers" = "broker1:9092,broker2:9092"
>       ...
>     }
> {code}
> Currently we need a workaround: extract the setting _"bootstrap.servers"_ 
> from other Kafka settings and propagate it to 
> _KafkaSinkBuilder.setBootstrapServers()_:
> {code:scala}
> val kafkaSink = KafkaSink.builder[String]()
>       .setBootstrapServers(props.get("bootstrap.servers").asInstanceOf[String])
>       .setKafkaProducerConfig(props)
>       .build()
> {code}
> The expected behavior is to use _"bootstrap.servers"_ provided by 
> _KafkaSinkBuilder.setKafkaProducerConfig()_ - unless overridden by 
> _KafkaSinkBuilder.setBootstrapServers()_.
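
The precedence requested in the issue could be sketched roughly as follows. This is only an illustration of the expected contract, not the actual KafkaSinkBuilder code: the object BootstrapServersResolution and its resolve method are hypothetical names, and the sketch depends only on java.util.Properties so it can run without Flink on the classpath.

```scala
import java.util.Properties

// Hypothetical helper, not part of Flink: resolves the effective
// bootstrap servers using the precedence proposed in this ticket.
object BootstrapServersResolution {
  val Key = "bootstrap.servers"

  // explicit: value passed to a setter such as setBootstrapServers(), if any
  // props:    producer properties such as those given to setKafkaProducerConfig()
  def resolve(explicit: Option[String], props: Properties): String =
    explicit
      // fall back to the properties only when no explicit value was set
      .orElse(Option(props.getProperty(Key)))
      // fail with a descriptive message instead of a bare NullPointerException
      .getOrElse(throw new IllegalStateException(
        s"$Key must be set explicitly or via the producer properties"))
}
```

With this rule, a value given through the setter wins, a value present only in the properties is accepted, and a missing value produces a descriptive error.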



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
