gattu shivakumar created SPARK-38672:
----------------------------------------

             Summary: Kafka: Spark DataFrame read fails when not every partition has a message for the requested timestamp
                 Key: SPARK-38672
                 URL: https://issues.apache.org/jira/browse/SPARK-38672
             Project: Spark
          Issue Type: Bug
          Components: Build
    Affects Versions: 3.2.1
            Reporter: gattu shivakumar


*Issue:* the batch read fails with an {{AssertionError}}:
{noformat}
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at org.apache.spark.util.UninterruptibleThreadRunner$$anon$1.run(UninterruptibleThreadRunner.scala:33)
Caused by: java.lang.AssertionError: No offset matched from request of topic-partition source_JSON_sgattu_primittive-5 and timestamp 1648013731709.
at org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.$anonfun$readTimestampOffsets$1(KafkaOffsetReaderConsumer.scala:280)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:400)
at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:728)
at scala.collection.TraversableLike.map(TraversableLike.scala:286)
{noformat}
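For context, the assertion appears to fire when the underlying Kafka {{offsetsForTimes}} lookup finds no match for a partition, i.e. no record in that partition has a timestamp at or after the requested one. A minimal sketch of that failure mode (the helper name {{resolveOffset}} is hypothetical, not Spark's actual API):

```scala
// Hypothetical sketch: the timestamp lookup yields None for a partition when
// no record has timestamp >= the requested one; Spark 3.2.x then raises an
// AssertionError instead of falling back to a usable offset.
def resolveOffset(matched: Option[Long], tp: String, ts: Long): Long =
  matched.getOrElse(
    throw new AssertionError(
      s"No offset matched from request of topic-partition $tp and timestamp $ts."))

// A partition with a matching record resolves normally:
println(resolveOffset(Some(42L), "source_JSON_sgattu_primittive-0", 1648013731709L))
```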

To reproduce the issue, produce a few messages to Kafka and then try to fetch them with the program below (run in the Scala shell):

{noformat}
bin/kafka-console-producer.sh --bootstrap-server pkc-epwny.eastus.azure.confluent.cloud:9092 --topic source_JSON_sgattu_primittive --producer.config kafkabroker.properties
{noformat}
{quote}{"ordertime":1497014222380,"orderid":18,"itemid":"Item_184","BinaryField":true}
{"ordertime":1497014222399,"orderid":19,"itemid":"Item_185","BinaryField":false}
{quote}
{code:scala}
import org.apache.spark.sql.functions._
import spark.implicits._

val kafkaServer: String = "XXX:9092"
val topicSampleName: String = "source_JSON_sgattu_primittive"

// Sample DataFrame used to produce the test records
val columns = Array("id", "value", "last", "year", "binary", "intbinary")
val df = sc.parallelize(Seq(
  (1, "John", "Doe", 1986, true, 1),
  (2, "Ive", "Fish", 1990, false, 0),
  (4, "John", "Wayne", 1995, false, 1)
)).toDF(columns: _*)

// Batch read by timestamp: fails with the AssertionError above when a
// partition has no message at or after the requested timestamp.
spark.read.format("kafka")
  .option("kafka.bootstrap.servers", kafkaServer)
  .option("kafka.sasl.jaas.config",
    "org.apache.kafka.common.security.plain.PlainLoginModule required username='XXX' password='XXX';")
  .option("kafka.sasl.mechanism", "PLAIN")
  .option("kafka.security.protocol", "SASL_SSL")
  .option("subscribe", topicSampleName)
  .option("startingOffsetsByTimestamp",
    """{"source_JSON_sgattu_primittive":{"0":1648013731709,"1":1648013731709,"2":1648013731709,"3":1648013731709,"4":1648013731709,"5":1648013731709}}""")
  .load()
{code}
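For reference, the JSON passed to {{startingOffsetsByTimestamp}} maps topic name to partition id to an epoch-millis timestamp. A self-contained sketch that builds the exact string used above (pure Scala, no Kafka connection needed):

```scala
// Build the startingOffsetsByTimestamp JSON for one topic whose six
// partitions all start from the same epoch-millis timestamp.
val topic = "source_JSON_sgattu_primittive"
val ts = 1648013731709L
val perPartition = (0 to 5).map(p => s""""$p":$ts""").mkString(",")
val startingOffsetsJson = s"""{"$topic":{$perPartition}}"""
println(startingOffsetsJson)
```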



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
