Gabor,

Thanks for the quick response and for sharing the Spark 3.0 details. We need
to use Spark Streaming (KafkaUtils.createDirectStream) rather than Structured
Streaming, following this document:
https://spark.apache.org/docs/2.2.0/streaming-kafka-0-10-integration.html
Let me restate the issue for better understanding.
The spark-streaming-kafka-0-10
<https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html>
Kafka connector prefixes the group.id with "spark-executor" for executors,
while the driver uses the original group id.

*Here is the code where the executor constructs its executor-specific group id*

https://github.com/apache/spark/blob/v2.2.0/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaUtils.scala
(line 212)
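
To make the effect concrete, here is a minimal sketch of how I understand the
executor-side group id to be derived. It is not the Spark source:
GroupIdSketch and executorGroupId are hypothetical names of mine, the
"subscribers" value is only an example, and I am assuming the prefix string
carries a trailing hyphen ("spark-executor-").

```scala
// Hypothetical illustration only, not the code from KafkaUtils.scala.
object GroupIdSketch {
  // Assumption: the executor prefix includes a trailing hyphen.
  val ExecutorPrefix = "spark-executor-"

  // Derives the group id the executors would use from the one
  // configured for the driver.
  def executorGroupId(driverGroupId: String): String =
    ExecutorPrefix + driverGroupId

  def main(args: Array[String]): Unit = {
    val driverGroupId = "subscribers" // example value only
    println(s"driver group.id:   $driverGroupId")
    println(s"executor group.id: ${executorGroupId(driverGroupId)}")
  }
}
```

Under that assumption the driver keeps the authorized group id, but every
executor tries to join a different, prefixed group that our Kafka cluster has
not authorized.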

*It would be great if you could answer the following questions.*

#1 What was the specific reason for prefixing the group id in the executor?

#2 Will it be harmful if I build a custom spark-streaming-kafka-0-10
<https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html>
library with the group id prefix removed at line 212 of KafkaUtils.scala?

#3 KafkaUtils.scala is marked as @Experimental. What does that mean? Is it
advisable to use it in production?

*Here is my Spark Streaming code snippet*

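import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

// subscribers, Constants, Message, MessageDeserializer, topicsSet and ssc
// are defined elsewhere in our application.
val kafkaParams = Map[String, Object](
  ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> Constants.BOOTSTRAP_SERVERS,
  // The only consumer group our Kafka cluster authorizes
  ConsumerConfig.GROUP_ID_CONFIG -> subscribers,
  ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "latest",
  ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean),
  ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
  ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[MessageDeserializer],
  // SSL settings for the secured cluster
  "security.protocol" -> "SSL",
  "ssl.truststore.location" -> Constants.TRUSTSTORE_PATH,
  "ssl.truststore.password" -> Constants.TRUSTSTORE_PASSWORD,
  "ssl.keystore.location" -> Constants.KEYSTORE_PATH,
  "ssl.keystore.password" -> Constants.KEYSTORE_PASSWORD,
  "ssl.key.password" -> Constants.KEYSTORE_PASSWORD
)

// Direct stream over the subscribed topics
val stream = KafkaUtils.createDirectStream[String, Message](
  ssc,
  PreferConsistent,
  Subscribe[String, Message](topicsSet, kafkaParams)
)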

---
Thanks in Advance,
Sethupathi.T

On Thu, Sep 5, 2019 at 9:17 AM Gabor Somogyi <gabor.g.somo...@gmail.com>
wrote:

> Hi,
>
> Let me share the relevant part of the Spark 3.0 documentation (it covers
> Structured Streaming, not the DStreams you mentioned, but is still relevant):
>
> Option: kafka.group.id
> Value: string | Default: none | Query type: streaming and batch
> Meaning: The Kafka group id to use in Kafka consumer while reading from
> Kafka. Use this with caution. By default, each query generates a unique
> group id for reading data. This ensures that each Kafka source has its own
> consumer group that does not face interference from any other consumer, and
> therefore can read all of the partitions of its subscribed topics. In some
> scenarios (for example, Kafka group-based authorization), you may want to
> use a specific authorized group id to read data. You can optionally set the
> group id. However, do this with extreme caution as it can cause unexpected
> behavior. Concurrently running queries (both, batch and streaming) or
> sources with the same group id are likely interfere with each other causing
> each query to read only part of the data. This may also occur when queries
> are started/restarted in quick succession. To minimize such issues, set the
> Kafka consumer session timeout (by setting option
> "kafka.session.timeout.ms") to be very small. When this is set, option
> "groupIdPrefix" will be ignored.
>
> I think it answers your questions.
>
> As a general suggestion, it may be worth revisiting Spark 3.0, because
> Structured Streaming has another interesting feature:
>
> Option: groupIdPrefix
> Value: string | Default: spark-kafka-source | Query type: streaming and batch
> Meaning: Prefix of consumer group identifiers (`group.id`) that are
> generated by structured streaming queries. If "kafka.group.id" is set, this
> option will be ignored.
>
> BR,
> G
>
>
> On Thu, Sep 5, 2019 at 10:05 AM Sethupathi T
> <sethu....@googlemail.com.invalid> wrote:
>
>> Hi Team,
>>
>> We have a secured Kafka cluster (which only allows consuming from the
>> pre-configured, authorized consumer group), and there is a scenario where
>> we want to use Spark Streaming to consume from this secured Kafka. So we
>> have decided to use the spark-streaming-kafka-0-10
>> <https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html>
>> API (it supports SSL/TLS, Direct DStream, the new Kafka consumer API, etc.).
>> When I deployed the application in cluster mode, I realized that the actual
>> group id had been prefixed with "spark-executor" in the executor
>> configuration (the executor tries to connect to Kafka with
>> "spark-executor" + actual group id, which does not exist, and gets an
>> exception).
>>
>> *Here is the code where the executor constructs its executor-specific group id*
>>
>>
>> https://github.com/apache/spark/blob/v2.2.0/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaUtils.scala
>> (line 212)
>>
>> *Here are my Questions*
>>
>> #1 What was the specific reason for prefixing the group id in the executor?
>>
>> #2 Will it be harmful if I build a custom spark-streaming-kafka-0-10
>> <https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html>
>> library with the group id prefix removed?
>>
>> #3 KafkaUtils.scala is marked as @Experimental. What does that mean? Is it
>> advisable to use it in production?
>>
>> *Here is my Spark Streaming code snippet*
>>
>> val kafkaParams = Map[String, Object](
>>
>>   ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> Constants.BOOTSTRAP_SERVERS,
>>   ConsumerConfig.GROUP_ID_CONFIG -> subscribers,
>>   ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "latest",
>>   ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean),
>>   ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
>>   ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[MessageDeserializer],
>>   "security.protocol" -> "SSL",
>>   "ssl.truststore.location" -> Constants.TRUSTSTORE_PATH,
>>   "ssl.truststore.password" -> Constants.TRUSTSTORE_PASSWORD,
>>   "ssl.keystore.location" -> Constants.KEYSTORE_PATH,
>>   "ssl.keystore.password" -> Constants.KEYSTORE_PASSWORD,
>>   "ssl.key.password" -> Constants.KEYSTORE_PASSWORD
>> )
>>
>> val stream = KafkaUtils.createDirectStream[String, Message](
>>   ssc,
>>   PreferConsistent,
>>   Subscribe[String, Message](topicsSet, kafkaParams)
>> )
>>
>> ---
>> Thanks in Advance,
>> Sethupathi.T
>>
>
