http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html#creating-a-direct-stream

Use a separate group id for each stream, like the docs say.

If you're doing multiple output operations, and aren't caching, spark
is going to read from kafka again each time, and if some of those
reads are happening for the same group and same topicpartition, it's
not going to work.

On Sun, Dec 11, 2016 at 2:36 PM, Oleksii Dukhno
<oleksii.duk...@gmail.com> wrote:
> Hi Anton,
>
> What is the command you run your spark app with? Why not working with data
> instead of stream on your second stage operation? Can you provide logs with
> the issue?
>
> ConcurrentModificationException is not a spark issue, it means that you use
> the same Kafka consumer instance from more than one thread.
>
> Additionally,
>
> 1) As I understand new kafka consumer is created every time when you call
> KafkaUtils.createDirectStream.
> 2) If you assign the same group id to several consumer instances then all
> the consumers will get different set of messages on the same topic. This is
> a kind of load balancing which kafka provides with its Consumer API.
>
> Oleksii
>
> On 11 December 2016 at 18:46, Anton Okolnychyi <anton.okolnyc...@gmail.com>
> wrote:
>>
>> sorry, I forgot to mention that I was using Spark 2.0.2, Kafka 0.10, and
>> nothing custom.
>>
>>
>> I will try restate the initial question. Let's consider an example.
>>
>> 1. I create a stream and subscribe to a certain topic.
>>
>>     val stream = KafkaUtils.createDirectStream(...)
>>
>> 2. I extract the actual data from the stream. For instance, word counts.
>>
>>     val wordCounts = stream.map(record => (record.value(), 1))
>>
>> 3. Then I compute something and output the result to console.
>>
>>     val firstResult = stream.reduceByWindow(...)
>>     firstResult.print()
>>
>> Once that is done, I would like to perform another computation on top of
>> wordCounts and output that result again to console. In my current
>> understanding, I cannot just reuse wordCounts from Step 2 and should create
>> a new stream with another group id and then define the second computation.
>> Am I correct that if add the next part, then I can get
>> "ConcurrentModificationException: KafkaConsumer is not safe for
>> multi-threaded access"?
>>
>>     // another computation on wordCounts
>>     val secondResult = wordCounts.reduceByKeyAndWindow(...)
>>     secondResult.output()
>>
>> Thanks,
>> Anton
>>
>> 2016-12-11 17:11 GMT+01:00 Timur Shenkao <t...@timshenkao.su>:
>>>
>>> Hi,
>>> Usual general questions are:
>>> -- what is your Spark version?
>>> -- what is your Kafka version?
>>> -- do you use "standard" Kafka consumer or try to implement something
>>> custom (your own multi-threaded consumer)?
>>>
>>> The freshest docs
>>> https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html
>>>
>>> AFAIK, yes, you should use unique group id for each stream (KAFKA 0.10
>>> !!!)
>>>>
>>>> kafkaParams.put("group.id", "use_a_separate_group_id_for_each_stream");
>>>
>>>
>>>
>>> On Sun, Dec 11, 2016 at 5:51 PM, Anton Okolnychyi
>>> <anton.okolnyc...@gmail.com> wrote:
>>>>
>>>> Hi,
>>>>
>>>> I am experimenting with Spark Streaming and Kafka. I will appreciate if
>>>> someone can say whether the following assumption is correct.
>>>>
>>>> If I have multiple computations (each with its own output) on one stream
>>>> (created as KafkaUtils.createDirectStream), then there is a chance to have
>>>> ConcurrentModificationException: KafkaConsumer is not safe for
>>>> multi-threaded access.  To solve this problem, I should create a new stream
>>>> with different "group.id" for each computation.
>>>>
>>>> Am I right?
>>>>
>>>> Best regards,
>>>> Anton
>>>
>>>
>>
>

---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscr...@spark.apache.org

Reply via email to