[ https://issues.apache.org/jira/browse/SPARK-24189?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16602889#comment-16602889 ]
Binzi Cao commented on SPARK-24189:
-----------------------------------

It seems I'm hitting a similar issue. I managed to set the Kafka isolation level with
{code:java}
.option("kafka.isolation.level", "read_committed")
{code}
using
{code:java}
kafka-client 1.0.0
{code}
and I'm seeing this issue:
{code:java}
[error] org.apache.spark.sql.streaming.StreamingQueryException: Job aborted due to stage failure: Task 17 in stage 0.0 failed 1 times, most recent failure: Lost task 17.0 in stage 0.0 (TID 17, localhost, executor driver): java.util.concurrent.TimeoutException: Cannot fetch record for offset 145 in 2000 milliseconds
[error] at org.apache.spark.sql.kafka010.InternalKafkaConsumer.org$apache$spark$sql$kafka010$InternalKafkaConsumer$$fetchData(KafkaDataConsumer.scala:271)
[error] at org.apache.spark.sql.kafka010.InternalKafkaConsumer$$anonfun$get$1.apply(KafkaDataConsumer.scala:163)
[error] at org.apache.spark.sql.kafka010.InternalKafkaConsumer$$anonfun$get$1.apply(KafkaDataConsumer.scala:147)
[error] at org.apache.spark.util.UninterruptibleThread.runUninterruptibly(UninterruptibleThread.scala:77)
[error] at org.apache.spark.sql.kafka010.InternalKafkaConsumer.runUninterruptiblyIfPossible(KafkaDataConsumer.scala:109)
[error] at org.apache.spark.sql.kafka010.InternalKafkaConsumer.get(KafkaDataConsumer.scala:147)
[error] at org.apache.spark.sql.kafka010.KafkaDataConsumer$class.get(KafkaDataConsumer.scala:54)
[error] at org.apache.spark.sql.kafka010.KafkaDataConsumer$CachedKafkaDataConsumer.get(KafkaDataConsumer.scala:362)
[error] at org.apache.spark.sql.kafka010.KafkaSourceRDD$$anon$1.getNext(KafkaSourceRDD.scala:152)
[error] at org.apache.spark.sql.kafka010.KafkaSourceRDD$$anon$1.getNext(KafkaSourceRDD.scala:143)
[error] at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
[error] at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
[error] at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
[error] at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(generated.java:205)
[error] at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
[error] at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
[error] at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
[error] at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.agg_doAggregateWithKeys_0$(generated.java:230)
{code}
So it looks like it is not working at all with a topic that uses Kafka transactions. The exception is thrown here:
https://github.com/apache/spark/blob/v2.3.1/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala#L271-L272
Setting
{code:java}
failOnDataLoss=false
{code}
does not fix the issue, as the exception is never caught in the KafkaDataConsumer.scala code.

> Spark Structured Streaming not working with Kafka Transactions
> ------------------------------------------------------------------
>
>                 Key: SPARK-24189
>                 URL: https://issues.apache.org/jira/browse/SPARK-24189
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.3.0
>            Reporter: bharath kumar avusherla
>            Priority: Major
>
> I was trying to read a Kafka transactional topic using Spark Structured Streaming 2.3.0 with the Kafka option isolation-level = "read_committed", but Spark reads the data immediately, without waiting for the data in the topic to be committed. The Spark documentation mentions that Structured Streaming supports Kafka version 0.10 or higher. I am using the command below to test the scenario.
> val df = spark
>   .readStream
>   .format("kafka")
>   .option("kafka.bootstrap.servers", "localhost:9092")
>   .option("subscribe", "test-topic")
>   .option("isolation-level", "read_committed")
>   .load()
>
> Can you please let me know if transactional reads are supported in Spark 2.3.0 Structured Streaming, or am I missing anything?
>
> Thank you.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
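For reference, a minimal sketch of a read_committed stream in Scala (assumptions: Spark 2.3+ with the spark-sql-kafka-0-10 connector on the classpath, a local broker at localhost:9092, and a topic named test-topic). Note the {{kafka.}} prefix: Spark only forwards options with that prefix to the underlying Kafka consumer, so the unprefixed {{isolation-level}} in the reporter's snippet is never passed through, while the commenter's {{kafka.isolation.level}} is — which is likely why the reporter saw uncommitted data read immediately.

{code:java}
import org.apache.spark.sql.SparkSession

// Sketch only: session and endpoint names are illustrative, not from the issue.
val spark = SparkSession.builder()
  .appName("read-committed-sketch")
  .master("local[*]")
  .getOrCreate()

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "test-topic")
  // Kafka consumer properties must carry the "kafka." prefix;
  // "isolation.level" is the Kafka consumer's own property name.
  .option("kafka.isolation.level", "read_committed")
  // Per the comment above, this does not avoid the TimeoutException in 2.3.x.
  .option("failOnDataLoss", "false")
  .load()

df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .format("console")
  .start()
  .awaitTermination()
{code}

Even with this configuration, the 2.3.x consumer can hit the TimeoutException reported above on transactional topics, likely because transaction markers and aborted records leave gaps in the offset sequence that the fetch loop in KafkaDataConsumer.scala does not expect.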