Gabor,

Thanks for the quick response and for sharing the Spark 3.0 details. We need to use Spark Streaming (KafkaUtils.createDirectStream) rather than Structured Streaming, following this document:
https://spark.apache.org/docs/2.2.0/streaming-kafka-0-10-integration.html

Re-iterating the issue for better understanding: the spark-streaming-kafka-0-10
<https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html>
Kafka connector prefixes the executors' group.id with "spark-executor", while the driver uses the original group id.
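For reference, here is a minimal stand-alone sketch of that prefixing behavior (it mirrors what the referenced line in KafkaUtils.scala does for executor-side consumers; the object name, helper name, and sample group id are illustrative, not Spark's actual API):

```scala
// Illustrative sketch only: reproduces the group-id rewrite that Spark 2.2's
// kafka-0-10 connector applies for executor-side consumers. The helper name
// and the sample group id are hypothetical.
object GroupIdPrefixSketch {
  // Executors consume with "spark-executor-" + the configured group.id;
  // the driver keeps the original group.id.
  def executorGroupId(originalGroupId: String): String =
    "spark-executor-" + originalGroupId

  def main(args: Array[String]): Unit = {
    val configured = "my-authorized-group" // hypothetical authorized group
    // prints "spark-executor-my-authorized-group"
    println(executorGroupId(configured))
  }
}
```

This is why a broker that only authorizes the pre-configured group rejects the executors: they present a group id that was never authorized.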
*Here is the code where the executor constructs the executor-specific group id*

https://github.com/apache/spark/blob/v2.2.0/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaUtils.scala
line # 212

*It would be great if you could provide answers to the following questions.*

#1 What was the specific reason for prefixing the group id on the executors?

#2 Would it be harmful if I built a custom spark-streaming-kafka-0-10
<https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html>
library with the group id prefix removed at line # 212 in KafkaUtils.scala?

#3 KafkaUtils.scala is marked as @Experimental. What does that mean? Is it
advisable to use it in production?

*Here is my Spark Streaming code snippet* (imports added for completeness)

import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

val kafkaParams = Map[String, Object](
  ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> Constants.BOOTSTRAP_SERVERS,
  ConsumerConfig.GROUP_ID_CONFIG -> subscribers,
  ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "latest",
  ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean),
  ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
  ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[MessageDeserializer],
  "security.protocol" -> "SSL",
  "ssl.truststore.location" -> Constants.TRUSTSTORE_PATH,
  "ssl.truststore.password" -> Constants.TRUSTSTORE_PASSWORD,
  "ssl.keystore.location" -> Constants.KEYSTORE_PATH,
  "ssl.keystore.password" -> Constants.KEYSTORE_PASSWORD,
  "ssl.key.password" -> Constants.KEYSTORE_PASSWORD
)

val stream = KafkaUtils.createDirectStream[String, Message](
  ssc,
  PreferConsistent,
  Subscribe[String, Message](topicsSet, kafkaParams)
)

---
Thanks in advance,
Sethupathi.T

On Thu, Sep 5, 2019 at 9:17 AM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:

> Hi,
>
> Let me share the Spark 3.0 documentation part (Structured Streaming, not
> the DStreams you mentioned, but still relevant):
>
> kafka.group.id | string | none | streaming and batch
> The Kafka group id to use in the Kafka consumer while reading from
> Kafka. Use this with caution. By default, each query generates a unique
> group id for reading data. This ensures that each Kafka source has its own
> consumer group that does not face interference from any other consumer, and
> therefore can read all of the partitions of its subscribed topics. In some
> scenarios (for example, Kafka group-based authorization), you may want to
> use a specific authorized group id to read data. You can optionally set the
> group id. However, do this with extreme caution as it can cause unexpected
> behavior. Concurrently running queries (both batch and streaming) or
> sources with the same group id are likely to interfere with each other,
> causing each query to read only part of the data. This may also occur when
> queries are started/restarted in quick succession. To minimize such issues,
> set the Kafka consumer session timeout (by setting option
> "kafka.session.timeout.ms") to be very small. When this is set, option
> "groupIdPrefix" will be ignored.
>
> I think it answers your questions.
>
> As a general suggestion, it may be worth revisiting Spark 3.0, because
> Structured Streaming has another interesting feature:
>
> groupIdPrefix | string | spark-kafka-source | streaming and batch
> Prefix of consumer group identifiers (`group.id`) that are generated by
> structured streaming queries. If "kafka.group.id" is set, this option will
> be ignored.
>
> BR,
> G
>
>
> On Thu, Sep 5, 2019 at 10:05 AM Sethupathi T
> <sethu....@googlemail.com.invalid> wrote:
>
>> Hi Team,
>>
>> We have a secured Kafka cluster (which only allows consuming from
>> pre-configured, authorized consumer groups). There is a scenario where we
>> want to use Spark Streaming to consume from the secured Kafka cluster, so
>> we have decided to use the spark-streaming-kafka-0-10
>> <https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html>
>> API (it supports SSL/TLS, Direct DStream, the new Kafka consumer API, etc.).
>> When I deploy the application in cluster mode, I realized that the actual
>> group id gets prefixed with "spark-executor" in the executor configuration
>> (the executors try to connect to Kafka with "spark-executor" + the actual
>> group id, which does not exist, and they get an exception).
>>
>> *Here is the code where the executor constructs the executor-specific group id*
>>
>> https://github.com/apache/spark/blob/v2.2.0/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaUtils.scala
>> line # 212
>>
>> *Here are my questions*
>>
>> #1 What was the specific reason for prefixing the group id on the executors?
>>
>> #2 Would it be harmful if I built a custom spark-streaming-kafka-0-10
>> <https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html>
>> library with the group id prefix removed?
>>
>> #3 KafkaUtils.scala is marked as @Experimental. What does that mean? Is it
>> advisable to use it in production?
>>
>> *Here is my Spark Streaming code snippet*
>>
>> val kafkaParams = Map[String, Object](
>>   ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> Constants.BOOTSTRAP_SERVERS,
>>   ConsumerConfig.GROUP_ID_CONFIG -> subscribers,
>>   ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "latest",
>>   ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean),
>>   ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
>>   ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[MessageDeserializer],
>>   "security.protocol" -> "SSL",
>>   "ssl.truststore.location" -> Constants.TRUSTSTORE_PATH,
>>   "ssl.truststore.password" -> Constants.TRUSTSTORE_PASSWORD,
>>   "ssl.keystore.location" -> Constants.KEYSTORE_PATH,
>>   "ssl.keystore.password" -> Constants.KEYSTORE_PASSWORD,
>>   "ssl.key.password" -> Constants.KEYSTORE_PASSWORD
>> )
>>
>> val stream = KafkaUtils.createDirectStream[String, Message](
>>   ssc,
>>   PreferConsistent,
>>   Subscribe[String, Message](topicsSet, kafkaParams)
>> )
>>
>> ---
>> Thanks in advance,
>> Sethupathi.T
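The group-id rules quoted from the Spark 3.0 Structured Streaming docs above can be summarized in a small stand-alone sketch (illustrative only: the resolution order matches the quoted docs, but the object name and the unique-suffix scheme are made up for illustration and are not Spark's actual implementation):

```scala
// Illustrative sketch of the quoted Spark 3.0 group-id rules: an explicit
// "kafka.group.id" is used as-is and "groupIdPrefix" is ignored; otherwise
// each query builds a unique group id from the prefix (default
// "spark-kafka-source"). The suffix scheme here is hypothetical.
object StructuredGroupIdSketch {
  def resolveGroupId(kafkaGroupId: Option[String],
                     groupIdPrefix: Option[String],
                     uniqueSuffix: String): String =
    kafkaGroupId.getOrElse(
      groupIdPrefix.getOrElse("spark-kafka-source") + "-" + uniqueSuffix)

  def main(args: Array[String]): Unit = {
    // Authorized group set explicitly: the prefix option is ignored.
    // prints "my-authorized-group"
    println(resolveGroupId(Some("my-authorized-group"), Some("ignored"), "0"))
    // No explicit group: a unique id is generated from the default prefix.
    // prints "spark-kafka-source-0"
    println(resolveGroupId(None, None, "0"))
  }
}
```

Under Kafka group-based authorization, the first branch is the relevant one: the authorized group id reaches the broker unchanged.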