Repository: spark
Updated Branches:
  refs/heads/master c2131c0cd -> 658687b25


[SPARK-4964] [Streaming] refactor createRDD to take leaders via map instead of array

Author: cody koeninger <c...@koeninger.org>

Closes #4511 from koeninger/kafkaRdd-leader-to-broker and squashes the following commits:

f7151d4 [cody koeninger] [SPARK-4964] test refactoring
6f8680b [cody koeninger] [SPARK-4964] add test of the scala api for KafkaUtils.createRDD
f81e016 [cody koeninger] [SPARK-4964] leave KafkaStreamSuite host and port as private
5173f3f [cody koeninger] [SPARK-4964] test the Java variations of createRDD
e9cece4 [cody koeninger] [SPARK-4964] pass leaders as a map to ensure 1 leader per TopicPartition
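
Not part of the patch: a rough before/after sketch of the user-facing change from a caller's
perspective. The topic, partition, host, port, and offsets below are hypothetical, and sc (a
SparkContext) plus kafkaParams (a Map containing "metadata.broker.list") are assumed to be in scope.

  import kafka.common.TopicAndPartition
  import kafka.message.MessageAndMetadata
  import kafka.serializer.StringDecoder
  import org.apache.spark.streaming.kafka.{Broker, KafkaUtils, OffsetRange}

  val offsetRanges = Array(OffsetRange("topic1", 0, 0, 100))  // hypothetical offsets

  // Before this change leaders were an Array[Leader], so nothing stopped a caller from
  // listing the same topic/partition twice with different hosts. After this change leaders
  // are a Map keyed by TopicAndPartition, guaranteeing at most one broker per partition.
  val leaders = Map(TopicAndPartition("topic1", 0) -> Broker("broker-host", 9092))

  val rdd = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder, String](
    sc, kafkaParams, offsetRanges, leaders,
    (mmd: MessageAndMetadata[String, String]) => mmd.message)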


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/658687b2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/658687b2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/658687b2

Branch: refs/heads/master
Commit: 658687b25491047f30ee8558733d11e5a0572070
Parents: c2131c0
Author: cody koeninger <c...@koeninger.org>
Authored: Wed Feb 11 00:13:27 2015 -0800
Committer: Tathagata Das <tathagata.das1...@gmail.com>
Committed: Wed Feb 11 00:13:27 2015 -0800

----------------------------------------------------------------------
 .../apache/spark/streaming/kafka/Broker.scala   |  68 ++++++++
 .../spark/streaming/kafka/KafkaUtils.scala      |  44 ++++--
 .../apache/spark/streaming/kafka/Leader.scala   |  57 -------
 .../streaming/kafka/JavaKafkaRDDSuite.java      | 156 +++++++++++++++++++
 .../spark/streaming/kafka/KafkaRDDSuite.scala   |  96 ++++++++----
 5 files changed, 321 insertions(+), 100 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/658687b2/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/Broker.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/Broker.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/Broker.scala
new file mode 100644
index 0000000..5a74feb
--- /dev/null
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/Broker.scala
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import org.apache.spark.annotation.Experimental
+
+/**
+ * :: Experimental ::
+ * Represent the host and port info for a Kafka broker.
+ * Differs from the Kafka project's internal kafka.cluster.Broker, which contains a server ID
+ */
+@Experimental
+final class Broker private(
+    /** Broker's hostname */
+    val host: String,
+    /** Broker's port */
+    val port: Int) extends Serializable {
+  override def equals(obj: Any): Boolean = obj match {
+    case that: Broker =>
+      this.host == that.host &&
+      this.port == that.port
+    case _ => false
+  }
+
+  override def hashCode: Int = {
+    41 * (41 + host.hashCode) + port
+  }
+
+  override def toString(): String = {
+    s"Broker($host, $port)"
+  }
+}
+
+/**
+ * :: Experimental ::
+ * Companion object that provides methods to create instances of [[Broker]].
+ */
+@Experimental
+object Broker {
+  def create(host: String, port: Int): Broker =
+    new Broker(host, port)
+
+  def apply(host: String, port: Int): Broker =
+    new Broker(host, port)
+
+  def unapply(broker: Broker): Option[(String, Int)] = {
+    if (broker == null) {
+      None
+    } else {
+      Some((broker.host, broker.port))
+    }
+  }
+}
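
A brief usage sketch of the class above (not part of the patch; the host and port are hypothetical).
Broker.apply and Broker.create build instances, unapply enables pattern matching, and equality is
value-based:

  import org.apache.spark.streaming.kafka.Broker

  val b = Broker("broker-0.example.com", 9092)   // apply; Broker.create is the Java-friendly equivalent
  val Broker(host, port) = b                     // unapply extracts ("broker-0.example.com", 9092)
  assert(b == Broker.create(host, port))         // equals/hashCode compare host and port
  println(b)                                     // prints: Broker(broker-0.example.com, 9092)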

http://git-wip-us.apache.org/repos/asf/spark/blob/658687b2/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
index 7a2c3ab..af04bc6 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
@@ -154,6 +154,19 @@ object KafkaUtils {
      jssc.ssc, kafkaParams.toMap, Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel)
   }
 
+  /** get leaders for the given offset ranges, or throw an exception */
+  private def leadersForRanges(
+      kafkaParams: Map[String, String],
+      offsetRanges: Array[OffsetRange]): Map[TopicAndPartition, (String, Int)] = {
+    val kc = new KafkaCluster(kafkaParams)
+    val topics = offsetRanges.map(o => TopicAndPartition(o.topic, o.partition)).toSet
+    val leaders = kc.findLeaders(topics).fold(
+      errs => throw new SparkException(errs.mkString("\n")),
+      ok => ok
+    )
+    leaders
+  }
+
   /**
    * Create a RDD from Kafka using offset ranges for each topic and partition.
    *
@@ -176,12 +189,7 @@ object KafkaUtils {
       offsetRanges: Array[OffsetRange]
     ): RDD[(K, V)] = {
     val messageHandler = (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message)
-    val kc = new KafkaCluster(kafkaParams)
-    val topics = offsetRanges.map(o => TopicAndPartition(o.topic, o.partition)).toSet
-    val leaders = kc.findLeaders(topics).fold(
-      errs => throw new SparkException(errs.mkString("\n")),
-      ok => ok
-    )
+    val leaders = leadersForRanges(kafkaParams, offsetRanges)
     new KafkaRDD[K, V, KD, VD, (K, V)](sc, kafkaParams, offsetRanges, leaders, messageHandler)
   }
 
@@ -198,7 +206,8 @@ object KafkaUtils {
    *    host1:port1,host2:port2 form.
    * @param offsetRanges Each OffsetRange in the batch corresponds to a
    *   range of offsets for a given Kafka topic/partition
-   * @param leaders Kafka leaders for each offset range in batch
+   * @param leaders Kafka brokers for each TopicAndPartition in offsetRanges.  May be an empty map,
+   *   in which case leaders will be looked up on the driver.
   * @param messageHandler Function for translating each message and metadata into the desired type
    */
   @Experimental
@@ -211,12 +220,17 @@ object KafkaUtils {
       sc: SparkContext,
       kafkaParams: Map[String, String],
       offsetRanges: Array[OffsetRange],
-      leaders: Array[Leader],
+      leaders: Map[TopicAndPartition, Broker],
       messageHandler: MessageAndMetadata[K, V] => R
     ): RDD[R] = {
-    val leaderMap = leaders
-      .map(l => TopicAndPartition(l.topic, l.partition) -> (l.host, l.port))
-      .toMap
+    val leaderMap = if (leaders.isEmpty) {
+      leadersForRanges(kafkaParams, offsetRanges)
+    } else {
+      // This could be avoided by refactoring KafkaRDD.leaders and 
KafkaCluster to use Broker
+      leaders.map {
+        case (tp: TopicAndPartition, Broker(host, port)) => (tp, (host, port))
+      }.toMap
+    }
     new KafkaRDD[K, V, KD, VD, R](sc, kafkaParams, offsetRanges, leaderMap, messageHandler)
   }
 
@@ -263,7 +277,8 @@ object KafkaUtils {
    *    host1:port1,host2:port2 form.
    * @param offsetRanges Each OffsetRange in the batch corresponds to a
    *   range of offsets for a given Kafka topic/partition
-   * @param leaders Kafka leaders for each offset range in batch
+   * @param leaders Kafka brokers for each TopicAndPartition in offsetRanges.  May be an empty map,
+   *   in which case leaders will be looked up on the driver.
   * @param messageHandler Function for translating each message and metadata into the desired type
    */
   @Experimental
@@ -276,7 +291,7 @@ object KafkaUtils {
       recordClass: Class[R],
       kafkaParams: JMap[String, String],
       offsetRanges: Array[OffsetRange],
-      leaders: Array[Leader],
+      leaders: JMap[TopicAndPartition, Broker],
       messageHandler: JFunction[MessageAndMetadata[K, V], R]
     ): JavaRDD[R] = {
     implicit val keyCmt: ClassTag[K] = ClassTag(keyClass)
@@ -284,8 +299,9 @@ object KafkaUtils {
     implicit val keyDecoderCmt: ClassTag[KD] = ClassTag(keyDecoderClass)
     implicit val valueDecoderCmt: ClassTag[VD] = ClassTag(valueDecoderClass)
     implicit val recordCmt: ClassTag[R] = ClassTag(recordClass)
+    val leaderMap = Map(leaders.toSeq: _*)
     createRDD[K, V, KD, VD, R](
-      jsc.sc, Map(kafkaParams.toSeq: _*), offsetRanges, leaders, messageHandler.call _)
+      jsc.sc, Map(kafkaParams.toSeq: _*), offsetRanges, leaderMap, messageHandler.call _)
   }
 
   /**
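
A hedged sketch of how the overloads above can be called with or without explicit leaders (assumes
the sc, kafkaParams, and offsetRanges values from the earlier sketch; the broker host and port are
hypothetical):

  // 1) Pass an empty map: leaders are looked up on the driver via KafkaCluster.
  val withLookup = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder, String](
    sc, kafkaParams, offsetRanges, Map[TopicAndPartition, Broker](),
    (mmd: MessageAndMetadata[String, String]) => mmd.message)

  // 2) Supply known leaders explicitly to skip the metadata lookup.
  val withLeaders = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder, String](
    sc, kafkaParams, offsetRanges,
    Map(TopicAndPartition("topic1", 0) -> Broker("broker-host", 9092)),
    (mmd: MessageAndMetadata[String, String]) => mmd.message)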

http://git-wip-us.apache.org/repos/asf/spark/blob/658687b2/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/Leader.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/Leader.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/Leader.scala
deleted file mode 100644
index c129a26..0000000
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/Leader.scala
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.kafka
-
-import kafka.common.TopicAndPartition
-
-import org.apache.spark.annotation.Experimental
-
-/**
- * :: Experimental ::
- * Represent the host info for the leader of a Kafka partition.
- */
-@Experimental
-final class Leader private(
-    /** Kafka topic name */
-    val topic: String,
-    /** Kafka partition id */
-    val partition: Int,
-    /** Leader's hostname */
-    val host: String,
-    /** Leader's port */
-    val port: Int) extends Serializable
-
-/**
- * :: Experimental ::
- * Companion object the provides methods to create instances of [[Leader]].
- */
-@Experimental
-object Leader {
-  def create(topic: String, partition: Int, host: String, port: Int): Leader =
-    new Leader(topic, partition, host, port)
-
-  def create(topicAndPartition: TopicAndPartition, host: String, port: Int): Leader =
-    new Leader(topicAndPartition.topic, topicAndPartition.partition, host, port)
-
-  def apply(topic: String, partition: Int, host: String, port: Int): Leader =
-    new Leader(topic, partition, host, port)
-
-  def apply(topicAndPartition: TopicAndPartition, host: String, port: Int): Leader =
-    new Leader(topicAndPartition.topic, topicAndPartition.partition, host, port)
-
-}
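
For callers migrating off the removed class, the mapping is mechanical (a sketch; topic, partition,
host, and port are hypothetical):

  // Before (removed API): each per-partition leader carried its own topic and partition.
  //   val leaders = Array(Leader.create("topic1", 0, "broker-host", 9092))
  // After: the topic/partition becomes the map key and the broker the value.
  val leaders = Map(TopicAndPartition("topic1", 0) -> Broker("broker-host", 9092))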

http://git-wip-us.apache.org/repos/asf/spark/blob/658687b2/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java
----------------------------------------------------------------------
diff --git a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java
new file mode 100644
index 0000000..9d2e170
--- /dev/null
+++ b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Arrays;
+
+import org.apache.spark.SparkConf;
+
+import scala.Tuple2;
+
+import junit.framework.Assert;
+
+import kafka.common.TopicAndPartition;
+import kafka.message.MessageAndMetadata;
+import kafka.serializer.StringDecoder;
+
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.Function;
+
+import org.junit.Test;
+import org.junit.After;
+import org.junit.Before;
+
+public class JavaKafkaRDDSuite implements Serializable {
+  private transient JavaSparkContext sc = null;
+  private transient KafkaStreamSuiteBase suiteBase = null;
+
+  @Before
+  public void setUp() {
+    suiteBase = new KafkaStreamSuiteBase() { };
+    suiteBase.setupKafka();
+    System.clearProperty("spark.driver.port");
+    SparkConf sparkConf = new SparkConf()
+      .setMaster("local[4]").setAppName(this.getClass().getSimpleName());
+    sc = new JavaSparkContext(sparkConf);
+  }
+
+  @After
+  public void tearDown() {
+    sc.stop();
+    sc = null;
+    System.clearProperty("spark.driver.port");
+    suiteBase.tearDownKafka();
+  }
+
+  @Test
+  public void testKafkaRDD() throws InterruptedException {
+    String topic1 = "topic1";
+    String topic2 = "topic2";
+
+    String[] topic1data = createTopicAndSendData(topic1);
+    String[] topic2data = createTopicAndSendData(topic2);
+
+    HashMap<String, String> kafkaParams = new HashMap<String, String>();
+    kafkaParams.put("metadata.broker.list", suiteBase.brokerAddress());
+
+    OffsetRange[] offsetRanges = {
+      OffsetRange.create(topic1, 0, 0, 1),
+      OffsetRange.create(topic2, 0, 0, 1)
+    };
+
+    HashMap<TopicAndPartition, Broker> emptyLeaders = new HashMap();
+    HashMap<TopicAndPartition, Broker> leaders = new HashMap();
+    String[] hostAndPort = suiteBase.brokerAddress().split(":");
+    Broker broker = Broker.create(hostAndPort[0], Integer.parseInt(hostAndPort[1]));
+    leaders.put(new TopicAndPartition(topic1, 0), broker);
+    leaders.put(new TopicAndPartition(topic2, 0), broker);
+
+    JavaRDD<String> rdd1 = KafkaUtils.createRDD(
+        sc,
+        String.class,
+        String.class,
+        StringDecoder.class,
+        StringDecoder.class,
+        kafkaParams,
+        offsetRanges
+    ).map(
+        new Function<Tuple2<String, String>, String>() {
+          @Override
+          public String call(scala.Tuple2<String, String> kv) throws Exception {
+            return kv._2();
+          }
+        }
+    );
+
+    JavaRDD<String> rdd2 = KafkaUtils.createRDD(
+        sc,
+        String.class,
+        String.class,
+        StringDecoder.class,
+        StringDecoder.class,
+        String.class,
+        kafkaParams,
+        offsetRanges,
+        emptyLeaders,
+        new Function<MessageAndMetadata<String, String>, String>() {
+          @Override
+          public String call(MessageAndMetadata<String, String> msgAndMd) throws Exception {
+            return msgAndMd.message();
+          }
+        }
+    );
+
+    JavaRDD<String> rdd3 = KafkaUtils.createRDD(
+        sc,
+        String.class,
+        String.class,
+        StringDecoder.class,
+        StringDecoder.class,
+        String.class,
+        kafkaParams,
+        offsetRanges,
+        leaders,
+        new Function<MessageAndMetadata<String, String>, String>() {
+          @Override
+          public String call(MessageAndMetadata<String, String> msgAndMd) throws Exception {
+            return msgAndMd.message();
+          }
+        }
+    );
+
+    // just making sure the java user apis work; the scala tests handle logic corner cases
+    long count1 = rdd1.count();
+    long count2 = rdd2.count();
+    long count3 = rdd3.count();
+    Assert.assertTrue(count1 > 0);
+    Assert.assertEquals(count1, count2);
+    Assert.assertEquals(count1, count3);
+  }
+
+  private  String[] createTopicAndSendData(String topic) {
+    String[] data = { topic + "-1", topic + "-2", topic + "-3"};
+    suiteBase.createTopic(topic);
+    suiteBase.sendMessages(topic, data);
+    return data;
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/658687b2/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala
index 6774db8..a223da7 100644
--- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala
+++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala
@@ -21,18 +21,22 @@ import scala.util.Random
 
 import kafka.serializer.StringDecoder
 import kafka.common.TopicAndPartition
-import org.scalatest.BeforeAndAfter
+import kafka.message.MessageAndMetadata
+import org.scalatest.BeforeAndAfterAll
 
 import org.apache.spark._
 import org.apache.spark.SparkContext._
 
-class KafkaRDDSuite extends KafkaStreamSuiteBase with BeforeAndAfter {
+class KafkaRDDSuite extends KafkaStreamSuiteBase with BeforeAndAfterAll {
+  val sparkConf = new SparkConf().setMaster("local[4]").setAppName(this.getClass.getSimpleName)
   var sc: SparkContext = _
-  before {
+  override def beforeAll {
+    sc = new SparkContext(sparkConf)
+
     setupKafka()
   }
 
-  after {
+  override def afterAll {
     if (sc != null) {
       sc.stop
       sc = null
@@ -40,60 +44,94 @@ class KafkaRDDSuite extends KafkaStreamSuiteBase with BeforeAndAfter {
     tearDownKafka()
   }
 
-  test("Kafka RDD") {
-    val sparkConf = new SparkConf().setMaster("local[4]").setAppName(this.getClass.getSimpleName)
-    sc = new SparkContext(sparkConf)
+  test("basic usage") {
+    val topic = "topicbasic"
+    createTopic(topic)
+    val messages = Set("the", "quick", "brown", "fox")
+    sendMessages(topic, messages.toArray)
+
+
+    val kafkaParams = Map("metadata.broker.list" -> brokerAddress,
+      "group.id" -> s"test-consumer-${Random.nextInt(10000)}")
+
+    val offsetRanges = Array(OffsetRange(topic, 0, 0, messages.size))
+
+    val rdd =  KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](
+      sc, kafkaParams, offsetRanges)
+
+    val received = rdd.map(_._2).collect.toSet
+    assert(received === messages)
+  }
+
+  test("iterator boundary conditions") {
+    // the idea is to find e.g. off-by-one errors between what kafka has available and the rdd
     val topic = "topic1"
     val sent = Map("a" -> 5, "b" -> 3, "c" -> 10)
     createTopic(topic)
-    sendMessages(topic, sent)
 
     val kafkaParams = Map("metadata.broker.list" -> brokerAddress,
       "group.id" -> s"test-consumer-${Random.nextInt(10000)}")
 
     val kc = new KafkaCluster(kafkaParams)
 
-    val rdd = getRdd(kc, Set(topic))
     // this is the "lots of messages" case
-    // make sure we get all of them
+    sendMessages(topic, sent)
+    // rdd defined from leaders after sending messages, should get the number sent
+    val rdd = getRdd(kc, Set(topic))
+
     assert(rdd.isDefined)
-    assert(rdd.get.count === sent.values.sum)
+    assert(rdd.get.count === sent.values.sum, "didn't get all sent messages")
 
-    kc.setConsumerOffsets(
-      kafkaParams("group.id"),
-      rdd.get.offsetRanges.map(o => TopicAndPartition(o.topic, o.partition) -> o.untilOffset).toMap)
+    val ranges = rdd.get.asInstanceOf[HasOffsetRanges]
+      .offsetRanges.map(o => TopicAndPartition(o.topic, o.partition) -> o.untilOffset).toMap
+
+    kc.setConsumerOffsets(kafkaParams("group.id"), ranges)
 
-    val rdd2 = getRdd(kc, Set(topic))
-    val sent2 = Map("d" -> 1)
-    sendMessages(topic, sent2)
     // this is the "0 messages" case
-    // make sure we dont get anything, since messages were sent after rdd was defined
+    val rdd2 = getRdd(kc, Set(topic))
+    // shouldn't get anything, since message is sent after rdd was defined
+    val sentOnlyOne = Map("d" -> 1)
+
+    sendMessages(topic, sentOnlyOne)
     assert(rdd2.isDefined)
-    assert(rdd2.get.count === 0)
+    assert(rdd2.get.count === 0, "got messages when there shouldn't be any")
 
+    // this is the "exactly 1 message" case, namely the single message from sentOnlyOne above
     val rdd3 = getRdd(kc, Set(topic))
+    // send lots of messages after rdd was defined, they shouldn't show up
     sendMessages(topic, Map("extra" -> 22))
-    // this is the "exactly 1 message" case
-    // make sure we get exactly one message, despite there being lots more available
+
     assert(rdd3.isDefined)
-    assert(rdd3.get.count === sent2.values.sum)
+    assert(rdd3.get.count === sentOnlyOne.values.sum, "didn't get exactly one message")
 
   }
 
   // get an rdd from the committed consumer offsets until the latest leader offsets,
   private def getRdd(kc: KafkaCluster, topics: Set[String]) = {
     val groupId = kc.kafkaParams("group.id")
-    for {
-      topicPartitions <- kc.getPartitions(topics).right.toOption
-      from <- kc.getConsumerOffsets(groupId, topicPartitions).right.toOption.orElse(
+    def consumerOffsets(topicPartitions: Set[TopicAndPartition]) = {
+      kc.getConsumerOffsets(groupId, topicPartitions).right.toOption.orElse(
        kc.getEarliestLeaderOffsets(topicPartitions).right.toOption.map { offs =>
           offs.map(kv => kv._1 -> kv._2.offset)
         }
       )
-      until <- kc.getLatestLeaderOffsets(topicPartitions).right.toOption
-    } yield {
-      KafkaRDD[String, String, StringDecoder, StringDecoder, String](
-        sc, kc.kafkaParams, from, until, mmd => s"${mmd.offset} ${mmd.message}")
+    }
+    kc.getPartitions(topics).right.toOption.flatMap { topicPartitions =>
+      consumerOffsets(topicPartitions).flatMap { from =>
+        kc.getLatestLeaderOffsets(topicPartitions).right.toOption.map { until =>
+          val offsetRanges = from.map { case (tp: TopicAndPartition, fromOffset: Long) =>
+              OffsetRange(tp.topic, tp.partition, fromOffset, until(tp).offset)
+          }.toArray
+
+          val leaders = until.map { case (tp: TopicAndPartition, lo: KafkaCluster.LeaderOffset) =>
+              tp -> Broker(lo.host, lo.port)
+          }.toMap
+
+          KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder, String](
+            sc, kc.kafkaParams, offsetRanges, leaders,
+            (mmd: MessageAndMetadata[String, String]) => s"${mmd.offset} ${mmd.message}")
+        }
+      }
     }
   }
 }

