Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/22042#discussion_r212521083 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala --- @@ -239,56 +335,74 @@ private[kafka010] case class InternalKafkaConsumer( } /** - * Get the record for the given offset if available. Otherwise it will either throw error - * (if failOnDataLoss = true), or return the next available offset within [offset, untilOffset), - * or null. + * Get the fetched record for the given offset if available. + * + * If the record is invisible (either a transaction message, or an aborted message when the + * consumer's `isolation.level` is `read_committed`), it will return a `FetchedRecord` with the + * next offset to fetch. + * + * This method also will try the best to detect data loss. If `failOnDataLoss` is true`, it will + * throw an exception when we detect an unavailable offset. If `failOnDataLoss` is `false`, this + * method will return `null` if the next available record is within [offset, untilOffset). * * @throws OffsetOutOfRangeException if `offset` is out of range * @throws TimeoutException if cannot fetch the record in `pollTimeoutMs` milliseconds. */ - private def fetchData( + private def fetchRecord( offset: Long, untilOffset: Long, pollTimeoutMs: Long, - failOnDataLoss: Boolean): ConsumerRecord[Array[Byte], Array[Byte]] = { - if (offset != nextOffsetInFetchedData || !fetchedData.hasNext()) { - // This is the first fetch, or the last pre-fetched data has been drained. - // Seek to the offset because we may call seekToBeginning or seekToEnd before this. - seek(offset) - poll(pollTimeoutMs) - } - - if (!fetchedData.hasNext()) { - // We cannot fetch anything after `poll`. Two possible cases: - // - `offset` is out of range so that Kafka returns nothing. Just throw - // `OffsetOutOfRangeException` to let the caller handle it. - // - Cannot fetch any data before timeout. 
TimeoutException will be thrown. - val range = getAvailableOffsetRange() - if (offset < range.earliest || offset >= range.latest) { - throw new OffsetOutOfRangeException( - Map(topicPartition -> java.lang.Long.valueOf(offset)).asJava) + failOnDataLoss: Boolean): FetchedRecord = { + if (offset != fetchedData.nextOffsetInFetchedData) { + // This is the first fetch, or the fetched data has been reset. + // Fetch records from Kafka and update `fetchedData`. + fetchData(offset, pollTimeoutMs) + } else if (!fetchedData.hasNext) { // The last pre-fetched data has been drained. + if (offset < fetchedData.offsetAfterPoll) { + // Offsets in [offset, fetchedData.offsetAfterPoll) are invisible. Return a record to ask + // the next call to start from `fetchedData.offsetAfterPoll`. + fetchedData.reset() + return fetchedRecord.withRecord(null, fetchedData.offsetAfterPoll) } else { - throw new TimeoutException( - s"Cannot fetch record for offset $offset in $pollTimeoutMs milliseconds") + // Fetch records from Kafka and update `fetchedData`. + fetchData(offset, pollTimeoutMs) } + } + + if (!fetchedData.hasNext) { + // When we reach here, we have already tried to poll from Kafka. As `fetchedData` is still + // empty, all messages in [offset, fetchedData.offsetAfterPoll) are invisible. Return a + // record to ask the next call to start from `fetchedData.offsetAfterPoll`. + assert(offset <= fetchedData.offsetAfterPoll, + s"seek to $offset and poll but the offset was reset to ${fetchedData.offsetAfterPoll}") + fetchedRecord.withRecord(null, fetchedData.offsetAfterPoll) } else { val record = fetchedData.next() - nextOffsetInFetchedData = record.offset + 1 // In general, Kafka uses the specified offset as the start point, and tries to fetch the next // available offset. Hence we need to handle offset mismatch. if (record.offset > offset) { + val range = getAvailableOffsetRange() + if (range.earliest <= offset) { + // `offset` is still valid but the corresponding message is invisible. 
We should skip it + // and jump to `record.offset`. Here we move `fetchedData` back so that the next call of + // `fetchRecord` can just return `record` directly. + fetchedData.previous() + return fetchedRecord.withRecord(null, record.offset) + } // This may happen when some records aged out but their offsets already got verified if (failOnDataLoss) { reportDataLoss(true, s"Cannot fetch records in [$offset, ${record.offset})") // Never happen as "reportDataLoss" will throw an exception - null + throw new IllegalStateException( + "reportDataLoss didn't throw an exception when 'failOnDataLoss' is true") } else { if (record.offset >= untilOffset) { reportDataLoss(false, s"Skip missing records in [$offset, $untilOffset)") - null + // Set `nextOffsetToFetch` to `untilOffset` to finish the current batch. + fetchedRecord.withRecord(null, untilOffset) } else { reportDataLoss(false, s"Skip missing records in [$offset, ${record.offset})") - record + fetchedRecord.withRecord(record, fetchedData.nextOffsetInFetchedData) } --- End diff -- nit: This can be unnested: `if ... else { if ... else ... }` -> `if ... else if ... else`.
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org