Repository: spark
Updated Branches:
  refs/heads/master 84f149e41 -> 08eac3560


[SPARK-17834][SQL] Fetch the earliest offsets manually in KafkaSource instead 
of counting on KafkaConsumer

## What changes were proposed in this pull request?

Because `KafkaConsumer.poll(0)` may update the partition offsets, this PR just 
calls `seekToBeginning` to manually set the earliest offsets for the 
KafkaSource initial offsets.

## How was this patch tested?

Existing tests.

Author: Shixiong Zhu <shixi...@databricks.com>

Closes #15397 from zsxwing/SPARK-17834.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/08eac356
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/08eac356
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/08eac356

Branch: refs/heads/master
Commit: 08eac356095c7faa2b19d52f2fb0cbc47eb7d1d1
Parents: 84f149e
Author: Shixiong Zhu <shixi...@databricks.com>
Authored: Thu Oct 13 13:31:50 2016 -0700
Committer: Shixiong Zhu <shixi...@databricks.com>
Committed: Thu Oct 13 13:31:50 2016 -0700

----------------------------------------------------------------------
 .../apache/spark/sql/kafka010/KafkaSource.scala | 55 +++++++++++++-------
 .../sql/kafka010/KafkaSourceProvider.scala      | 19 ++++---
 2 files changed, 48 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/08eac356/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
 
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
index 1be70db..4b0bb0a 100644
--- 
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
+++ 
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
@@ -82,6 +82,7 @@ private[kafka010] case class KafkaSource(
     executorKafkaParams: ju.Map[String, Object],
     sourceOptions: Map[String, String],
     metadataPath: String,
+    startFromEarliestOffset: Boolean,
     failOnDataLoss: Boolean)
   extends Source with Logging {
 
@@ -109,7 +110,11 @@ private[kafka010] case class KafkaSource(
   private lazy val initialPartitionOffsets = {
     val metadataLog = new 
HDFSMetadataLog[KafkaSourceOffset](sqlContext.sparkSession, metadataPath)
     metadataLog.get(0).getOrElse {
-      val offsets = KafkaSourceOffset(fetchPartitionOffsets(seekToEnd = false))
+      val offsets = if (startFromEarliestOffset) {
+        KafkaSourceOffset(fetchEarliestOffsets())
+      } else {
+        KafkaSourceOffset(fetchLatestOffsets())
+      }
       metadataLog.add(0, offsets)
       logInfo(s"Initial offsets: $offsets")
       offsets
@@ -123,7 +128,7 @@ private[kafka010] case class KafkaSource(
     // Make sure initialPartitionOffsets is initialized
     initialPartitionOffsets
 
-    val offset = KafkaSourceOffset(fetchPartitionOffsets(seekToEnd = true))
+    val offset = KafkaSourceOffset(fetchLatestOffsets())
     logDebug(s"GetOffset: 
${offset.partitionToOffsets.toSeq.map(_.toString).sorted}")
     Some(offset)
   }
@@ -227,26 +232,34 @@ private[kafka010] case class KafkaSource(
   override def toString(): String = s"KafkaSource[$consumerStrategy]"
 
   /**
-   * Fetch the offset of a partition, either seek to the latest offsets or use 
the current offsets
-   * in the consumer.
+   * Fetch the earliest offsets of partitions.
    */
-  private def fetchPartitionOffsets(
-      seekToEnd: Boolean): Map[TopicPartition, Long] = 
withRetriesWithoutInterrupt {
-    // Make sure `KafkaConsumer.poll` won't be interrupted (KAFKA-1894)
-    assert(Thread.currentThread().isInstanceOf[StreamExecutionThread])
+  private def fetchEarliestOffsets(): Map[TopicPartition, Long] = 
withRetriesWithoutInterrupt {
     // Poll to get the latest assigned partitions
     consumer.poll(0)
     val partitions = consumer.assignment()
     consumer.pause(partitions)
-    logDebug(s"Partitioned assigned to consumer: $partitions")
+    logDebug(s"Partitions assigned to consumer: $partitions. Seeking to the 
beginning")
 
-    // Get the current or latest offset of each partition
-    if (seekToEnd) {
-      consumer.seekToEnd(partitions)
-      logDebug("Seeked to the end")
-    }
+    consumer.seekToBeginning(partitions)
+    val partitionOffsets = partitions.asScala.map(p => p -> 
consumer.position(p)).toMap
+    logDebug(s"Got earliest offsets for partition : $partitionOffsets")
+    partitionOffsets
+  }
+
+  /**
+   * Fetch the latest offset of partitions.
+   */
+  private def fetchLatestOffsets(): Map[TopicPartition, Long] = 
withRetriesWithoutInterrupt {
+    // Poll to get the latest assigned partitions
+    consumer.poll(0)
+    val partitions = consumer.assignment()
+    consumer.pause(partitions)
+    logDebug(s"Partitions assigned to consumer: $partitions. Seeking to the 
end.")
+
+    consumer.seekToEnd(partitions)
     val partitionOffsets = partitions.asScala.map(p => p -> 
consumer.position(p)).toMap
-    logDebug(s"Got offsets for partition : $partitionOffsets")
+    logDebug(s"Got latest offsets for partition : $partitionOffsets")
     partitionOffsets
   }
 
@@ -256,22 +269,21 @@ private[kafka010] case class KafkaSource(
    */
   private def fetchNewPartitionEarliestOffsets(
       newPartitions: Seq[TopicPartition]): Map[TopicPartition, Long] = 
withRetriesWithoutInterrupt {
-    // Make sure `KafkaConsumer.poll` won't be interrupted (KAFKA-1894)
-    assert(Thread.currentThread().isInstanceOf[StreamExecutionThread])
     // Poll to get the latest assigned partitions
     consumer.poll(0)
     val partitions = consumer.assignment()
+    consumer.pause(partitions)
     logDebug(s"\tPartitioned assigned to consumer: $partitions")
 
     // Get the earliest offset of each partition
     consumer.seekToBeginning(partitions)
-    val partitionToOffsets = newPartitions.filter { p =>
+    val partitionOffsets = newPartitions.filter { p =>
       // When deleting topics happen at the same time, some partitions may not 
be in `partitions`.
       // So we need to ignore them
       partitions.contains(p)
     }.map(p => p -> consumer.position(p)).toMap
-    logDebug(s"Got offsets for new partitions: $partitionToOffsets")
-    partitionToOffsets
+    logDebug(s"Got earliest offsets for new partitions: $partitionOffsets")
+    partitionOffsets
   }
 
   /**
@@ -284,6 +296,9 @@ private[kafka010] case class KafkaSource(
    */
   private def withRetriesWithoutInterrupt(
       body: => Map[TopicPartition, Long]): Map[TopicPartition, Long] = {
+    // Make sure `KafkaConsumer.poll` won't be interrupted (KAFKA-1894)
+    assert(Thread.currentThread().isInstanceOf[StreamExecutionThread])
+
     synchronized {
       var result: Option[Map[TopicPartition, Long]] = None
       var attempt = 1

http://git-wip-us.apache.org/repos/asf/spark/blob/08eac356/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
 
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
index 1b0a2fe..23b1b60 100644
--- 
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
+++ 
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
@@ -77,10 +77,15 @@ private[kafka010] class KafkaSourceProvider extends 
StreamSourceProvider
     // id. Hence, we should generate a unique id for each query.
     val uniqueGroupId = 
s"spark-kafka-source-${UUID.randomUUID}-${metadataPath.hashCode}"
 
-    val autoOffsetResetValue = 
caseInsensitiveParams.get(STARTING_OFFSET_OPTION_KEY) match {
-      case Some(value) => value.trim()  // same values as those supported by 
auto.offset.reset
-      case None => "latest"
-    }
+    val startFromEarliestOffset =
+      
caseInsensitiveParams.get(STARTING_OFFSET_OPTION_KEY).map(_.trim.toLowerCase) 
match {
+        case Some("latest") => false
+        case Some("earliest") => true
+        case Some(pos) =>
+          // This should not happen since we have already checked the options.
+          throw new IllegalStateException(s"Invalid 
$STARTING_OFFSET_OPTION_KEY: $pos")
+        case None => false
+      }
 
     val kafkaParamsForStrategy =
       ConfigUpdater("source", specifiedKafkaParams)
@@ -90,8 +95,9 @@ private[kafka010] class KafkaSourceProvider extends 
StreamSourceProvider
         // So that consumers in Kafka source do not mess with any existing 
group id
         .set(ConsumerConfig.GROUP_ID_CONFIG, s"$uniqueGroupId-driver")
 
-        // So that consumers can start from earliest or latest
-        .set(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetResetValue)
+        // Set to "latest" to avoid exceptions. However, KafkaSource will 
fetch the initial offsets
+        // by itself instead of counting on KafkaConsumer.
+        .set(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
 
         // So that consumers in the driver does not commit offsets 
unnecessarily
         .set(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
@@ -147,6 +153,7 @@ private[kafka010] class KafkaSourceProvider extends 
StreamSourceProvider
       kafkaParamsForExecutors,
       parameters,
       metadataPath,
+      startFromEarliestOffset,
       failOnDataLoss)
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to