Hi Team,

We have a secured Kafka cluster (which only allows consuming from pre-configured, authorized consumer groups). There is a scenario where we want to use Spark Streaming to consume from this secured Kafka, so we decided to use the spark-streaming-kafka-0-10 <https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html> API (it supports SSL/TLS, Direct DStream, the new Kafka consumer API, etc.). When I deployed the application in cluster mode, I realized that the actual group id gets prefixed with "spark-executor" in the executor configuration (the executors try to connect to Kafka with "spark-executor" + the actual group id, which does not actually exist on the broker, so they fail with an exception).
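For anyone reproducing this: the renaming happens in KafkaUtils.fixKafkaParams in the Spark 2.2 source linked below, and is a plain string concatenation, so the group id the executors actually present to the broker can be computed up front (e.g. to authorize it on the Kafka side instead of patching the library). A minimal sketch; the object and method names are my own, only the "spark-executor-" prefix comes from the Spark source:

```scala
// Sketch of the renaming done by KafkaUtils.fixKafkaParams (Spark 2.2):
// the driver consumer keeps the configured group id, while each executor
// consumer gets "spark-executor-" prepended to it.
object ExecutorGroupId {
  def executorGroupId(driverGroupId: String): String =
    "spark-executor-" + driverGroupId
}

// e.g. with group.id = "my-subscribers", the executors will join
// the consumer group "spark-executor-my-subscribers".
```

So on a cluster that only allows pre-authorized groups, both the configured group id and its "spark-executor-" variant would presumably need to be authorized for the job to run.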
*Here is the code where the executor constructs the executor-specific group id:*
https://github.com/apache/spark/blob/v2.2.0/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaUtils.scala (line # 212)

*Here are my questions:*
#1 What was the specific reason for prefixing the group id on the executors?
#2 Would it be harmful if I built a custom spark-streaming-kafka-0-10 <https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html> library with the group id prefix removed?
#3 KafkaUtils.scala is marked as @Experimental. What does that mean? Is it advisable to use it in production?

*Here is my Spark Streaming code snippet:*

val kafkaParams = Map[String, Object](
  ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> Constants.BOOTSTRAP_SERVERS,
  ConsumerConfig.GROUP_ID_CONFIG -> subscribers,
  ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "latest",
  ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean),
  ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
  ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[MessageDeserializer],
  "security.protocol" -> "SSL",
  "ssl.truststore.location" -> Constants.TRUSTSTORE_PATH,
  "ssl.truststore.password" -> Constants.TRUSTSTORE_PASSWORD,
  "ssl.keystore.location" -> Constants.KEYSTORE_PATH,
  "ssl.keystore.password" -> Constants.KEYSTORE_PASSWORD,
  "ssl.key.password" -> Constants.KEYSTORE_PASSWORD
)

val stream = KafkaUtils.createDirectStream[String, Message](
  ssc,
  PreferConsistent,
  Subscribe[String, Message](topicsSet, kafkaParams)
)

---
Thanks in advance,
Sethupathi.T