Thank you, Souvik.

Dongjoon.

On Thu, Apr 7, 2022 at 10:59 AM Paul, Souvik <souvik.p...@gs.com> wrote:

> Hi Dongjoon,
>
>
>
> Raised the JIRA at https://issues.apache.org/jira/browse/SPARK-38824
>
>
>
> Thanks,
>
> Souvik
>
>
>
> *From:* Dongjoon Hyun <dongjoon.h...@gmail.com>
> *Sent:* Wednesday, March 30, 2022 4:44 AM
> *To:* Paul, Souvik [Engineering] <souvik.p...@ny.email.gs.com>
> *Cc:* dev@spark.apache.org
> *Subject:* Re: Probable bug in async commit of Kafka offset in
> DirectKafkaInputDStream
>
>
>
> Hi, Souvik
>
>
>
> Could you file a JIRA issue for that?
>
>
>
> Thanks,
>
> Dongjoon
>
>
>
> On Thu, Mar 24, 2022 at 11:08 AM Paul, Souvik <souvik.p...@gs.com> wrote:
>
> Hi Dev,
>
> I added a few debug statements at the following lines and found a few issues.
>
> 1. At line 254 of override def compute(validTime: Time):
> Option[KafkaRDD[K, V]] in DirectKafkaInputDStream.scala:
>
>         System.out.print("Called commitAll at time " + validTime + " " +
>                 commitQueue.toArray.mkString("Array(", ", ", ")") + "\n")
>
> 2. At line 454 of test("offset recovery from kafka") in
> DirectKafkaStreamSuite.scala:
>
>         print("Called commitAsync at " + time + " " +
>                 offsets.mkString("Array(", ", ", ")") + "\n")
>
>
> This shows that the commitAll call is not handled properly. Because it is
> called inside the compute function, the offsets queued for the last RDD may
> never be committed. In the current example, the commit for the offset range
> 8->10 was missed.
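
To make the timing concrete, here is a minimal, self-contained sketch of the behaviour described above. It is not the Spark code: OffsetSpan, DeferredCommitter and DeferredCommitDemo are hypothetical names, and the model assumes only what the log suggests, namely that commitAsync just queues the ranges and that the queue is drained when compute runs for a later batch.

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import scala.collection.mutable

// Hypothetical stand-in for Spark's OffsetRange; only the fields needed here.
final case class OffsetSpan(partition: Int, fromOffset: Long, untilOffset: Long)

// Simplified model (an assumption, not the real DirectKafkaInputDStream):
// commitAsync only enqueues, and the queue is drained inside compute().
final class DeferredCommitter {
  private val commitQueue = new ConcurrentLinkedQueue[OffsetSpan]()

  // Highest offset that was actually committed, per partition.
  val committed = mutable.Map.empty[Int, Long]

  def commitAsync(ranges: Seq[OffsetSpan]): Unit =
    ranges.foreach(r => commitQueue.add(r))

  // Called once per batch interval, like compute(validTime).
  def compute(): Unit = commitAll()

  private def commitAll(): Unit = {
    var r = commitQueue.poll()
    while (r != null) {
      val prev = committed.getOrElse(r.partition, 0L)
      committed(r.partition) = math.max(prev, r.untilOffset)
      r = commitQueue.poll()
    }
  }

  // Number of ranges still waiting for a later compute() call.
  def pending: Int = commitQueue.size
}

object DeferredCommitDemo {
  def run(): (Map[Int, Long], Int) = {
    val c = new DeferredCommitter
    // For each batch, compute() runs first; the output action then queues
    // the batch's range through commitAsync.
    Seq(OffsetSpan(0, 0, 4), OffsetSpan(0, 4, 8), OffsetSpan(0, 8, 10)).foreach { r =>
      c.compute()
      c.commitAsync(Seq(r))
    }
    // The stream stops here, so no further compute() drains the queue.
    (c.committed.toMap, c.pending)
  }

  def main(args: Array[String]): Unit = {
    val (committed, pending) = run()
    println(s"committed=$committed pending=$pending")
  }
}
```

Under these assumptions the ranges 0->4 and 4->8 get committed, while 8->10 is still sitting in the queue when the stream stops, which matches the log. In this model, draining the queue once more on shutdown would close the gap; whether that is the right change for Spark is exactly the open question.
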
>
> Can someone confirm if this is a design choice or a bug?
>
> The current log looks like this:
>
> Called commitAll at time 1645548063100 ms Array()
> Called commitAll at time 1645548063200 ms Array()
> Called commitAll at time 1645548063300 ms Array()
> Called commitAll at time 1645548063400 ms Array()
> Called commitAll at time 1645548063500 ms Array()
> Called commitAll at time 1645548063600 ms Array()
> Called commitAll at time 1645548063700 ms Array()
> Called commitAll at time 1645548063800 ms Array()
> Called commitAll at time 1645548063900 ms Array()
> Called commitAll at time 1645548064000 ms Array()
> Called commitAll at time 1645548064100 ms Array()
> Called commitAll at time 1645548064200 ms Array()
> Called commitAsync at 1645548063100 ms Array(OffsetRange(topic:
> 'recoveryfromkafka', partition: 0, range: [0 -> 4]))
> Called commitAsync at 1645548063200 ms Array(OffsetRange(topic:
> 'recoveryfromkafka', partition: 0, range: [4 -> 4]))
> Called commitAll at time 1645548064300 ms Array(OffsetRange(topic:
> 'recoveryfromkafka', partition: 0, range: [0 -> 4]), OffsetRange(topic:
> 'recoveryfromkafka', partition: 0, range: [4 -> 4]))
> Called commitAsync at 1645548063300 ms Array(OffsetRange(topic:
> 'recoveryfromkafka', partition: 0, range: [4 -> 4]))
> Called commitAsync at 1645548063400 ms Array(OffsetRange(topic:
> 'recoveryfromkafka', partition: 0, range: [4 -> 4]))
> Called commitAsync at 1645548063500 ms Array(OffsetRange(topic:
> 'recoveryfromkafka', partition: 0, range: [4 -> 4]))
> Called commitAll at time 1645548064400 ms Array(OffsetRange(topic:
> 'recoveryfromkafka', partition: 0, range: [4 -> 4]), OffsetRange(topic:
> 'recoveryfromkafka', partition: 0, range: [4 -> 4]), OffsetRange(topic:
> 'recoveryfromkafka', partition: 0, range: [4 -> 4]))
> Called commitAsync at 1645548063600 ms Array(OffsetRange(topic:
> 'recoveryfromkafka', partition: 0, range: [4 -> 4]))
> Called commitAsync at 1645548063700 ms Array(OffsetRange(topic:
> 'recoveryfromkafka', partition: 0, range: [4 -> 4]))
> Called commitAsync at 1645548063800 ms Array(OffsetRange(topic:
> 'recoveryfromkafka', partition: 0, range: [4 -> 4]))
> Called commitAsync at 1645548063900 ms Array(OffsetRange(topic:
> 'recoveryfromkafka', partition: 0, range: [4 -> 4]))
> Called commitAll at time 1645548064500 ms Array(OffsetRange(topic:
> 'recoveryfromkafka', partition: 0, range: [4 -> 4]), OffsetRange(topic:
> 'recoveryfromkafka', partition: 0, range: [4 -> 4]), OffsetRange(topic:
> 'recoveryfromkafka', partition: 0, range: [4 -> 4]), OffsetRange(topic:
> 'recoveryfromkafka', partition: 0, range: [4 -> 4]))
> Called commitAsync at 1645548064000 ms Array(OffsetRange(topic:
> 'recoveryfromkafka', partition: 0, range: [4 -> 4]))
> Called commitAsync at 1645548064100 ms Array(OffsetRange(topic:
> 'recoveryfromkafka', partition: 0, range: [4 -> 4]))
> Called commitAsync at 1645548064200 ms Array(OffsetRange(topic:
> 'recoveryfromkafka', partition: 0, range: [4 -> 4]))
> Called commitAsync at 1645548064300 ms Array(OffsetRange(topic:
> 'recoveryfromkafka', partition: 0, range: [4 -> 4]))
> Called commitAll at time 1645548064600 ms Array(OffsetRange(topic:
> 'recoveryfromkafka', partition: 0, range: [4 -> 4]), OffsetRange(topic:
> 'recoveryfromkafka', partition: 0, range: [4 -> 4]), OffsetRange(topic:
> 'recoveryfromkafka', partition: 0, range: [4 -> 4]), OffsetRange(topic:
> 'recoveryfromkafka', partition: 0, range: [4 -> 4]))
> Called commitAsync at 1645548064400 ms Array(OffsetRange(topic:
> 'recoveryfromkafka', partition: 0, range: [4 -> 8]))
> Called commitAsync at 1645548064500 ms Array(OffsetRange(topic:
> 'recoveryfromkafka', partition: 0, range: [8 -> 8]))
> Called commitAsync at 1645548064600 ms Array(OffsetRange(topic:
> 'recoveryfromkafka', partition: 0, range: [8 -> 8]))
> Called commitAll at time 1645548064700 ms Array(OffsetRange(topic:
> 'recoveryfromkafka', partition: 0, range: [4 -> 8]), OffsetRange(topic:
> 'recoveryfromkafka', partition: 0, range: [8 -> 8]), OffsetRange(topic:
> 'recoveryfromkafka', partition: 0, range: [8 -> 8]))
> Called commitAsync at 1645548064700 ms Array(OffsetRange(topic:
> 'recoveryfromkafka', partition: 0, range: [8 -> 10]))
>
> Regards,
>
> Souvik Paul
> GitHub: @paulsouri
>
> ________________________________
>
> Your Personal Data: We may collect and process information about you that
> may be subject to data protection laws. For more information about how we
> use and disclose your personal data, how we protect your information, our
> legal basis to use your information, your rights and who you can contact,
> please refer to: www.gs.com/privacy-notices
>
> ---------------------------------------------------------------------
> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>
>
>
