This is an automated email from the ASF dual-hosted git repository.
kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 0965fa427a7 [SPARK-38181][SS][DOCS] Update comments in
KafkaDataConsumer.scala
0965fa427a7 is described below
commit 0965fa427a70d2855945e2008ccdb86a4989d763
Author: azheng <[email protected]>
AuthorDate: Tue Jun 7 21:27:07 2022 +0900
[SPARK-38181][SS][DOCS] Update comments in KafkaDataConsumer.scala
### What changes were proposed in this pull request?
Fixed some minor format issues in the code comments and rephrased some of
them to make them clearer
### Why are the changes needed?
Minor format correction and better readability
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Not needed, no real code changes
Closes #35484 from ArvinZheng/SPARK-38181.
Authored-by: azheng <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
---
.../sql/kafka010/consumer/KafkaDataConsumer.scala | 25 +++++++++++-----------
1 file changed, 12 insertions(+), 13 deletions(-)
diff --git
a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/consumer/KafkaDataConsumer.scala
b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/consumer/KafkaDataConsumer.scala
index 37fe38ea94e..d88e9821489 100644
---
a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/consumer/KafkaDataConsumer.scala
+++
b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/consumer/KafkaDataConsumer.scala
@@ -267,20 +267,18 @@ private[kafka010] class KafkaDataConsumer(
* within [offset, untilOffset).
*
* This method also will try its best to detect data loss. If
`failOnDataLoss` is `true`, it will
- * throw an exception when we detect an unavailable offset. If
`failOnDataLoss` is `false`, this
- * method will try to fetch next available record within [offset,
untilOffset).
- *
- * When this method tries to skip offsets due to either invisible messages
or data loss and
- * reaches `untilOffset`, it will return `null`.
+ * throw an exception when it detects an unavailable offset. If
`failOnDataLoss` is `false`, this
+ * method will try to fetch the next available record within [offset,
untilOffset). When this method
+ * reaches `untilOffset` and still can't find an available record, it will
return `null`.
*
* @param offset the offset to fetch.
* @param untilOffset the max offset to fetch. Exclusive.
* @param pollTimeoutMs timeout in milliseconds to poll data from Kafka.
* @param failOnDataLoss When `failOnDataLoss` is `true`, this method will
either return record at
- * offset if available, or throw exception.when
`failOnDataLoss` is `false`,
- * this method will either return record at offset if
available, or return
- * the next earliest available record less than
untilOffset, or null. It
- * will not throw any exception.
+ * offset if available, or throw an exception. When
`failOnDataLoss` is
+ * `false`, this method will return record at offset
if available, or return
+ * the record at the next earliest available offset
that is less than
+ * untilOffset, otherwise null.
*/
def get(
offset: Long,
@@ -298,9 +296,10 @@ private[kafka010] class KafkaDataConsumer(
s"requested $offset")
// The following loop is basically for `failOnDataLoss = false`. When
`failOnDataLoss` is
- // `false`, first, we will try to fetch the record at `offset`. If no such
record exists, then
- // we will move to the next available offset within `[offset,
untilOffset)` and retry.
- // If `failOnDataLoss` is `true`, the loop body will be executed only once.
+ // `false`, we will try to fetch the record at `offset`; if the record
does not exist, we will
+ // try to fetch the next available record within [offset, untilOffset).
+ // If `failOnDataLoss` is `true`, the loop body will be executed only
once, either returning the
+ // record at `offset` or throwing an exception when the record does not exist.
var toFetchOffset = offset
var fetchedRecord: FetchedRecord = null
// We want to break out of the while loop on a successful fetch to avoid
using "return"
@@ -452,7 +451,7 @@ private[kafka010] class KafkaDataConsumer(
/**
* Get the fetched record for the given offset if available.
*
- * If the record is invisible (either a transaction message, or an aborted
message when the
+ * If the record is invisible (either a transaction message, or an aborted
message when the
* consumer's `isolation.level` is `read_committed`), it will return a
`FetchedRecord` with the
* next offset to fetch.
*
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]