This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-2.4 by this push:
new 9fb3760 [SPARK-32044][SS][2.4] Kafka continuous processing prints
misleading initial offset
9fb3760 is described below
commit 9fb3760f8e73feb1c7b99255ce6bdc5abd1a5768
Author: Warren Zhu <[email protected]>
AuthorDate: Mon Jun 22 15:19:53 2020 -0700
[SPARK-32044][SS][2.4] Kafka continuous processing prints misleading initial
offset
### What changes were proposed in this pull request?
Use `java.util.Optional.orElseGet` instead of `java.util.Optional.orElse`
to fix an unnecessary Kafka offset fetch and a misleading info log. In Java,
`orElseGet` evaluates its supplier lazily (only when the Optional is empty),
while `orElse` always evaluates its argument.
### Why are the changes needed?
Fix the misleading initial offsets log and the unnecessary Kafka offset fetch.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Currently, there is no test for KafkaContinuousReader. Also, it is hard to test log output.
Closes #28887 from warrenzhu25/SPARK-32044.
Authored-by: Warren Zhu <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../spark/sql/kafka010/KafkaContinuousReader.scala | 19 +++++++++++--------
1 file changed, 11 insertions(+), 8 deletions(-)
diff --git
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala
index 561d501..0fdb44d 100644
---
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala
+++
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.kafka010
import java.{util => ju}
import java.util.concurrent.TimeoutException
+import java.util.function.Supplier
import org.apache.kafka.clients.consumer.{ConsumerRecord,
OffsetOutOfRangeException}
import org.apache.kafka.common.TopicPartition
@@ -70,15 +71,17 @@ class KafkaContinuousReader(
private var offset: Offset = _
override def setStartOffset(start: ju.Optional[Offset]): Unit = {
- offset = start.orElse {
- val offsets = initialOffsets match {
- case EarliestOffsetRangeLimit =>
KafkaSourceOffset(offsetReader.fetchEarliestOffsets())
- case LatestOffsetRangeLimit =>
KafkaSourceOffset(offsetReader.fetchLatestOffsets(None))
- case SpecificOffsetRangeLimit(p) =>
offsetReader.fetchSpecificOffsets(p, reportDataLoss)
+ offset = start.orElseGet(new Supplier[Offset] {
+ override def get(): Offset = {
+ val offsets = initialOffsets match {
+ case EarliestOffsetRangeLimit =>
KafkaSourceOffset(offsetReader.fetchEarliestOffsets())
+ case LatestOffsetRangeLimit =>
KafkaSourceOffset(offsetReader.fetchLatestOffsets(None))
+ case SpecificOffsetRangeLimit(p) =>
offsetReader.fetchSpecificOffsets(p, reportDataLoss)
+ }
+ logInfo(s"Initial offsets: $offsets")
+ offsets
}
- logInfo(s"Initial offsets: $offsets")
- offsets
- }
+ })
}
override def getStartOffset(): Offset = offset
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]