[ 
https://issues.apache.org/jira/browse/SPARK-24189?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16602889#comment-16602889
 ] 

Binzi Cao commented on SPARK-24189:
-----------------------------------

It seems I'm hitting a similar issue. I managed to set the Kafka isolation 
level with

{code:java}
.option("kafka.isolation.level", "read_committed")
{code}

and am using 
{code:java}
kafka-client 1.0.0 
{code}
 and I'm seeing this issue: 


{code:java}
[error] org.apache.spark.sql.streaming.StreamingQueryException: Job aborted due to stage failure: Task 17 in stage 0.0 failed 1 times, most recent failure: Lost task 17.0 in stage 0.0 (TID 17, localhost, executor driver): java.util.concurrent.TimeoutException: Cannot fetch record for offset 145 in 2000 milliseconds
[error]         at org.apache.spark.sql.kafka010.InternalKafkaConsumer.org$apache$spark$sql$kafka010$InternalKafkaConsumer$$fetchData(KafkaDataConsumer.scala:271)
[error]         at org.apache.spark.sql.kafka010.InternalKafkaConsumer$$anonfun$get$1.apply(KafkaDataConsumer.scala:163)
[error]         at org.apache.spark.sql.kafka010.InternalKafkaConsumer$$anonfun$get$1.apply(KafkaDataConsumer.scala:147)
[error]         at org.apache.spark.util.UninterruptibleThread.runUninterruptibly(UninterruptibleThread.scala:77)
[error]         at org.apache.spark.sql.kafka010.InternalKafkaConsumer.runUninterruptiblyIfPossible(KafkaDataConsumer.scala:109)
[error]         at org.apache.spark.sql.kafka010.InternalKafkaConsumer.get(KafkaDataConsumer.scala:147)
[error]         at org.apache.spark.sql.kafka010.KafkaDataConsumer$class.get(KafkaDataConsumer.scala:54)
[error]         at org.apache.spark.sql.kafka010.KafkaDataConsumer$CachedKafkaDataConsumer.get(KafkaDataConsumer.scala:362)
[error]         at org.apache.spark.sql.kafka010.KafkaSourceRDD$$anon$1.getNext(KafkaSourceRDD.scala:152)
[error]         at org.apache.spark.sql.kafka010.KafkaSourceRDD$$anon$1.getNext(KafkaSourceRDD.scala:143)
[error]         at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
[error]         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
[error]         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
[error]         at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(generated.java:205)
[error]         at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
[error]         at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
[error]         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
[error]         at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.agg_doAggregateWithKeys_0$(generated.java:230)
{code}


So it looks like Spark does not work at all with a topic that uses Kafka transactions. 

The exception was thrown here:
https://github.com/apache/spark/blob/v2.3.1/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala#L271-L272

Setting
{code:java}
 failOnDataLoss=false
{code}
 does not fix the issue, because the exception is never caught in 
KafkaDataConsumer.scala. 
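
For reference, here is a minimal sketch of the reader configuration described above. It assumes a spark-shell session where `spark` is already defined and the Kafka source package is on the classpath. The broker address and topic name are taken from the issue description and are placeholders. Options prefixed with "kafka." are passed through to the underlying Kafka consumer. As noted above, on 2.3.1 this configuration still ends in the TimeoutException, so it illustrates the setup rather than a fix.

{code:scala}
// Sketch only: run in spark-shell, where `spark` is predefined.
// "localhost:9092" and "test-topic" come from the issue description.
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "test-topic")
  // The "kafka." prefix forwards this setting to the Kafka consumer.
  .option("kafka.isolation.level", "read_committed")
  // Set as in the comment above; it does not avoid the timeout.
  .option("failOnDataLoss", "false")
  .load()
{code}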





> Spark Structured Streaming not working with the Kafka Transactions
> ------------------------------------------------------------------
>
>                 Key: SPARK-24189
>                 URL: https://issues.apache.org/jira/browse/SPARK-24189
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.3.0
>            Reporter: bharath kumar avusherla
>            Priority: Major
>
> I was trying to read a Kafka transactional topic using Spark Structured 
> Streaming 2.3.0 with the Kafka option isolation-level = "read_committed", but 
> Spark reads the data immediately without waiting for the data in the topic to 
> be committed. The Spark documentation says that Structured Streaming supports 
> Kafka version 0.10 or higher. I am using the command below to test the 
> scenario.
> val df = spark
>  .readStream
>  .format("kafka")
>  .option("kafka.bootstrap.servers", "localhost:9092")
>  .option("subscribe", "test-topic")
>  .option("isolation-level","read_committed")
>  .load()
> Can you please let me know whether transactional reads are supported in Spark 
> 2.3.0 Structured Streaming, or whether I am missing anything?
>  
> Thank you.



