Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22042#discussion_r212521083
  
    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala ---
    @@ -239,56 +335,74 @@ private[kafka010] case class InternalKafkaConsumer(
       }
     
       /**
    -   * Get the record for the given offset if available. Otherwise it will either throw error
    -   * (if failOnDataLoss = true), or return the next available offset within [offset, untilOffset),
    -   * or null.
    +   * Get the fetched record for the given offset if available.
    +   *
    +   * If the record is invisible (either a transaction message, or an aborted message when the
    +   * consumer's `isolation.level` is `read_committed`), it will return a `FetchedRecord` with the
    +   * next offset to fetch.
    +   *
    +   * This method will also try its best to detect data loss. If `failOnDataLoss` is `true`, it will
    +   * throw an exception when it detects an unavailable offset. If `failOnDataLoss` is `false`, this
    +   * method will return `null` if the next available record is within [offset, untilOffset).
        *
        * @throws OffsetOutOfRangeException if `offset` is out of range
        * @throws TimeoutException if cannot fetch the record in `pollTimeoutMs` milliseconds.
        */
    -  private def fetchData(
    +  private def fetchRecord(
           offset: Long,
           untilOffset: Long,
           pollTimeoutMs: Long,
    -      failOnDataLoss: Boolean): ConsumerRecord[Array[Byte], Array[Byte]] = {
    -    if (offset != nextOffsetInFetchedData || !fetchedData.hasNext()) {
    -      // This is the first fetch, or the last pre-fetched data has been drained.
    -      // Seek to the offset because we may call seekToBeginning or seekToEnd before this.
    -      seek(offset)
    -      poll(pollTimeoutMs)
    -    }
    -
    -    if (!fetchedData.hasNext()) {
    -      // We cannot fetch anything after `poll`. Two possible cases:
    -      // - `offset` is out of range so that Kafka returns nothing. Just throw
    -      // `OffsetOutOfRangeException` to let the caller handle it.
    -      // - Cannot fetch any data before timeout. TimeoutException will be thrown.
    -      val range = getAvailableOffsetRange()
    -      if (offset < range.earliest || offset >= range.latest) {
    -        throw new OffsetOutOfRangeException(
    -          Map(topicPartition -> java.lang.Long.valueOf(offset)).asJava)
    +      failOnDataLoss: Boolean): FetchedRecord = {
    +    if (offset != fetchedData.nextOffsetInFetchedData) {
    +      // This is the first fetch, or the fetched data has been reset.
    +      // Fetch records from Kafka and update `fetchedData`.
    +      fetchData(offset, pollTimeoutMs)
    +    } else if (!fetchedData.hasNext) { // The last pre-fetched data has been drained.
    +      if (offset < fetchedData.offsetAfterPoll) {
    +        // Offsets in [offset, fetchedData.offsetAfterPoll) are invisible. Return a record to ask
    +        // the next call to start from `fetchedData.offsetAfterPoll`.
    +        fetchedData.reset()
    +        return fetchedRecord.withRecord(null, fetchedData.offsetAfterPoll)
           } else {
    -        throw new TimeoutException(
    -          s"Cannot fetch record for offset $offset in $pollTimeoutMs milliseconds")
    +        // Fetch records from Kafka and update `fetchedData`.
    +        fetchData(offset, pollTimeoutMs)
           }
    +    }
    +
    +    if (!fetchedData.hasNext) {
    +      // When we reach here, we have already tried to poll from Kafka. As `fetchedData` is still
    +      // empty, all messages in [offset, fetchedData.offsetAfterPoll) are invisible. Return a
    +      // record to ask the next call to start from `fetchedData.offsetAfterPoll`.
    +      assert(offset <= fetchedData.offsetAfterPoll,
    +        s"seek to $offset and poll but the offset was reset to ${fetchedData.offsetAfterPoll}")
    +      fetchedRecord.withRecord(null, fetchedData.offsetAfterPoll)
         } else {
           val record = fetchedData.next()
    -      nextOffsetInFetchedData = record.offset + 1
           // In general, Kafka uses the specified offset as the start point, and tries to fetch the next
           // available offset. Hence we need to handle offset mismatch.
           if (record.offset > offset) {
    +        val range = getAvailableOffsetRange()
    +        if (range.earliest <= offset) {
    +          // `offset` is still valid but the corresponding message is invisible. We should skip it
    +          // and jump to `record.offset`. Here we move `fetchedData` back so that the next call of
    +          // `fetchRecord` can just return `record` directly.
    +          fetchedData.previous()
    +          return fetchedRecord.withRecord(null, record.offset)
    +        }
          // This may happen when some records aged out but their offsets already got verified
             if (failOnDataLoss) {
            reportDataLoss(true, s"Cannot fetch records in [$offset, ${record.offset})")
               // Never happen as "reportDataLoss" will throw an exception
    -          null
    +          throw new IllegalStateException(
    +            "reportDataLoss didn't throw an exception when 'failOnDataLoss' is true")
             } else {
               if (record.offset >= untilOffset) {
              reportDataLoss(false, s"Skip missing records in [$offset, $untilOffset)")
    -            null
    +            // Set `nextOffsetToFetch` to `untilOffset` to finish the current batch.
    +            fetchedRecord.withRecord(null, untilOffset)
               } else {
              reportDataLoss(false, s"Skip missing records in [$offset, ${record.offset})")
    -            record
    +            fetchedRecord.withRecord(record, fetchedData.nextOffsetInFetchedData)
               }
    --- End diff ---
    
    nit: This can be unnested:
    `if ... else { if ... else ... }` -> `if ... else if ... else`
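    
    For illustration, a minimal sketch of the flattened shape, reusing the identifiers from the diff above (it only makes sense inside `fetchRecord`, and is just the suggestion spelled out, not the final code):
    
    ```scala
    if (failOnDataLoss) {
      reportDataLoss(true, s"Cannot fetch records in [$offset, ${record.offset})")
      // Never happens, as `reportDataLoss` throws when `failOnDataLoss` is true.
      throw new IllegalStateException(
        "reportDataLoss didn't throw an exception when 'failOnDataLoss' is true")
    } else if (record.offset >= untilOffset) {
      reportDataLoss(false, s"Skip missing records in [$offset, $untilOffset)")
      // Set `nextOffsetToFetch` to `untilOffset` to finish the current batch.
      fetchedRecord.withRecord(null, untilOffset)
    } else {
      reportDataLoss(false, s"Skip missing records in [$offset, ${record.offset})")
      fetchedRecord.withRecord(record, fetchedData.nextOffsetInFetchedData)
    }
    ```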


---
