[
https://issues.apache.org/jira/browse/FLINK-24850?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Arseniy Tashoyan updated FLINK-24850:
-------------------------------------
Description:
Let's create KafkaSink:
{code:scala}
val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")
props.put("group.id", "group1")
val kafkaSink = KafkaSink.builder[String]()
.setKafkaProducerConfig(props)
...// other settings
.build()
{code}
It fails:
{code:none}
java.lang.NullPointerException
at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:59)
at org.apache.flink.connector.kafka.sink.KafkaSinkBuilder.build(KafkaSinkBuilder.java:192)
{code}
_KafkaSinkBuilder.build()_ fails because it expects the bootstrap servers to be
set via _KafkaSinkBuilder.setBootstrapServers()_. It ignores the bootstrap
servers in the properties.
Meanwhile, _KafkaSourceBuilder_ allows setting the bootstrap servers via
properties, not necessarily by calling _KafkaSourceBuilder.setBootstrapServers()_.
It would be good to enable the same contract for _KafkaSinkBuilder_.
It is convenient to keep all Kafka-specific settings in a config file:
{code:none}
settings {
"bootstrap.servers" = "broker1:9092,broker2:9092"
"group.id" = "retail-streaming"
"auto.offset.reset" = "latest"
}
{code}
Currently we need a workaround: extract the setting _"bootstrap.servers"_ from
other Kafka settings and propagate it to
_KafkaSinkBuilder.setBootstrapServers()_:
{code:scala}
val kafkaSink = KafkaSink.builder[String]()
// Workaround: pass the bootstrap servers explicitly, in addition to the properties
.setBootstrapServers(props.getProperty("bootstrap.servers"))
.setKafkaProducerConfig(props)
.build()
{code}
The expected behavior is to use the value of the _"bootstrap.servers"_ setting
unless it is overridden by _KafkaSinkBuilder.setBootstrapServers()_.
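The precedence described above could look roughly like the following sketch. It is only an illustration of the expected contract, not the actual Flink implementation: the object _BootstrapServersResolution_ and its method _resolve_ are hypothetical names, and the only real API used is _java.util.Properties_.

```scala
import java.util.Properties

// Hypothetical helper, NOT part of Flink: illustrates the expected precedence only.
object BootstrapServersResolution {
  val Key = "bootstrap.servers"

  // explicit: the value passed to setBootstrapServers(), if any
  // props:    the properties passed to setKafkaProducerConfig()
  def resolve(explicit: Option[String], props: Properties): String =
    explicit
      // Fall back to the properties only when no explicit value was given
      .orElse(Option(props.getProperty(Key)))
      // Fail with a descriptive message instead of a bare NullPointerException
      .getOrElse(throw new IllegalArgumentException(
        s"'$Key' must be set via setBootstrapServers() or via the producer properties"))
}
```

With the properties from the example above, _resolve(None, props)_ would return the value from the properties, while _resolve(Some("broker1:9092"), props)_ would return the explicitly set value.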
was:
Let's create KafkaSink:
{code:scala}
val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")
props.put("group.id", "group1")
val kafkaSink = KafkaSink.builder[String]()
.setKafkaProducerConfig(props)
...// other settings
.build()
{code}
It fails:
{code:none}
java.lang.NullPointerException
at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:59)
at org.apache.flink.connector.kafka.sink.KafkaSinkBuilder.build(KafkaSinkBuilder.java:192)
{code}
_KafkaSinkBuilder.build()_ fails because it expects the bootstrap servers to be
set via _KafkaSinkBuilder.setBootstrapServers()_. It ignores the bootstrap
servers in the properties.
Meanwhile, _KafkaSourceBuilder_ allows setting the bootstrap servers via
properties, not necessarily by calling _KafkaSourceBuilder.setBootstrapServers()_.
It would be good to enable the same contract for _KafkaSinkBuilder_.
It is convenient to keep all Kafka-specific settings in a config file:
{code:none}
settings {
"bootstrap.servers" = "broker1:9092,broker2:9092"
"group.id" = "retail-streaming"
"auto.offset.reset" = "latest"
}
{code}
Such settings can be passed as-is to
_KafkaSinkBuilder.setKafkaProducerConfig()_ - no special treatment for
_"bootstrap.servers"_.
> KafkaSinkBuilder: NullPointerException if bootstrap.servers are defined via
> properties
> --------------------------------------------------------------------------------------
>
> Key: FLINK-24850
> URL: https://issues.apache.org/jira/browse/FLINK-24850
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Kafka
> Affects Versions: 1.14.0
> Reporter: Arseniy Tashoyan
> Priority: Major
>
--
This message was sent by Atlassian Jira
(v8.20.1#820001)