This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new 9fb3760  [SPARK-32044][SS][2.4] Kafka continuous processing prints 
misleading initial offset
9fb3760 is described below

commit 9fb3760f8e73feb1c7b99255ce6bdc5abd1a5768
Author: Warren Zhu <[email protected]>
AuthorDate: Mon Jun 22 15:19:53 2020 -0700

    [SPARK-32044][SS][2.4] Kafka continuous processing prints misleading initial 
offset
    
    ### What changes were proposed in this pull request?
    
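    Use `java.util.Optional.orElseGet` instead of `java.util.Optional.orElse` 
to avoid an unnecessary Kafka offset fetch and a misleading info log. `orElseGet` 
evaluates its supplier lazily, only when the Optional is empty. In contrast, the 
argument to `orElse` is always evaluated before the call. This holds in Scala too: 
the block passed to `orElse` runs even when a start offset is present.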
    
    ### Why are the changes needed?
    
    To fix the misleading "Initial offsets" log message and avoid an unnecessary Kafka offset fetch when a start offset is already provided.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    There is currently no test for KafkaContinuousReader, and log output is hard to test.
    
    Closes #28887 from warrenzhu25/SPARK-32044.
    
    Authored-by: Warren Zhu <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 .../spark/sql/kafka010/KafkaContinuousReader.scala    | 19 +++++++++++--------
 1 file changed, 11 insertions(+), 8 deletions(-)

diff --git 
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala
 
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala
index 561d501..0fdb44d 100644
--- 
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala
+++ 
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.kafka010
 
 import java.{util => ju}
 import java.util.concurrent.TimeoutException
+import java.util.function.Supplier
 
 import org.apache.kafka.clients.consumer.{ConsumerRecord, 
OffsetOutOfRangeException}
 import org.apache.kafka.common.TopicPartition
@@ -70,15 +71,17 @@ class KafkaContinuousReader(
 
   private var offset: Offset = _
   override def setStartOffset(start: ju.Optional[Offset]): Unit = {
-    offset = start.orElse {
-      val offsets = initialOffsets match {
-        case EarliestOffsetRangeLimit => 
KafkaSourceOffset(offsetReader.fetchEarliestOffsets())
-        case LatestOffsetRangeLimit => 
KafkaSourceOffset(offsetReader.fetchLatestOffsets(None))
-        case SpecificOffsetRangeLimit(p) => 
offsetReader.fetchSpecificOffsets(p, reportDataLoss)
+    offset = start.orElseGet(new Supplier[Offset] {
+      override def get(): Offset = {
+        val offsets = initialOffsets match {
+          case EarliestOffsetRangeLimit => 
KafkaSourceOffset(offsetReader.fetchEarliestOffsets())
+          case LatestOffsetRangeLimit => 
KafkaSourceOffset(offsetReader.fetchLatestOffsets(None))
+          case SpecificOffsetRangeLimit(p) => 
offsetReader.fetchSpecificOffsets(p, reportDataLoss)
+        }
+        logInfo(s"Initial offsets: $offsets")
+        offsets
       }
-      logInfo(s"Initial offsets: $offsets")
-      offsets
-    }
+    })
   }
 
   override def getStartOffset(): Offset = offset


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to