[ 
https://issues.apache.org/jira/browse/FLINK-24850?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Arseniy Tashoyan updated FLINK-24850:
-------------------------------------
    Description: 
Let's create KafkaSink:

{code:scala}
val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")
props.put("group.id", "group1")
val kafkaSink = KafkaSink.builder[String]()
      .setKafkaProducerConfig(props)
      ...// other settings
      .build()
{code}

It fails:

{code:none}
java.lang.NullPointerException
        at 
org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:59)
        at 
org.apache.flink.connector.kafka.sink.KafkaSinkBuilder.build(KafkaSinkBuilder.java:192)
{code}

_KafkaSinkBuilder.build()_ fails, because it expects that the bootstrap servers 
are set via _KafkaSinkBuilder.setBootstrapServers()_. It ignores bootstrap 
servers in the properties.

Meanwhile _KafkaSourseBuilder_ allows setting bootstrap servers via properties 
- not necessarily by calling _KafkaSourceBuilder.setBootstrapServers()_. It 
would be good to enable the same contract for _KafkaSinkBuilder_.

This is convenient to have all Kafka-specific settings in a config-file:

{code:none}
    settings {
      "bootstrap.servers" = "broker1:9092,broker2:9092"
      ...
    }
{code}

Currently we need a workaround: extract the setting _"bootstrap.servers"_ from 
other Kafka settings and propagate it to 
_KafkaSinkBuilder.setBootstrapServers()_:


{code:java}
val kafkaSink = KafkaSink.builder[String]()
      .setBootstrapServers(props.get("bootstrap.servers").asInstanceOf[String])
      .setKafkaProducerConfig(props)
      .build()
{code}

The expected behavior is to use _"bootstrap.servers"_ provided by 
_KafkaSinkBuilder.setKafkaProducerConfig()_ - unless overridden by 
_KafkaSinkBuilder.setBootstrapServers()_.

  was:
Let's create KafkaSink:

{code:scala}
val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")
props.put("group.id", "group1")
val kafkaSink = KafkaSink.builder[String]()
      .setKafkaProducerConfig(props)
      ...// other settings
      .build()
{code}

It fails:

{code:none}
java.lang.NullPointerException
        at 
org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:59)
        at 
org.apache.flink.connector.kafka.sink.KafkaSinkBuilder.build(KafkaSinkBuilder.java:192)
{code}

_KafkaSinkBuilder.build()_ fails, because it expects that the bootstrap servers 
are set via _KafkaSinkBuilder.setBootstrapServers()_. It ignores bootstrap 
servers in the properties.

Meanwhile _KafkaSourseBuilder_ allows setting bootstrap servers via properties 
- not necessarily by calling _KafkaSourceBuilder.setBootstrapServers()_. It 
would be good to enable the same contract for _KafkaSinkBuilder_.

This is convenient to have all Kafka-specific settings in a config-file:

{code:none}
    settings {
      "bootstrap.servers" = "broker1:9092,broker2:9092"
      "group.id" = "retail-streaming"
      "auto.offset.reset" = "latest"
    }
{code}

Currently we need a workaround: extract the setting _"bootstrap.servers"_ from 
other Kafka settings and propagate it to 
_KafkaSinkBuilder.setBootstrapServers()_:


{code:java}
val kafkaSink = KafkaSink.builder[String]()
      .setBootstrapServers(props.get("bootstrap.servers").asInstanceOf[String])
      .setKafkaProducerConfig(props)
      .build()
{code}

The expected behavior is to use _"bootstrap.servers"_ provided by 
_KafkaSinkBuilder.setKafkaProducerConfig()_ - unless overridden by 
_KafkaSinkBuilder.setBootstrapServers()_.


> KafkaSinkBuilder: NullPointerException if bootstrap.servers are defined via 
> properties
> --------------------------------------------------------------------------------------
>
>                 Key: FLINK-24850
>                 URL: https://issues.apache.org/jira/browse/FLINK-24850
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.14.0
>            Reporter: Arseniy Tashoyan
>            Priority: Major
>
> Let's create KafkaSink:
> {code:scala}
> val props = new Properties()
> props.put("bootstrap.servers", "localhost:9092")
> props.put("group.id", "group1")
> val kafkaSink = KafkaSink.builder[String]()
>       .setKafkaProducerConfig(props)
>       ...// other settings
>       .build()
> {code}
> It fails:
> {code:none}
> java.lang.NullPointerException
>       at 
> org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:59)
>       at 
> org.apache.flink.connector.kafka.sink.KafkaSinkBuilder.build(KafkaSinkBuilder.java:192)
> {code}
> _KafkaSinkBuilder.build()_ fails, because it expects that the bootstrap 
> servers are set via _KafkaSinkBuilder.setBootstrapServers()_. It ignores 
> bootstrap servers in the properties.
> Meanwhile _KafkaSourseBuilder_ allows setting bootstrap servers via 
> properties - not necessarily by calling 
> _KafkaSourceBuilder.setBootstrapServers()_. It would be good to enable the 
> same contract for _KafkaSinkBuilder_.
> This is convenient to have all Kafka-specific settings in a config-file:
> {code:none}
>     settings {
>       "bootstrap.servers" = "broker1:9092,broker2:9092"
>       ...
>     }
> {code}
> Currently we need a workaround: extract the setting _"bootstrap.servers"_ 
> from other Kafka settings and propagate it to 
> _KafkaSinkBuilder.setBootstrapServers()_:
> {code:java}
> val kafkaSink = KafkaSink.builder[String]()
>       
> .setBootstrapServers(props.get("bootstrap.servers").asInstanceOf[String])
>       .setKafkaProducerConfig(props)
>       .build()
> {code}
> The expected behavior is to use _"bootstrap.servers"_ provided by 
> _KafkaSinkBuilder.setKafkaProducerConfig()_ - unless overridden by 
> _KafkaSinkBuilder.setBootstrapServers()_.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

Reply via email to