Thank you, Souvik.

Dongjoon.
On Thu, Apr 7, 2022 at 10:59 AM Paul, Souvik <souvik.p...@gs.com> wrote:
> Hi Dongjoon,
>
> Raised the JIRA at https://issues.apache.org/jira/browse/SPARK-38824
>
> Thanks,
> Souvik
>
> *From:* Dongjoon Hyun <dongjoon.h...@gmail.com>
> *Sent:* Wednesday, March 30, 2022 4:44 AM
> *To:* Paul, Souvik [Engineering] <souvik.p...@ny.email.gs.com>
> *Cc:* dev@spark.apache.org
> *Subject:* Re: Probable bug in async commit of Kafka offset in DirectKafkaInputDStream
>
> Hi, Souvik
>
> Could you file a JIRA issue for that?
>
> Thanks,
> Dongjoon
>
> On Thu, Mar 24, 2022 at 11:08 AM Paul, Souvik <souvik.p...@gs.com> wrote:
>
> Hi Dev,
>
> I added a few debug statements at the following lines and found a few issues.
>
> 1. At line 254 of override def compute(validTime: Time): Option[KafkaRDD[K, V]] in DirectKafkaInputDStream.scala:
>
>      System.out.print("Called commitAll at time " + validTime + " " +
>        commitQueue.toArray.mkString("Array(", ", ", ")") + "\n")
>
> 2. At line 454 of test("offset recovery from kafka") in DirectKafkaStreamSuite.scala:
>
>      print("Called commitAsync at " + time + " " +
>        offsets.mkString("Array(", ", ", ")") + "\n")
>
> This shows that the commitAll call is not handled properly. Because commitAll is only called inside the compute function, offsets queued by commitAsync are committed only when the next batch is computed, so the offsets of the last RDD can be missed. In the current example, the commit of offset range 8 -> 10 was missed.
>
> Can someone confirm whether this is a design choice or a bug?
>
> The current log looks something like this:
>
> Called commitAll at time 1645548063100 ms Array()
> Called commitAll at time 1645548063200 ms Array()
> Called commitAll at time 1645548063300 ms Array()
> Called commitAll at time 1645548063400 ms Array()
> Called commitAll at time 1645548063500 ms Array()
> Called commitAll at time 1645548063600 ms Array()
> Called commitAll at time 1645548063700 ms Array()
> Called commitAll at time 1645548063800 ms Array()
> Called commitAll at time 1645548063900 ms Array()
> Called commitAll at time 1645548064000 ms Array()
> Called commitAll at time 1645548064100 ms Array()
> Called commitAll at time 1645548064200 ms Array()
> Called commitAsync at 1645548063100 ms Array(OffsetRange(topic: 'recoveryfromkafka', partition: 0, range: [0 -> 4]))
> Called commitAsync at 1645548063200 ms Array(OffsetRange(topic: 'recoveryfromkafka', partition: 0, range: [4 -> 4]))
> Called commitAll at time 1645548064300 ms Array(OffsetRange(topic: 'recoveryfromkafka', partition: 0, range: [0 -> 4]), OffsetRange(topic: 'recoveryfromkafka', partition: 0, range: [4 -> 4]))
> Called commitAsync at 1645548063300 ms Array(OffsetRange(topic: 'recoveryfromkafka', partition: 0, range: [4 -> 4]))
> Called commitAsync at 1645548063400 ms Array(OffsetRange(topic: 'recoveryfromkafka', partition: 0, range: [4 -> 4]))
> Called commitAsync at 1645548063500 ms Array(OffsetRange(topic: 'recoveryfromkafka', partition: 0, range: [4 -> 4]))
> Called commitAll at time 1645548064400 ms Array(OffsetRange(topic: 'recoveryfromkafka', partition: 0, range: [4 -> 4]), OffsetRange(topic: 'recoveryfromkafka', partition: 0, range: [4 -> 4]), OffsetRange(topic: 'recoveryfromkafka', partition: 0, range: [4 -> 4]))
> Called commitAsync at 1645548063600 ms Array(OffsetRange(topic: 'recoveryfromkafka', partition: 0, range: [4 -> 4]))
> Called commitAsync at 1645548063700 ms Array(OffsetRange(topic: 'recoveryfromkafka', partition: 0, range: [4 -> 4]))
> Called commitAsync at 1645548063800 ms Array(OffsetRange(topic: 'recoveryfromkafka', partition: 0, range: [4 -> 4]))
> Called commitAsync at 1645548063900 ms Array(OffsetRange(topic: 'recoveryfromkafka', partition: 0, range: [4 -> 4]))
> Called commitAll at time 1645548064500 ms Array(OffsetRange(topic: 'recoveryfromkafka', partition: 0, range: [4 -> 4]), OffsetRange(topic: 'recoveryfromkafka', partition: 0, range: [4 -> 4]), OffsetRange(topic: 'recoveryfromkafka', partition: 0, range: [4 -> 4]), OffsetRange(topic: 'recoveryfromkafka', partition: 0, range: [4 -> 4]))
> Called commitAsync at 1645548064000 ms Array(OffsetRange(topic: 'recoveryfromkafka', partition: 0, range: [4 -> 4]))
> Called commitAsync at 1645548064100 ms Array(OffsetRange(topic: 'recoveryfromkafka', partition: 0, range: [4 -> 4]))
> Called commitAsync at 1645548064200 ms Array(OffsetRange(topic: 'recoveryfromkafka', partition: 0, range: [4 -> 4]))
> Called commitAsync at 1645548064300 ms Array(OffsetRange(topic: 'recoveryfromkafka', partition: 0, range: [4 -> 4]))
> Called commitAll at time 1645548064600 ms Array(OffsetRange(topic: 'recoveryfromkafka', partition: 0, range: [4 -> 4]), OffsetRange(topic: 'recoveryfromkafka', partition: 0, range: [4 -> 4]), OffsetRange(topic: 'recoveryfromkafka', partition: 0, range: [4 -> 4]), OffsetRange(topic: 'recoveryfromkafka', partition: 0, range: [4 -> 4]))
> Called commitAsync at 1645548064400 ms Array(OffsetRange(topic: 'recoveryfromkafka', partition: 0, range: [4 -> 8]))
> Called commitAsync at 1645548064500 ms Array(OffsetRange(topic: 'recoveryfromkafka', partition: 0, range: [8 -> 8]))
> Called commitAsync at 1645548064600 ms Array(OffsetRange(topic: 'recoveryfromkafka', partition: 0, range: [8 -> 8]))
> Called commitAll at time 1645548064700 ms Array(OffsetRange(topic: 'recoveryfromkafka', partition: 0, range: [4 -> 8]), OffsetRange(topic: 'recoveryfromkafka', partition: 0, range: [8 -> 8]), OffsetRange(topic: 'recoveryfromkafka', partition: 0, range: [8 -> 8]))
> Called commitAsync at 1645548064700 ms Array(OffsetRange(topic: 'recoveryfromkafka', partition: 0, range: [8 -> 10]))
>
> Regards,
>
> Souvik Paul
> GitHub: @paulsouri
>
> ________________________________
>
> Your Personal Data: We may collect and process information about you that
> may be subject to data protection laws. For more information about how we
> use and disclose your personal data, how we protect your information, our
> legal basis to use your information, your rights and who you can contact,
> please refer to: www.gs.com/privacy-notices <http://www.gs.com/privacy-notices>
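To make the sequence in the log easier to follow, here is a minimal Scala sketch of the enqueue-then-drain commit pattern discussed in this thread. It is a simplified model under stated assumptions, not Spark's code: SimpleOffsetRange, CommitModel, and Scenario are hypothetical names, and keeping the highest untilOffset per partition while draining is an assumed merge rule. It shows why a range queued by commitAsync after the final batch is never committed: the queue is only drained when the next batch's compute runs, and after the last batch no further compute runs.

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import scala.collection.mutable

// Hypothetical, simplified stand-in for an offset range (not Spark's OffsetRange class).
final case class SimpleOffsetRange(topic: String, partition: Int, fromOffset: Long, untilOffset: Long)

// Hypothetical model of "commitAsync enqueues, the next compute drains".
final class CommitModel {
  private val commitQueue = new ConcurrentLinkedQueue[SimpleOffsetRange]()

  // Offsets handed to the (simulated) consumer, keyed by (topic, partition).
  val committed: mutable.Map[(String, Int), Long] = mutable.Map.empty

  // User code calls this after processing a batch; it only enqueues the ranges.
  def commitAsync(ranges: Seq[SimpleOffsetRange]): Unit =
    ranges.foreach(r => commitQueue.add(r))

  // Drains the queue, keeping the highest untilOffset per partition (assumed merge rule).
  private def commitAll(): Unit = {
    var r = commitQueue.poll()
    while (r != null) {
      val key = (r.topic, r.partition)
      committed(key) = math.max(committed.getOrElse(key, Long.MinValue), r.untilOffset)
      r = commitQueue.poll()
    }
  }

  // Stand-in for the per-batch compute step; only the commit draining is modelled here.
  def compute(): Unit = commitAll()

  // Number of ranges that were enqueued but never drained.
  def pending: Int = commitQueue.size()
}

object Scenario {
  private val topic = "recoveryfromkafka"

  // Replays a shortened version of the logged sequence: per batch, compute runs first,
  // then the job's commitAsync enqueues that batch's range.
  def run(): CommitModel = {
    val model = new CommitModel
    val batches = Seq((0L, 4L), (4L, 8L), (8L, 10L))
    for ((fromOffset, untilOffset) <- batches) {
      model.compute()
      model.commitAsync(Seq(SimpleOffsetRange(topic, 0, fromOffset, untilOffset)))
    }
    // The stream stops here, so no later compute() drains the 8 -> 10 range.
    model
  }
}
```

Under these assumptions, Scenario.run() leaves committed at offset 8 for partition 0 of recoveryfromkafka with one range (8 -> 10) still pending, which mirrors the missed commit reported above.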