Hunter Kempf created FLINK-15111:
------------------------------------
Summary: java.lang.RuntimeException for
KafkaPartitionDiscoverer.getAllPartitionsForTopics(KafkaPartitionDiscoverer.java:80)
when using SASL_SSL
Key: FLINK-15111
URL: https://issues.apache.org/jira/browse/FLINK-15111
Project: Flink
Issue Type: Bug
Components: Connectors / Kafka
Affects Versions: 1.9.0
Environment: Flink Version 1.9.0
Scala Version 2.11.12
Kafka Cluster Version 2.3.0
Reporter: Hunter Kempf
Goal: I am trying to connect a Flink job I wrote to a topic with 3 partitions
on a remote Kafka cluster that requires authentication with SASL_SSL.
Background: I have tested my job against topics on a Kafka cluster running on
my localhost, one with a single partition and one with three partitions, and
it reads from and writes to the local Kafka without problems.
Problem: When I attempt to consume from a topic that has multiple partitions
and requires SASL_SSL, I get the following error (for reference, topicName is
the name of the topic I am trying to consume). Oddly, I don't have any issues
when producing to a remote multi-partition topic.
```
java.lang.RuntimeException: topicName
    at org.apache.flink.streaming.connectors.kafka.internal.KafkaPartitionDiscoverer.getAllPartitionsForTopics(KafkaPartitionDiscoverer.java:80)
    at org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.discoverPartitions(AbstractPartitionDiscoverer.java:131)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:508)
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:529)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:393)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
    at java.lang.Thread.run(Thread.java:748)
```
My consumer code looks like this:
```
def defineKafkaDataStream[A: TypeInformation](topic: String,
                                              env: StreamExecutionEnvironment,
                                              SASL_username: String,
                                              SASL_password: String,
                                              kafkaBootstrapServer: String = "localhost:9092",
                                              zookeeperHost: String = "localhost:2181",
                                              groupId: String = "test"
                                             )(implicit c: JsonConverter[A]): DataStream[A] = {
  val properties = new Properties()
  properties.setProperty("bootstrap.servers", kafkaBootstrapServer)
  properties.setProperty("security.protocol", "SASL_SSL")
  properties.setProperty("sasl.mechanism", "PLAIN")
  val jaasTemplate =
    "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\" password=\"%s\";"
  val jaasConfig = String.format(jaasTemplate, SASL_username, SASL_password)
  properties.setProperty("sasl.jaas.config", jaasConfig)
  properties.setProperty("group.id", "MyConsumerGroup")
  env
    .addSource(new FlinkKafkaConsumer(topic, new JSONKeyValueDeserializationSchema(true), properties))
    .map(x => x.convertTo[A](c))
}
```
Is there some property I should be setting that I am not? Since it worked fine
locally, I assume this is a bug in how the partition discoverer interacts with
SASL_SSL authentication, but if it is something I can fix with a setting,
please let me know what to change.
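To make the security-related settings easier to review in isolation, here is a minimal sketch of just the property construction from the consumer code above. It depends only on java.util.Properties. The object name SaslSslConfig and the helper saslSslProperties are hypothetical names used for illustration and are not part of Flink or Kafka; the property keys and values are the ones from my consumer code.

```scala
import java.util.Properties

object SaslSslConfig {
  // Hypothetical helper (not a Flink or Kafka API): builds the same client
  // properties that the consumer code above passes to FlinkKafkaConsumer.
  def saslSslProperties(bootstrapServers: String,
                        username: String,
                        password: String,
                        groupId: String): Properties = {
    val props = new Properties()
    props.setProperty("bootstrap.servers", bootstrapServers)
    props.setProperty("security.protocol", "SASL_SSL")
    props.setProperty("sasl.mechanism", "PLAIN")
    // The JAAS entry names the login module, its control flag and its
    // options, and is terminated by a semicolon.
    val jaasConfig =
      s"""org.apache.kafka.common.security.plain.PlainLoginModule required username="$username" password="$password";"""
    props.setProperty("sasl.jaas.config", jaasConfig)
    props.setProperty("group.id", groupId)
    props
  }
}
```

Printing the result of SaslSslConfig.saslSslProperties(...) (with the password masked) shows exactly what the Kafka client receives. If the remote brokers present a certificate that the default JVM truststore does not trust, the standard Kafka client settings ssl.truststore.location and ssl.truststore.password may also be needed; I have not confirmed whether that applies to this cluster.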