gattu shivakumar created SPARK-38672:
----------------------------------------
             Summary: Kafka, Spark data frame read fails when a message does not exist for the timestamp in all partitions
                 Key: SPARK-38672
                 URL: https://issues.apache.org/jira/browse/SPARK-38672
             Project: Spark
          Issue Type: Bug
          Components: Build
    Affects Versions: 3.2.1
            Reporter: gattu shivakumar

*Issue:*

Reading from Kafka with {{startingOffsetsByTimestamp}} fails with an assertion error when a partition has no message at or after the requested timestamp:

    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at org.apache.spark.util.UninterruptibleThreadRunner$$anon$1.run(UninterruptibleThreadRunner.scala:33)
Caused by: java.lang.AssertionError: *No offset matched from request of topic-partition source_JSON_sgattu_primittive-5 and timestamp 1648013731709.*
    at org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.$anonfun$readTimestampOffsets$1(KafkaOffsetReaderConsumer.scala:280)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
    at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:400)
    at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:728)
    at scala.collection.TraversableLike.map(TraversableLike.scala:286)

To reproduce the issue, produce a few messages to Kafka and then try to fetch them with the program below (run from the Scala shell).

Produce the messages:

bin/kafka-console-producer.sh --bootstrap-server pkc-epwny.eastus.azure.confluent.cloud:9092 --topic source_JSON_sgattu_primittive --producer.config kafkabroker.properties
{quote}{"ordertime":1497014222380,"orderid":18,"itemid":"Item_184","BinaryField":true}
{"ordertime":1497014222399,"orderid":19,"itemid":"Item_185","BinaryField":false}
{quote}

Read them back with {{startingOffsetsByTimestamp}}:

{code:scala}
import org.apache.spark.sql.functions._
import spark.implicits._

val kafkaServer: String = "XXX:9092"
val topicSampleName: String = "source_JSON_sgattu_primittive"

// Sample local DataFrame from the original session; not needed for the Kafka read itself.
val columns = Array("id", "value", "last", "year", "binary", "intbinary")
val df = sc.parallelize(Seq(
  (1, "John", "Doe", 1986, true, 1),
  (2, "Ive", "Fish", 1990, false, 0),
  (4, "John", "Wayne", 1995, false, 1)
)).toDF(columns: _*)

// Fails with the AssertionError above when any partition (here partition 5)
// has no message with a timestamp at or after 1648013731709.
spark.read.format("kafka")
  .option("kafka.bootstrap.servers", kafkaServer)
  .option("kafka.sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='XXX' password='XXX';")
  .option("kafka.sasl.mechanism", "PLAIN")
  .option("kafka.security.protocol", "SASL_SSL")
  .option("subscribe", topicSampleName)
  .option("startingOffsetsByTimestamp",
    """{"source_JSON_sgattu_primittive":{"0":1648013731709,"1":1648013731709,"2":1648013731709,"3":1648013731709,"4":1648013731709,"5":1648013731709}}""")
  .load()
{code}
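The assertion is raised in {{KafkaOffsetReaderConsumer.readTimestampOffsets}} when Kafka reports no matching offset for a partition at the requested timestamp, which is the same answer the Kafka consumer's {{offsetsForTimes}} API gives (a {{null}} entry) for such partitions. As a minimal sketch for confirming which partitions are affected before running the Spark read, the snippet below queries the broker directly; the broker address, credentials, and property values are placeholders and not part of the reported program.

{code:scala}
import java.util.Properties
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.ByteArrayDeserializer

val props = new Properties()
// Same broker as the repro; add the SASL_SSL / JAAS properties here if the cluster requires them.
props.put("bootstrap.servers", "XXX:9092")
props.put("key.deserializer", classOf[ByteArrayDeserializer].getName)
props.put("value.deserializer", classOf[ByteArrayDeserializer].getName)

val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](props)
val topic = "source_JSON_sgattu_primittive"
val timestamp = 1648013731709L

// Build a timestamp lookup request covering every partition of the topic.
val request = consumer.partitionsFor(topic).asScala
  .map(p => new TopicPartition(topic, p.partition) -> java.lang.Long.valueOf(timestamp))
  .toMap.asJava

// offsetsForTimes maps a partition to null when no record has a timestamp at or after
// the requested one; those partitions match the "No offset matched" assertion above.
val matched = consumer.offsetsForTimes(request).asScala
val missing = matched.collect { case (tp, offset) if offset == null => tp }

println(s"Partitions with no offset at/after $timestamp: ${missing.mkString(", ")}")
consumer.close()
{code}

In this repro, lowering the timestamp for the partitions reported as missing (or producing a message to them after the timestamp) makes the same {{spark.read}} call succeed, which suggests the failure is specific to partitions with no matching offset rather than to the topic as a whole.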