[ https://issues.apache.org/jira/browse/FLINK-15111?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Hunter Kempf updated FLINK-15111:
---------------------------------
    Description: 
 

Problem: When I attempt to connect to a Kafka topic that doesn't exist, I get 
the following error:

 

```
java.lang.RuntimeException: topicName
    at org.apache.flink.streaming.connectors.kafka.internal.KafkaPartitionDiscoverer.getAllPartitionsForTopics(KafkaPartitionDiscoverer.java:80)
    at org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.discoverPartitions(AbstractPartitionDiscoverer.java:131)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:508)
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:529)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:393)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
    at java.lang.Thread.run(Thread.java:748)
```

Based on the KafkaPartitionDiscoverer.java code, this runtime exception should 
instead carry the message ("Could not fetch partitions for %s. Make sure that 
the topic exists.")

 

[https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaPartitionDiscoverer.java]
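For illustration, a minimal sketch of the wrapping behavior expected here, with the topic list folded into the documented message template rather than thrown as a bare RuntimeException (class and method names below are hypothetical, not the actual Flink source):

```java
import java.util.List;

public class PartitionFetchError {
    // Hypothetical sketch: wrap a Kafka metadata failure so the documented
    // message template reaches the user, instead of a RuntimeException
    // whose message is only the topic name.
    static RuntimeException wrap(List<String> topics, Throwable cause) {
        return new RuntimeException(String.format(
                "Could not fetch partitions for %s. Make sure that the topic exists.", topics),
                cause);
    }

    public static void main(String[] args) {
        System.out.println(wrap(List.of("topicName"), new RuntimeException("stub")).getMessage());
    }
}
```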

 

 

  was:
Goal: I am trying to connect a Flink job I made to a remote Kafka cluster that 
has 3 partitions and requires authentication with SASL_SSL.

Background: I have tested my job against a Kafka topic running on my localhost 
with one partition, as well as one with three partitions, and it reads from and 
writes to the local Kafka cluster without issue.

Problem: When I attempt to connect to a topic that has multiple partitions and 
SASL_SSL, I get the following error (for reference, topicName is the name of 
the topic I am trying to consume). Oddly, I don't have any issues when 
producing to a remote multi-partition topic.

 

```
java.lang.RuntimeException: topicName
    at org.apache.flink.streaming.connectors.kafka.internal.KafkaPartitionDiscoverer.getAllPartitionsForTopics(KafkaPartitionDiscoverer.java:80)
    at org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.discoverPartitions(AbstractPartitionDiscoverer.java:131)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:508)
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:529)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:393)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
    at java.lang.Thread.run(Thread.java:748)
```

 

My consumer code looks like this:

 

```
def defineKafkaDataStream[A: TypeInformation](topic: String,
                                              env: StreamExecutionEnvironment,
                                              SASL_username: String,
                                              SASL_password: String,
                                              kafkaBootstrapServer: String = "localhost:9092",
                                              zookeeperHost: String = "localhost:2181",
                                              groupId: String = "test"
                                             )(implicit c: JsonConverter[A]): DataStream[A] = {
  val properties = new Properties()
  properties.setProperty("bootstrap.servers", kafkaBootstrapServer)
  properties.setProperty("security.protocol", "SASL_SSL")
  properties.setProperty("sasl.mechanism", "PLAIN")
  val jaasTemplate = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\" password=\"%s\";"
  val jaasConfig = String.format(jaasTemplate, SASL_username, SASL_password)
  properties.setProperty("sasl.jaas.config", jaasConfig)
  properties.setProperty("group.id", "MyConsumerGroup")

  env
    .addSource(new FlinkKafkaConsumer(topic, new JSONKeyValueDeserializationSchema(true), properties))
    .map(x => x.convertTo[A](c))
}
```

Is there some property I should be setting that I am not? Since it worked fine 
locally, I assume this is a bug in the interaction of the partition discoverer 
with SASL_SSL authentication, but if it is something I can fix with a setting, 
please let me know what to change.
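For reference, the same client configuration built in plain Java (the property keys are standard Kafka client settings; the broker address and credentials below are placeholders):

```java
import java.util.Properties;

public class SaslSslProps {
    // Build the same Kafka client properties the Scala snippet above sets.
    static Properties build(String bootstrapServers, String username, String password) {
        Properties p = new Properties();
        p.setProperty("bootstrap.servers", bootstrapServers);
        p.setProperty("security.protocol", "SASL_SSL");
        p.setProperty("sasl.mechanism", "PLAIN");
        p.setProperty("sasl.jaas.config", String.format(
                "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\" password=\"%s\";",
                username, password));
        p.setProperty("group.id", "MyConsumerGroup");
        return p;
    }

    public static void main(String[] args) {
        System.out.println(build("localhost:9092", "user", "secret").getProperty("sasl.jaas.config"));
    }
}
```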

 


> java.lang.RuntimeException for 
> KafkaPartitionDiscoverer.getAllPartitionsForTopics(KafkaPartitionDiscoverer.java:80)
>   when using SASL_SSL
> ----------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-15111
>                 URL: https://issues.apache.org/jira/browse/FLINK-15111
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.9.0
>         Environment: Flink Version 1.9.0
> Scala Version 2.11.12
> Kafka Cluster Version 2.3.0
>            Reporter: Hunter Kempf
>            Priority: Minor



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
