Apache Spark commented on SPARK-23623:

User 'tdas' has created a pull request for this issue:

> Avoid concurrent use of cached KafkaConsumer in CachedKafkaConsumer 
> (kafka-0-10-sql)
> ------------------------------------------------------------------------------------
>                 Key: SPARK-23623
>                 URL: https://issues.apache.org/jira/browse/SPARK-23623
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.3.0
>            Reporter: Tathagata Das
>            Assignee: Tathagata Das
>            Priority: Critical
> CachedKafkaConsumer in the project `kafka-0-10-sql` is designed to maintain a
> pool of KafkaConsumers that can be reused. However, it was built with the
> assumption that only one task will try to read the same Kafka
> TopicPartition at the same time. Hence, the cache was keyed by the
> TopicPartition a consumer is supposed to read. For any case where this
> assumption may not hold, there is a SparkPlan flag to disable the use of the
> cache. So it was up to the planner to correctly identify when it was not safe
> to use the cache and set the flag accordingly.
> Fundamentally, this is the wrong way to approach the problem. It is HARD for
> a high-level planner to reason about the low-level execution model, e.g.
> whether multiple tasks in the same query will try to read the same
> partition. Case in point: 2.3.0 introduced stream-stream joins, so you can
> build a streaming self-join query on Kafka. It is far from obvious that such
> a query leads to two tasks reading the same partition, possibly
> concurrently. Consequently, the planner cannot reliably detect such cases and
> set the flag to avoid the cache / consumer pool. This can inadvertently lead
> to {{ConcurrentModificationException}} or, worse, silent reading of
> incorrect data.
> Here is a better way to design this. The planner shouldn't have to understand
> these low-level optimizations. Rather, the consumer pool should be smart
> enough to avoid concurrent use of a cached consumer. It currently tries to do
> so, but incorrectly (the flag {{inuse}} is not checked when returning a cached
> consumer, see
> [this|https://github.com/apache/spark/blob/master/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala#L403]).
> If there is another request for the same partition as a currently in-use
> consumer, the pool should automatically return a fresh consumer that is
> closed when the task is done.
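The pooling policy proposed above can be sketched as follows. This is a minimal, self-contained Java sketch of the idea, not Spark's actual CachedKafkaConsumer code: `ConsumerPool`, `FakeConsumer`, `acquire` and `release` are hypothetical names, and `FakeConsumer` stands in for a real `KafkaConsumer` so the example runs without a Kafka dependency. The key point is that `acquire` checks the in-use flag before handing out the cached consumer.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical stand-in for org.apache.kafka.clients.consumer.KafkaConsumer,
// bound to a single topic-partition so the sketch runs without Kafka.
class FakeConsumer {
    final String topic;
    final int partition;
    boolean closed = false;

    FakeConsumer(String topic, int partition) {
        this.topic = topic;
        this.partition = partition;
    }

    void close() {
        closed = true;
    }
}

// Hypothetical pool (not Spark's CachedKafkaConsumer): at most one cached
// consumer per topic-partition, and its inUse flag is checked on every acquire.
class ConsumerPool {
    // Cache key: value equality on (topic, partition).
    private static final class TopicPartitionKey {
        final String topic;
        final int partition;

        TopicPartitionKey(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof TopicPartitionKey)) return false;
            TopicPartitionKey other = (TopicPartitionKey) o;
            return partition == other.partition && topic.equals(other.topic);
        }

        @Override
        public int hashCode() {
            return Objects.hash(topic, partition);
        }
    }

    // Cache entry: the consumer plus whether some task currently holds it.
    private static final class Entry {
        final FakeConsumer consumer;
        boolean inUse = false;

        Entry(FakeConsumer consumer) {
            this.consumer = consumer;
        }
    }

    private final Map<TopicPartitionKey, Entry> cache = new HashMap<>();

    // Returns a consumer that no other task is using right now.
    synchronized FakeConsumer acquire(String topic, int partition) {
        TopicPartitionKey key = new TopicPartitionKey(topic, partition);
        Entry entry = cache.get(key);
        if (entry == null) {
            // Nothing cached yet: create and cache a new consumer.
            entry = new Entry(new FakeConsumer(topic, partition));
            cache.put(key, entry);
        } else if (entry.inUse) {
            // Cached consumer is busy: return a fresh, uncached consumer
            // instead of sharing the cached one across tasks.
            return new FakeConsumer(topic, partition);
        }
        entry.inUse = true;
        return entry.consumer;
    }

    // Called when a task finishes with a consumer.
    synchronized void release(FakeConsumer consumer) {
        Entry entry = cache.get(new TopicPartitionKey(consumer.topic, consumer.partition));
        if (entry != null && entry.consumer == consumer) {
            // The cached consumer goes back to the pool for reuse.
            entry.inUse = false;
        } else {
            // A fresh, uncached consumer is closed when its task is done.
            consumer.close();
        }
    }
}
```

A task would call `acquire` before reading a partition and `release` when it is done. Two concurrent tasks on the same partition (as in a streaming self-join) then get distinct consumers, and the extra one is closed on release instead of being cached. The single `synchronized` lock keeps the sketch simple; a production pool would likely also need to bound its size and evict idle consumers.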

This message was sent by Atlassian JIRA