[ https://issues.apache.org/jira/browse/SPARK-26267?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16726970#comment-16726970 ]

ASF GitHub Bot commented on SPARK-26267:
----------------------------------------

zsxwing closed pull request #23324: [SPARK-26267][SS] Retry when detecting incorrect offsets from Kafka
URL: https://github.com/apache/spark/pull/23324

This is a PR merged from a forked repository. As GitHub hides the original
diff on merge, it is reproduced below for the sake of provenance:

diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReadSupport.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReadSupport.scala
index 1753a28fba2fb..02dfb9ca2b95a 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReadSupport.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReadSupport.scala
@@ -60,7 +60,7 @@ class KafkaContinuousReadSupport(
   override def initialOffset(): Offset = {
     val offsets = initialOffsets match {
       case EarliestOffsetRangeLimit => KafkaSourceOffset(offsetReader.fetchEarliestOffsets())
-      case LatestOffsetRangeLimit => KafkaSourceOffset(offsetReader.fetchLatestOffsets())
+      case LatestOffsetRangeLimit => KafkaSourceOffset(offsetReader.fetchLatestOffsets(None))
       case SpecificOffsetRangeLimit(p) => offsetReader.fetchSpecificOffsets(p, reportDataLoss)
     }
     logInfo(s"Initial offsets: $offsets")
@@ -107,7 +107,7 @@ class KafkaContinuousReadSupport(
 
   override def needsReconfiguration(config: ScanConfig): Boolean = {
     val knownPartitions = config.asInstanceOf[KafkaContinuousScanConfig].knownPartitions
-    offsetReader.fetchLatestOffsets().keySet != knownPartitions
+    offsetReader.fetchLatestOffsets(None).keySet != knownPartitions
   }
 
   override def toString(): String = s"KafkaSource[$offsetReader]"
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReadSupport.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReadSupport.scala
index bb4de674c3c72..b4f042e93a5da 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReadSupport.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReadSupport.scala
@@ -84,7 +84,7 @@ private[kafka010] class KafkaMicroBatchReadSupport(
 
   override def latestOffset(start: Offset): Offset = {
     val startPartitionOffsets = start.asInstanceOf[KafkaSourceOffset].partitionToOffsets
-    val latestPartitionOffsets = kafkaOffsetReader.fetchLatestOffsets()
+    val latestPartitionOffsets = kafkaOffsetReader.fetchLatestOffsets(Some(startPartitionOffsets))
     endPartitionOffsets = KafkaSourceOffset(maxOffsetsPerTrigger.map { maxOffsets =>
       rateLimit(maxOffsets, startPartitionOffsets, latestPartitionOffsets)
     }.getOrElse {
@@ -133,10 +133,21 @@ private[kafka010] class KafkaMicroBatchReadSupport(
     }.toSeq
     logDebug("TopicPartitions: " + topicPartitions.mkString(", "))
 
+    val fromOffsets = startPartitionOffsets ++ newPartitionInitialOffsets
+    val untilOffsets = endPartitionOffsets
+    untilOffsets.foreach { case (tp, untilOffset) =>
+      fromOffsets.get(tp).foreach { fromOffset =>
+        if (untilOffset < fromOffset) {
+          reportDataLoss(s"Partition $tp's offset was changed from " +
+            s"$fromOffset to $untilOffset, some data may have been missed")
+        }
+      }
+    }
+
     // Calculate offset ranges
     val offsetRanges = rangeCalculator.getRanges(
-      fromOffsets = startPartitionOffsets ++ newPartitionInitialOffsets,
-      untilOffsets = endPartitionOffsets,
+      fromOffsets = fromOffsets,
+      untilOffsets = untilOffsets,
       executorLocations = getSortedExecutorList())
 
     // Reuse Kafka consumers only when all the offset ranges have distinct TopicPartitions,
@@ -186,7 +197,7 @@ private[kafka010] class KafkaMicroBatchReadSupport(
         case EarliestOffsetRangeLimit =>
           KafkaSourceOffset(kafkaOffsetReader.fetchEarliestOffsets())
         case LatestOffsetRangeLimit =>
-          KafkaSourceOffset(kafkaOffsetReader.fetchLatestOffsets())
+          KafkaSourceOffset(kafkaOffsetReader.fetchLatestOffsets(None))
         case SpecificOffsetRangeLimit(p) =>
           kafkaOffsetReader.fetchSpecificOffsets(p, reportDataLoss)
       }
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala
index fb209c724afba..6008794924052 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala
@@ -37,6 +37,8 @@ private[kafka010] class KafkaOffsetRangeCalculator(val minPartitions: Option[Int
    * the read tasks of the skewed partitions to multiple Spark tasks.
    * The number of Spark tasks will be *approximately* `numPartitions`. It can be less or more
    * depending on rounding errors or Kafka partitions that didn't receive any new data.
+   *
+   * Empty ranges (`KafkaOffsetRange.size <= 0`) will be dropped.
    */
   def getRanges(
       fromOffsets: PartitionOffsetMap,
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
index 82066697cb95a..fc443d22bf5a2 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
@@ -21,6 +21,7 @@ import java.{util => ju}
 import java.util.concurrent.{Executors, ThreadFactory}
 
 import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
 import scala.concurrent.{ExecutionContext, Future}
 import scala.concurrent.duration.Duration
 import scala.util.control.NonFatal
@@ -137,6 +138,12 @@ private[kafka010] class KafkaOffsetReader(
         // Poll to get the latest assigned partitions
         consumer.poll(0)
         val partitions = consumer.assignment()
+
+        // Call `position` to wait until the potential offset request triggered by `poll(0)` is
+        // done. This is a workaround for KAFKA-7703, in which an async `seekToBeginning` triggered
+        // by `poll(0)` may reset offsets that should have been set by another request.
+        partitions.asScala.map(p => p -> consumer.position(p)).foreach(_ => {})
+
         consumer.pause(partitions)
         assert(partitions.asScala == partitionOffsets.keySet,
           "If startingOffsets contains specific offsets, you must specify all TopicPartitions.\n" +
@@ -192,19 +199,82 @@ private[kafka010] class KafkaOffsetReader(
   /**
    * Fetch the latest offsets for the topic partitions that are indicated
    * in the [[ConsumerStrategy]].
+   *
+   * Kafka may return earliest offsets when we are requesting latest offsets if `poll` is called
+   * right before `seekToEnd` (KAFKA-7703). As a workaround, we will call `position` right after
+   * `poll` to wait until the potential offset request triggered by `poll(0)` is done.
+   *
+   * In addition, to avoid other unknown issues, we also use the given `knownOffsets` to audit the
+   * latest offsets returned by Kafka. If we find some incorrect offsets (a latest offset is less
+   * than an offset in `knownOffsets`), we will retry at most `maxOffsetFetchAttempts` times. When
+   * a topic is recreated, the latest offsets may be less than offsets in `knownOffsets`. We cannot
+   * distinguish this case from KAFKA-7703, so we just return whatever we get from Kafka after
+   * retrying.
    */
-  def fetchLatestOffsets(): Map[TopicPartition, Long] = runUninterruptibly {
+  def fetchLatestOffsets(
+      knownOffsets: Option[PartitionOffsetMap]): PartitionOffsetMap = runUninterruptibly {
     withRetriesWithoutInterrupt {
       // Poll to get the latest assigned partitions
       consumer.poll(0)
       val partitions = consumer.assignment()
+
+      // Call `position` to wait until the potential offset request triggered by `poll(0)` is
+      // done. This is a workaround for KAFKA-7703, in which an async `seekToBeginning` triggered
+      // by `poll(0)` may reset offsets that should have been set by another request.
+      partitions.asScala.map(p => p -> consumer.position(p)).foreach(_ => {})
+
       consumer.pause(partitions)
       logDebug(s"Partitions assigned to consumer: $partitions. Seeking to the end.")
 
-      consumer.seekToEnd(partitions)
-      val partitionOffsets = partitions.asScala.map(p => p -> consumer.position(p)).toMap
-      logDebug(s"Got latest offsets for partition : $partitionOffsets")
-      partitionOffsets
+      if (knownOffsets.isEmpty) {
+        consumer.seekToEnd(partitions)
+        partitions.asScala.map(p => p -> consumer.position(p)).toMap
+      } else {
+        var partitionOffsets: PartitionOffsetMap = Map.empty
+
+        /**
+         * Compare `knownOffsets` and `partitionOffsets`. Returns all partitions that have an
+         * incorrect latest offset (the offset in `knownOffsets` is greater than the one in
+         * `partitionOffsets`).
+         */
+        def findIncorrectOffsets(): Seq[(TopicPartition, Long, Long)] = {
+          val incorrectOffsets = ArrayBuffer[(TopicPartition, Long, Long)]()
+          partitionOffsets.foreach { case (tp, offset) =>
+            knownOffsets.foreach(_.get(tp).foreach { knownOffset =>
+              if (knownOffset > offset) {
+                val incorrectOffset = (tp, knownOffset, offset)
+                incorrectOffsets += incorrectOffset
+              }
+            })
+          }
+          incorrectOffsets
+        }
+
+        // Retry fetching the latest offsets when incorrect offsets are detected. We don't use
+        // `withRetriesWithoutInterrupt` to retry because:
+        //
+        // - `withRetriesWithoutInterrupt` resets the consumer for each attempt, but a fresh
+        //   consumer has a much bigger chance of hitting KAFKA-7703.
+        // - Retrying here avoids calling `consumer.poll(0)`, which may trigger KAFKA-7703.
+        var incorrectOffsets: Seq[(TopicPartition, Long, Long)] = Nil
+        var attempt = 0
+        do {
+          consumer.seekToEnd(partitions)
+          partitionOffsets = partitions.asScala.map(p => p -> consumer.position(p)).toMap
+          attempt += 1
+
+          incorrectOffsets = findIncorrectOffsets()
+          if (incorrectOffsets.nonEmpty) {
+            logWarning("Found incorrect offsets in some partitions " +
+              s"(partition, previous offset, fetched offset): $incorrectOffsets")
+            if (attempt < maxOffsetFetchAttempts) {
+              logWarning("Retrying to fetch latest offsets because of incorrect offsets")
+              Thread.sleep(offsetFetchAttemptIntervalMs)
+            }
+          }
+        } while (incorrectOffsets.nonEmpty && attempt < maxOffsetFetchAttempts)
+
+        logDebug(s"Got latest offsets for partitions: $partitionOffsets")
+        partitionOffsets
+      }
     }
   }
 
 
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
index 66ec7e0cd084a..d65b3cea632c4 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
@@ -130,7 +130,7 @@ private[kafka010] class KafkaSource(
     metadataLog.get(0).getOrElse {
       val offsets = startingOffsets match {
         case EarliestOffsetRangeLimit => KafkaSourceOffset(kafkaReader.fetchEarliestOffsets())
-        case LatestOffsetRangeLimit => KafkaSourceOffset(kafkaReader.fetchLatestOffsets())
+        case LatestOffsetRangeLimit => KafkaSourceOffset(kafkaReader.fetchLatestOffsets(None))
         case SpecificOffsetRangeLimit(p) => kafkaReader.fetchSpecificOffsets(p, reportDataLoss)
       }
       metadataLog.add(0, offsets)
@@ -148,7 +148,8 @@ private[kafka010] class KafkaSource(
     // Make sure initialPartitionOffsets is initialized
     initialPartitionOffsets
 
-    val latest = kafkaReader.fetchLatestOffsets()
+    val latest = kafkaReader.fetchLatestOffsets(
+      currentPartitionOffsets.orElse(Some(initialPartitionOffsets)))
     val offsets = maxOffsetsPerTrigger match {
       case None =>
         latest
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
index 5ee76990b54f4..61cbb3285a4f0 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
@@ -329,6 +329,54 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
     )
   }
 
+  test("subscribe topic by pattern with topic recreation between batches") {
+    val topicPrefix = newTopic()
+    val topic = topicPrefix + "-good"
+    val topic2 = topicPrefix + "-bad"
+    testUtils.createTopic(topic, partitions = 1)
+    testUtils.sendMessages(topic, Array("1", "3"))
+    testUtils.createTopic(topic2, partitions = 1)
+    testUtils.sendMessages(topic2, Array("2", "4"))
+
+    val reader = spark
+      .readStream
+      .format("kafka")
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      .option("kafka.metadata.max.age.ms", "1")
+      .option("kafka.default.api.timeout.ms", "3000")
+      .option("startingOffsets", "earliest")
+      .option("subscribePattern", s"$topicPrefix-.*")
+
+    val ds = reader.load()
+      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+      .as[(String, String)]
+      .map(kv => kv._2.toInt)
+
+    testStream(ds)(
+      StartStream(),
+      AssertOnQuery { q =>
+        q.processAllAvailable()
+        true
+      },
+      CheckAnswer(1, 2, 3, 4),
+      // Restart the stream in this test to make it stable. If a topic is recreated while a
+      // consumer is alive, that consumer may not see the recreated topic even when a fresh
+      // consumer can.
+      StopStream,
+      // Recreate `topic2` and wait until it's available
+      WithOffsetSync(new TopicPartition(topic2, 0), expectedOffset = 1) { () =>
+        testUtils.deleteTopic(topic2)
+        testUtils.createTopic(topic2)
+        testUtils.sendMessages(topic2, Array("6"))
+      },
+      StartStream(),
+      ExpectFailure[IllegalStateException](e => {
+        // The offset of `topic2` should be changed from 2 to 1
+        assert(e.getMessage.contains("was changed from 2 to 1"))
+      })
+    )
+  }
+
   test("ensure that initial offset are written with an extra byte in the beginning (SPARK-19517)") {
     withTempDir { metadataPath =>
       val topic = "kafka-initial-offset-current"
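
For reference, the core of the fix above is a bounded audit-and-retry loop
around the latest-offset fetch. Below is a minimal, standalone sketch of that
pattern; `fetchOnce`, `maxAttempts`, and `retryIntervalMs` are illustrative
stand-ins, not the actual Spark internals:

    import org.apache.kafka.common.TopicPartition

    // Sketch only: `fetchOnce` stands in for one seekToEnd/position round trip;
    // the real logic lives in KafkaOffsetReader.fetchLatestOffsets.
    def fetchLatestWithAudit(
        knownOffsets: Map[TopicPartition, Long],
        fetchOnce: () => Map[TopicPartition, Long],
        maxAttempts: Int = 3,
        retryIntervalMs: Long = 1000L): Map[TopicPartition, Long] = {
      var attempt = 0
      var fetched = Map.empty[TopicPartition, Long]
      var incorrect = Seq.empty[(TopicPartition, Long, Long)]
      do {
        fetched = fetchOnce()
        attempt += 1
        // A partition is suspect when Kafka reports a latest offset that is
        // behind an offset we have already processed (the KAFKA-7703 symptom).
        incorrect = fetched.toSeq.collect {
          case (tp, latest) if knownOffsets.get(tp).exists(_ > latest) =>
            (tp, knownOffsets(tp), latest)
        }
        if (incorrect.nonEmpty && attempt < maxAttempts) Thread.sleep(retryIntervalMs)
      } while (incorrect.nonEmpty && attempt < maxAttempts)
      // After exhausting retries, return whatever Kafka gave us: a legitimately
      // recreated topic is indistinguishable from KAFKA-7703 at this point.
      fetched
    }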


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Kafka source may reprocess data
> -------------------------------
>
>                 Key: SPARK-26267
>                 URL: https://issues.apache.org/jira/browse/SPARK-26267
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.4.0
>            Reporter: Shixiong Zhu
>            Assignee: Shixiong Zhu
>            Priority: Blocker
>              Labels: correctness
>
> Due to KAFKA-7703, when the Kafka source tries to get the latest offset, it
> may get the earliest offset instead; once it gets the correct latest offset
> in the next batch, it will then reprocess messages that have already been
> processed. This usually happens when restarting a streaming query.
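
A minimal sketch of the consumer-side workaround used by the fix: calling
`position` for each assigned partition right after `poll(0)` blocks until any
in-flight offset-reset request finishes, so a later `seekToEnd` is not
clobbered. Consumer construction and configuration are omitted; this
illustrates the pattern rather than the exact Spark code:

    import scala.collection.JavaConverters._
    import org.apache.kafka.clients.consumer.KafkaConsumer
    import org.apache.kafka.common.TopicPartition

    // Sketch of the KAFKA-7703 workaround against a plain KafkaConsumer.
    def latestOffsets(
        consumer: KafkaConsumer[Array[Byte], Array[Byte]]): Map[TopicPartition, Long] = {
      consumer.poll(0) // refresh assignment; may kick off an async offset reset
      val partitions = consumer.assignment()
      // `position` blocks until the consumer has a valid position, forcing any
      // offset request started by `poll(0)` to complete before we seek.
      partitions.asScala.foreach(tp => consumer.position(tp))
      consumer.pause(partitions)
      consumer.seekToEnd(partitions)
      partitions.asScala.map(tp => tp -> consumer.position(tp)).toMap
    }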



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
