Repository: spark
Updated Branches:
  refs/heads/branch-2.0 cc3c44b11 -> 1932bb683


[SPARK-12177][STREAMING][KAFKA] limit api surface area

## What changes were proposed in this pull request?
This is an alternative to the refactoring proposed by 
https://github.com/apache/spark/pull/13996
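For orientation, here is a minimal Scala sketch of how the narrowed public surface is meant to be used after this change; the broker addresses, topic names, group id and object name are placeholders rather than anything from this patch:

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{ Seconds, StreamingContext }
import org.apache.spark.streaming.kafka010.{ ConsumerStrategies, KafkaUtils, LocationStrategies }

object DirectStreamExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("kafka-0-10-example").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(5))

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "host1:9092,host2:9092",     // placeholder brokers
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "example-group")                       // placeholder group id

    // Strategies are obtained only through the factory objects; the concrete
    // Subscribe/Assign/Prefer* classes are private after this change.
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](List("topicA", "topicB"), kafkaParams))

    stream.map(record => (record.key, record.value)).print()

    ssc.start()
    ssc.awaitTermination()
  }
}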

## How was this patch tested?

Unit tests.
Also tested under Scala 2.10 via `mvn -Dscala-2.10`.

Author: cody koeninger <c...@koeninger.org>

Closes #13998 from koeninger/kafka-0-10-refactor.

(cherry picked from commit fbfd0ab9d70f557c38c7bb8e704475bf19adaf02)
Signed-off-by: Tathagata Das <tathagata.das1...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1932bb68
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1932bb68
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1932bb68

Branch: refs/heads/branch-2.0
Commit: 1932bb683fc11735669c7a4b9e746e2a1dbbcb68
Parents: cc3c44b
Author: cody koeninger <c...@koeninger.org>
Authored: Fri Jul 1 00:53:36 2016 -0700
Committer: Tathagata Das <tathagata.das1...@gmail.com>
Committed: Fri Jul 1 00:53:46 2016 -0700

----------------------------------------------------------------------
 .../streaming/kafka010/ConsumerStrategy.scala   | 187 +++++++++----------
 .../kafka010/DirectKafkaInputDStream.scala      |   2 +-
 .../spark/streaming/kafka010/KafkaRDD.scala     |   4 +-
 .../streaming/kafka010/KafkaTestUtils.scala     |   5 +-
 .../spark/streaming/kafka010/KafkaUtils.scala   |  36 ++--
 .../streaming/kafka010/LocationStrategy.scala   |  74 ++++----
 .../spark/streaming/kafka010/package.scala      |   2 +-
 .../kafka010/JavaConsumerStrategySuite.java     |  32 ++--
 .../kafka010/JavaDirectKafkaStreamSuite.java    |   8 +-
 .../streaming/kafka010/JavaKafkaRDDSuite.java   |   9 +-
 .../kafka010/JavaLocationStrategySuite.java     |  19 +-
 .../kafka010/DirectKafkaStreamSuite.scala       |  35 +++-
 .../streaming/kafka010/KafkaRDDSuite.scala      |   2 +-
 13 files changed, 222 insertions(+), 193 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/1932bb68/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala
index 079a07d..70c3f1a 100644
--- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala
+++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.streaming.kafka010
 
-import java.{ util => ju }
+import java.{ lang => jl, util => ju }
 
 import scala.collection.JavaConverters._
 
@@ -30,15 +30,16 @@ import org.apache.spark.annotation.Experimental
 /**
  * :: Experimental ::
  * Choice of how to create and configure underlying Kafka Consumers on driver 
and executors.
+ * See [[ConsumerStrategies]] to obtain instances.
  * Kafka 0.10 consumers can require additional, sometimes complex, setup after 
object
  *  instantiation. This interface encapsulates that process, and allows it to 
be checkpointed.
  * @tparam K type of Kafka message key
  * @tparam V type of Kafka message value
  */
 @Experimental
-trait ConsumerStrategy[K, V] {
+abstract class ConsumerStrategy[K, V] {
   /**
-   * Kafka <a 
href="http://kafka.apache.org/documentation.htmll#newconsumerconfigs";>
+   * Kafka <a 
href="http://kafka.apache.org/documentation.html#newconsumerconfigs";>
    * configuration parameters</a> to be used on executors. Requires 
"bootstrap.servers" to be set
    * with Kafka broker(s) specified in host1:port1,host2:port2 form.
    */
@@ -51,15 +52,14 @@ trait ConsumerStrategy[K, V] {
    * has successfully read.  Will be empty on initial start, possibly 
non-empty on restart from
    * checkpoint.
    */
-  def onStart(currentOffsets: Map[TopicPartition, Long]): Consumer[K, V]
+  def onStart(currentOffsets: ju.Map[TopicPartition, jl.Long]): Consumer[K, V]
 }
 
 /**
- * :: Experimental ::
  * Subscribe to a collection of topics.
  * @param topics collection of topics to subscribe
  * @param kafkaParams Kafka
- * <a href="http://kafka.apache.org/documentation.htmll#newconsumerconfigs";>
+ * <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs";>
  * configuration parameters</a> to be used on driver. The same params will be 
used on executors,
  * with minor automatic modifications applied.
  *  Requires "bootstrap.servers" to be set
@@ -68,16 +68,15 @@ trait ConsumerStrategy[K, V] {
  * TopicPartition, the committed offset (if applicable) or kafka param
  * auto.offset.reset will be used.
  */
-@Experimental
-case class Subscribe[K, V] private(
-    topics: ju.Collection[java.lang.String],
+private case class Subscribe[K, V](
+    topics: ju.Collection[jl.String],
     kafkaParams: ju.Map[String, Object],
-    offsets: ju.Map[TopicPartition, Long]
+    offsets: ju.Map[TopicPartition, jl.Long]
   ) extends ConsumerStrategy[K, V] {
 
   def executorKafkaParams: ju.Map[String, Object] = kafkaParams
 
-  def onStart(currentOffsets: Map[TopicPartition, Long]): Consumer[K, V] = {
+  def onStart(currentOffsets: ju.Map[TopicPartition, jl.Long]): Consumer[K, V] 
= {
     val consumer = new KafkaConsumer[K, V](kafkaParams)
     consumer.subscribe(topics)
     if (currentOffsets.isEmpty) {
@@ -91,17 +90,51 @@ case class Subscribe[K, V] private(
 }
 
 /**
+ * Assign a fixed collection of TopicPartitions
+ * @param topicPartitions collection of TopicPartitions to assign
+ * @param kafkaParams Kafka
+ * <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs";>
+ * configuration parameters</a> to be used on driver. The same params will be 
used on executors,
+ * with minor automatic modifications applied.
+ *  Requires "bootstrap.servers" to be set
+ * with Kafka broker(s) specified in host1:port1,host2:port2 form.
+ * @param offsets: offsets to begin at on initial startup.  If no offset is 
given for a
+ * TopicPartition, the committed offset (if applicable) or kafka param
+ * auto.offset.reset will be used.
+ */
+private case class Assign[K, V](
+    topicPartitions: ju.Collection[TopicPartition],
+    kafkaParams: ju.Map[String, Object],
+    offsets: ju.Map[TopicPartition, jl.Long]
+  ) extends ConsumerStrategy[K, V] {
+
+  def executorKafkaParams: ju.Map[String, Object] = kafkaParams
+
+  def onStart(currentOffsets: ju.Map[TopicPartition, jl.Long]): Consumer[K, V] 
= {
+    val consumer = new KafkaConsumer[K, V](kafkaParams)
+    consumer.assign(topicPartitions)
+    if (currentOffsets.isEmpty) {
+      offsets.asScala.foreach { case (topicPartition, offset) =>
+          consumer.seek(topicPartition, offset)
+      }
+    }
+
+    consumer
+  }
+}
+
+/**
  * :: Experimental ::
- * Companion object for creating [[Subscribe]] strategy
+ * object for obtaining instances of [[ConsumerStrategy]]
  */
 @Experimental
-object Subscribe {
+object ConsumerStrategies {
   /**
    *  :: Experimental ::
    * Subscribe to a collection of topics.
    * @param topics collection of topics to subscribe
    * @param kafkaParams Kafka
-   * <a href="http://kafka.apache.org/documentation.htmll#newconsumerconfigs";>
+   * <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs";>
    * configuration parameters</a> to be used on driver. The same params will 
be used on executors,
    * with minor automatic modifications applied.
    *  Requires "bootstrap.servers" to be set
@@ -111,14 +144,14 @@ object Subscribe {
    * auto.offset.reset will be used.
    */
   @Experimental
-  def apply[K, V](
-      topics: Iterable[java.lang.String],
+  def Subscribe[K, V](
+      topics: Iterable[jl.String],
       kafkaParams: collection.Map[String, Object],
-      offsets: collection.Map[TopicPartition, Long]): Subscribe[K, V] = {
-    Subscribe[K, V](
+      offsets: collection.Map[TopicPartition, Long]): ConsumerStrategy[K, V] = 
{
+    new Subscribe[K, V](
       new ju.ArrayList(topics.asJavaCollection),
       new ju.HashMap[String, Object](kafkaParams.asJava),
-      new ju.HashMap[TopicPartition, Long](offsets.asJava))
+      new ju.HashMap[TopicPartition, jl.Long](offsets.mapValues(l => new 
jl.Long(l)).asJava))
   }
 
   /**
@@ -126,20 +159,20 @@ object Subscribe {
    * Subscribe to a collection of topics.
    * @param topics collection of topics to subscribe
    * @param kafkaParams Kafka
-   * <a href="http://kafka.apache.org/documentation.htmll#newconsumerconfigs";>
+   * <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs";>
    * configuration parameters</a> to be used on driver. The same params will 
be used on executors,
    * with minor automatic modifications applied.
    *  Requires "bootstrap.servers" to be set
    * with Kafka broker(s) specified in host1:port1,host2:port2 form.
    */
   @Experimental
-  def apply[K, V](
-      topics: Iterable[java.lang.String],
-      kafkaParams: collection.Map[String, Object]): Subscribe[K, V] = {
-    Subscribe[K, V](
+  def Subscribe[K, V](
+      topics: Iterable[jl.String],
+      kafkaParams: collection.Map[String, Object]): ConsumerStrategy[K, V] = {
+    new Subscribe[K, V](
       new ju.ArrayList(topics.asJavaCollection),
       new ju.HashMap[String, Object](kafkaParams.asJava),
-      ju.Collections.emptyMap[TopicPartition, Long]())
+      ju.Collections.emptyMap[TopicPartition, jl.Long]())
   }
 
   /**
@@ -147,7 +180,7 @@ object Subscribe {
    * Subscribe to a collection of topics.
    * @param topics collection of topics to subscribe
    * @param kafkaParams Kafka
-   * <a href="http://kafka.apache.org/documentation.htmll#newconsumerconfigs";>
+   * <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs";>
    * configuration parameters</a> to be used on driver. The same params will 
be used on executors,
    * with minor automatic modifications applied.
    *  Requires "bootstrap.servers" to be set
@@ -157,11 +190,11 @@ object Subscribe {
    * auto.offset.reset will be used.
    */
   @Experimental
-  def create[K, V](
-      topics: ju.Collection[java.lang.String],
+  def Subscribe[K, V](
+      topics: ju.Collection[jl.String],
       kafkaParams: ju.Map[String, Object],
-      offsets: ju.Map[TopicPartition, Long]): Subscribe[K, V] = {
-    Subscribe[K, V](topics, kafkaParams, offsets)
+      offsets: ju.Map[TopicPartition, jl.Long]): ConsumerStrategy[K, V] = {
+    new Subscribe[K, V](topics, kafkaParams, offsets)
   }
 
   /**
@@ -169,69 +202,25 @@ object Subscribe {
    * Subscribe to a collection of topics.
    * @param topics collection of topics to subscribe
    * @param kafkaParams Kafka
-   * <a href="http://kafka.apache.org/documentation.htmll#newconsumerconfigs";>
+   * <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs";>
    * configuration parameters</a> to be used on driver. The same params will 
be used on executors,
    * with minor automatic modifications applied.
    *  Requires "bootstrap.servers" to be set
    * with Kafka broker(s) specified in host1:port1,host2:port2 form.
    */
   @Experimental
-  def create[K, V](
-      topics: ju.Collection[java.lang.String],
-      kafkaParams: ju.Map[String, Object]): Subscribe[K, V] = {
-    Subscribe[K, V](topics, kafkaParams, 
ju.Collections.emptyMap[TopicPartition, Long]())
-  }
-
-}
-
-/**
- * :: Experimental ::
- * Assign a fixed collection of TopicPartitions
- * @param topicPartitions collection of TopicPartitions to assign
- * @param kafkaParams Kafka
- * <a href="http://kafka.apache.org/documentation.htmll#newconsumerconfigs";>
- * configuration parameters</a> to be used on driver. The same params will be 
used on executors,
- * with minor automatic modifications applied.
- *  Requires "bootstrap.servers" to be set
- * with Kafka broker(s) specified in host1:port1,host2:port2 form.
- * @param offsets: offsets to begin at on initial startup.  If no offset is 
given for a
- * TopicPartition, the committed offset (if applicable) or kafka param
- * auto.offset.reset will be used.
- */
-@Experimental
-case class Assign[K, V] private(
-    topicPartitions: ju.Collection[TopicPartition],
-    kafkaParams: ju.Map[String, Object],
-    offsets: ju.Map[TopicPartition, Long]
-  ) extends ConsumerStrategy[K, V] {
-
-  def executorKafkaParams: ju.Map[String, Object] = kafkaParams
-
-  def onStart(currentOffsets: Map[TopicPartition, Long]): Consumer[K, V] = {
-    val consumer = new KafkaConsumer[K, V](kafkaParams)
-    consumer.assign(topicPartitions)
-    if (currentOffsets.isEmpty) {
-      offsets.asScala.foreach { case (topicPartition, offset) =>
-          consumer.seek(topicPartition, offset)
-      }
-    }
-
-    consumer
+  def Subscribe[K, V](
+      topics: ju.Collection[jl.String],
+      kafkaParams: ju.Map[String, Object]): ConsumerStrategy[K, V] = {
+    new Subscribe[K, V](topics, kafkaParams, 
ju.Collections.emptyMap[TopicPartition, jl.Long]())
   }
-}
 
-/**
- *  :: Experimental ::
- * Companion object for creating [[Assign]] strategy
- */
-@Experimental
-object Assign {
   /**
    *  :: Experimental ::
    * Assign a fixed collection of TopicPartitions
    * @param topicPartitions collection of TopicPartitions to assign
    * @param kafkaParams Kafka
-   * <a href="http://kafka.apache.org/documentation.htmll#newconsumerconfigs";>
+   * <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs";>
    * configuration parameters</a> to be used on driver. The same params will 
be used on executors,
    * with minor automatic modifications applied.
    *  Requires "bootstrap.servers" to be set
@@ -241,14 +230,14 @@ object Assign {
    * auto.offset.reset will be used.
    */
   @Experimental
-  def apply[K, V](
+  def Assign[K, V](
       topicPartitions: Iterable[TopicPartition],
       kafkaParams: collection.Map[String, Object],
-      offsets: collection.Map[TopicPartition, Long]): Assign[K, V] = {
-    Assign[K, V](
+      offsets: collection.Map[TopicPartition, Long]): ConsumerStrategy[K, V] = 
{
+    new Assign[K, V](
       new ju.ArrayList(topicPartitions.asJavaCollection),
       new ju.HashMap[String, Object](kafkaParams.asJava),
-      new ju.HashMap[TopicPartition, Long](offsets.asJava))
+      new ju.HashMap[TopicPartition, jl.Long](offsets.mapValues(l => new 
jl.Long(l)).asJava))
   }
 
   /**
@@ -256,20 +245,20 @@ object Assign {
    * Assign a fixed collection of TopicPartitions
    * @param topicPartitions collection of TopicPartitions to assign
    * @param kafkaParams Kafka
-   * <a href="http://kafka.apache.org/documentation.htmll#newconsumerconfigs";>
+   * <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs";>
    * configuration parameters</a> to be used on driver. The same params will 
be used on executors,
    * with minor automatic modifications applied.
    *  Requires "bootstrap.servers" to be set
    * with Kafka broker(s) specified in host1:port1,host2:port2 form.
    */
   @Experimental
-  def apply[K, V](
+  def Assign[K, V](
       topicPartitions: Iterable[TopicPartition],
-      kafkaParams: collection.Map[String, Object]): Assign[K, V] = {
-    Assign[K, V](
+      kafkaParams: collection.Map[String, Object]): ConsumerStrategy[K, V] = {
+    new Assign[K, V](
       new ju.ArrayList(topicPartitions.asJavaCollection),
       new ju.HashMap[String, Object](kafkaParams.asJava),
-      ju.Collections.emptyMap[TopicPartition, Long]())
+      ju.Collections.emptyMap[TopicPartition, jl.Long]())
   }
 
   /**
@@ -277,7 +266,7 @@ object Assign {
    * Assign a fixed collection of TopicPartitions
    * @param topicPartitions collection of TopicPartitions to assign
    * @param kafkaParams Kafka
-   * <a href="http://kafka.apache.org/documentation.htmll#newconsumerconfigs";>
+   * <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs";>
    * configuration parameters</a> to be used on driver. The same params will 
be used on executors,
    * with minor automatic modifications applied.
    *  Requires "bootstrap.servers" to be set
@@ -287,11 +276,11 @@ object Assign {
    * auto.offset.reset will be used.
    */
   @Experimental
-  def create[K, V](
+  def Assign[K, V](
       topicPartitions: ju.Collection[TopicPartition],
       kafkaParams: ju.Map[String, Object],
-      offsets: ju.Map[TopicPartition, Long]): Assign[K, V] = {
-    Assign[K, V](topicPartitions, kafkaParams, offsets)
+      offsets: ju.Map[TopicPartition, jl.Long]): ConsumerStrategy[K, V] = {
+    new Assign[K, V](topicPartitions, kafkaParams, offsets)
   }
 
   /**
@@ -299,16 +288,20 @@ object Assign {
    * Assign a fixed collection of TopicPartitions
    * @param topicPartitions collection of TopicPartitions to assign
    * @param kafkaParams Kafka
-   * <a href="http://kafka.apache.org/documentation.htmll#newconsumerconfigs";>
+   * <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs";>
    * configuration parameters</a> to be used on driver. The same params will 
be used on executors,
    * with minor automatic modifications applied.
    *  Requires "bootstrap.servers" to be set
    * with Kafka broker(s) specified in host1:port1,host2:port2 form.
    */
   @Experimental
-  def create[K, V](
+  def Assign[K, V](
       topicPartitions: ju.Collection[TopicPartition],
-      kafkaParams: ju.Map[String, Object]): Assign[K, V] = {
-    Assign[K, V](topicPartitions, kafkaParams, 
ju.Collections.emptyMap[TopicPartition, Long]())
+      kafkaParams: ju.Map[String, Object]): ConsumerStrategy[K, V] = {
+    new Assign[K, V](
+      topicPartitions,
+      kafkaParams,
+      ju.Collections.emptyMap[TopicPartition, jl.Long]())
   }
+
 }
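Putting the factory methods above together, a rough Scala sketch (topic, partition, offset and broker values are purely illustrative):

import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.ConsumerStrategies

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "host1:9092",                  // placeholder broker
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "example-group")

val tp = new TopicPartition("topicA", 0)

// Subscribe: group-managed assignment, starting partition 0 at offset 42
val subscribe = ConsumerStrategies.Subscribe[String, String](
  List("topicA"), kafkaParams, Map(tp -> 42L))

// Assign: a fixed partition list, no consumer-group rebalancing
val assign = ConsumerStrategies.Assign[String, String](
  List(tp), kafkaParams, Map(tp -> 42L))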

http://git-wip-us.apache.org/repos/asf/spark/blob/1932bb68/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala
index acd1841..13827f6 100644
--- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala
+++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala
@@ -71,7 +71,7 @@ private[spark] class DirectKafkaInputDStream[K, V](
   @transient private var kc: Consumer[K, V] = null
   def consumer(): Consumer[K, V] = this.synchronized {
     if (null == kc) {
-      kc = consumerStrategy.onStart(currentOffsets)
+      kc = consumerStrategy.onStart(currentOffsets.mapValues(l => new 
java.lang.Long(l)).asJava)
     }
     kc
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/1932bb68/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala
index c15c163..5b5a9ac 100644
--- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala
+++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala
@@ -36,7 +36,7 @@ import org.apache.spark.storage.StorageLevel
  * Starting and ending offsets are specified in advance,
  * so that you can control exactly-once semantics.
  * @param kafkaParams Kafka
- * <a href="http://kafka.apache.org/documentation.htmll#newconsumerconfigs";>
+ * <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs";>
  * configuration parameters</a>. Requires "bootstrap.servers" to be set
  * with Kafka broker(s) specified in host1:port1,host2:port2 form.
  * @param offsetRanges offset ranges that define the Kafka data belonging to 
this RDD
@@ -66,7 +66,7 @@ private[spark] class KafkaRDD[K, V](
       " must be set to false for executor kafka params, else offsets may 
commit before processing")
 
   // TODO is it necessary to have separate configs for initial poll time vs 
ongoing poll time?
-  private val pollTimeout = 
conf.getLong("spark.streaming.kafka.consumer.poll.ms", 256)
+  private val pollTimeout = 
conf.getLong("spark.streaming.kafka.consumer.poll.ms", 512)
   private val cacheInitialCapacity =
     conf.getInt("spark.streaming.kafka.consumer.cache.initialCapacity", 16)
   private val cacheMaxCapacity =
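Note the executor poll timeout default changes from 256 ms to 512 ms here; it remains configurable through SparkConf, e.g. (the chosen value below is only an example):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("kafka-poll-timeout-example")
  // how long the executor-side consumer waits in poll(); default is now 512 ms
  .set("spark.streaming.kafka.consumer.poll.ms", "1024")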

http://git-wip-us.apache.org/repos/asf/spark/blob/1932bb68/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala
index 13c0843..19192e4b 100644
--- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala
+++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala
@@ -61,7 +61,7 @@ private[kafka010] class KafkaTestUtils extends Logging {
 
   // Kafka broker related configurations
   private val brokerHost = "localhost"
-  private var brokerPort = 9092
+  private var brokerPort = 0
   private var brokerConf: KafkaConfig = _
 
   // Kafka broker server
@@ -110,7 +110,8 @@ private[kafka010] class KafkaTestUtils extends Logging {
       brokerConf = new KafkaConfig(brokerConfiguration, doLog = false)
       server = new KafkaServer(brokerConf)
       server.startup()
-      (server, port)
+      brokerPort = server.boundPort()
+      (server, brokerPort)
     }, new SparkConf(), "KafkaBroker")
 
     brokerReady = true

http://git-wip-us.apache.org/repos/asf/spark/blob/1932bb68/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaUtils.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaUtils.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaUtils.scala
index c052499..b2190bf 100644
--- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaUtils.scala
+++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaUtils.scala
@@ -34,7 +34,7 @@ import org.apache.spark.streaming.dstream._
 
 /**
  * :: Experimental ::
- * Companion object for constructing Kafka streams and RDDs
+ * object for constructing Kafka streams and RDDs
  */
 @Experimental
 object KafkaUtils extends Logging {
@@ -44,12 +44,12 @@ object KafkaUtils extends Logging {
    * Starting and ending offsets are specified in advance,
    * so that you can control exactly-once semantics.
    * @param kafkaParams Kafka
-   * <a href="http://kafka.apache.org/documentation.htmll#newconsumerconfigs";>
+   * <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs";>
    * configuration parameters</a>. Requires "bootstrap.servers" to be set
    * with Kafka broker(s) specified in host1:port1,host2:port2 form.
    * @param offsetRanges offset ranges that define the Kafka data belonging to 
this RDD
-   * @param locationStrategy In most cases, pass in [[PreferConsistent]],
-   *   see [[LocationStrategy]] for more details.
+   * @param locationStrategy In most cases, pass in 
LocationStrategies.preferConsistent,
+   *   see [[LocationStrategies]] for more details.
    * @tparam K type of Kafka message key
    * @tparam V type of Kafka message value
    */
@@ -83,12 +83,12 @@ object KafkaUtils extends Logging {
    * @param keyClass Class of the keys in the Kafka records
    * @param valueClass Class of the values in the Kafka records
    * @param kafkaParams Kafka
-   * <a href="http://kafka.apache.org/documentation.htmll#newconsumerconfigs";>
+   * <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs";>
    * configuration parameters</a>. Requires "bootstrap.servers" to be set
    * with Kafka broker(s) specified in host1:port1,host2:port2 form.
    * @param offsetRanges offset ranges that define the Kafka data belonging to 
this RDD
-   * @param locationStrategy In most cases, pass in [[PreferConsistent]],
-   *   see [[LocationStrategy]] for more details.
+   * @param locationStrategy In most cases, pass in 
LocationStrategies.preferConsistent,
+   *   see [[LocationStrategies]] for more details.
    * @tparam K type of Kafka message key
    * @tparam V type of Kafka message value
    */
@@ -110,10 +110,10 @@ object KafkaUtils extends Logging {
    * The spark configuration spark.streaming.kafka.maxRatePerPartition gives 
the maximum number
    *  of messages
    * per second that each '''partition''' will accept.
-   * @param locationStrategy In most cases, pass in [[PreferConsistent]],
-   *   see [[LocationStrategy]] for more details.
-   * @param consumerStrategy In most cases, pass in [[Subscribe]],
-   *   see [[ConsumerStrategy]] for more details
+   * @param locationStrategy In most cases, pass in 
LocationStrategies.preferConsistent,
+   *   see [[LocationStrategies]] for more details.
+   * @param consumerStrategy In most cases, pass in 
ConsumerStrategies.subscribe,
+   *   see [[ConsumerStrategies]] for more details
    * @tparam K type of Kafka message key
    * @tparam V type of Kafka message value
    */
@@ -132,10 +132,10 @@ object KafkaUtils extends Logging {
    * each given Kafka topic/partition corresponds to an RDD partition.
    * @param keyClass Class of the keys in the Kafka records
    * @param valueClass Class of the values in the Kafka records
-   * @param locationStrategy In most cases, pass in [[PreferConsistent]],
-   *   see [[LocationStrategy]] for more details.
-   * @param consumerStrategy In most cases, pass in [[Subscribe]],
-   *   see [[ConsumerStrategy]] for more details
+   * @param locationStrategy In most cases, pass in 
LocationStrategies.preferConsistent,
+   *   see [[LocationStrategies]] for more details.
+   * @param consumerStrategy In most cases, pass in 
ConsumerStrategies.subscribe,
+   *   see [[ConsumerStrategies]] for more details
    * @tparam K type of Kafka message key
    * @tparam V type of Kafka message value
    */
@@ -161,7 +161,11 @@ object KafkaUtils extends Logging {
     kafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none")
 
     // driver and executor should be in different consumer groups
-    val groupId = "spark-executor-" + 
kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG)
+    val originalGroupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG)
+    if (null == originalGroupId) {
+      logError(s"${ConsumerConfig.GROUP_ID_CONFIG} is null, you should 
probably set it")
+    }
+    val groupId = "spark-executor-" + originalGroupId
     logWarning(s"overriding executor ${ConsumerConfig.GROUP_ID_CONFIG} to 
${groupId}")
     kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, groupId)
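Because the executor consumer group is now derived as "spark-executor-" plus the driver group id, and a missing group.id is logged as an error, driver-side params should always include one; a small sketch with placeholder names:

import java.util.UUID

// Driver-side group id; per the change above, executors will consume in the
// group "spark-executor-" + this value, and omitting it now logs an error.
val groupId = s"my-stream-${UUID.randomUUID}"   // placeholder name
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "host1:9092",          // placeholder broker
  "group.id" -> groupId)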
 

http://git-wip-us.apache.org/repos/asf/spark/blob/1932bb68/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/LocationStrategy.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/LocationStrategy.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/LocationStrategy.scala
index df62030..c9a8a13 100644
--- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/LocationStrategy.scala
+++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/LocationStrategy.scala
@@ -29,49 +29,57 @@ import org.apache.spark.annotation.Experimental
 /**
  *  :: Experimental ::
  * Choice of how to schedule consumers for a given TopicPartition on an 
executor.
+ * See [[LocationStrategies]] to obtain instances.
  * Kafka 0.10 consumers prefetch messages, so it's important for performance
  * to keep cached consumers on appropriate executors, not recreate them for 
every partition.
  * Choice of location is only a preference, not an absolute; partitions may be 
scheduled elsewhere.
  */
 @Experimental
-sealed trait LocationStrategy
+sealed abstract class LocationStrategy
 
-/**
- *  :: Experimental ::
- * Use this only if your executors are on the same nodes as your Kafka brokers.
- */
-@Experimental
-case object PreferBrokers extends LocationStrategy {
-  def create: PreferBrokers.type = this
-}
+private case object PreferBrokers extends LocationStrategy
 
-/**
- *  :: Experimental ::
- * Use this in most cases, it will consistently distribute partitions across 
all executors.
- */
-@Experimental
-case object PreferConsistent extends LocationStrategy {
-  def create: PreferConsistent.type = this
-}
+private case object PreferConsistent extends LocationStrategy
 
-/**
- *  :: Experimental ::
- * Use this to place particular TopicPartitions on particular hosts if your 
load is uneven.
- * Any TopicPartition not specified in the map will use a consistent location.
- */
-@Experimental
-case class PreferFixed private(hostMap: ju.Map[TopicPartition, String]) 
extends LocationStrategy
+private case class PreferFixed(hostMap: ju.Map[TopicPartition, String]) 
extends LocationStrategy
 
 /**
- *  :: Experimental ::
- * Use this to place particular TopicPartitions on particular hosts if your 
load is uneven.
- * Any TopicPartition not specified in the map will use a consistent location.
+ * :: Experimental :: object to obtain instances of [[LocationStrategy]]
+ *
  */
 @Experimental
-object PreferFixed {
-  def apply(hostMap: collection.Map[TopicPartition, String]): PreferFixed = {
-    PreferFixed(new ju.HashMap[TopicPartition, String](hostMap.asJava))
-  }
-  def create(hostMap: ju.Map[TopicPartition, String]): PreferFixed =
-    PreferFixed(hostMap)
+object LocationStrategies {
+  /**
+   *  :: Experimental ::
+   * Use this only if your executors are on the same nodes as your Kafka 
brokers.
+   */
+  @Experimental
+  def PreferBrokers: LocationStrategy =
+    org.apache.spark.streaming.kafka010.PreferBrokers
+
+  /**
+   *  :: Experimental ::
+   * Use this in most cases, it will consistently distribute partitions across 
all executors.
+   */
+  @Experimental
+  def PreferConsistent: LocationStrategy =
+    org.apache.spark.streaming.kafka010.PreferConsistent
+
+  /**
+   *  :: Experimental ::
+   * Use this to place particular TopicPartitions on particular hosts if your 
load is uneven.
+   * Any TopicPartition not specified in the map will use a consistent 
location.
+   */
+  @Experimental
+  def PreferFixed(hostMap: collection.Map[TopicPartition, String]): 
LocationStrategy =
+    new PreferFixed(new ju.HashMap[TopicPartition, String](hostMap.asJava))
+
+  /**
+   *  :: Experimental ::
+   * Use this to place particular TopicPartitions on particular hosts if your 
load is uneven.
+   * Any TopicPartition not specified in the map will use a consistent 
location.
+   */
+  @Experimental
+  def PreferFixed(hostMap: ju.Map[TopicPartition, String]): LocationStrategy =
+    new PreferFixed(hostMap)
 }
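A short sketch of how the three location strategies are now obtained (host and topic names are placeholders):

import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.{ LocationStrategies, LocationStrategy }

// The common case: spread partitions evenly over available executors.
val balanced: LocationStrategy = LocationStrategies.PreferConsistent

// Only if executors run on the same nodes as the Kafka brokers.
val colocated: LocationStrategy = LocationStrategies.PreferBrokers

// Pin known-hot partitions to specific hosts; unlisted partitions fall back
// to consistent placement.
val pinned: LocationStrategy = LocationStrategies.PreferFixed(Map(
  new TopicPartition("hot-topic", 0) -> "worker-1.example.com",
  new TopicPartition("hot-topic", 1) -> "worker-2.example.com"))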

http://git-wip-us.apache.org/repos/asf/spark/blob/1932bb68/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/package.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/package.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/package.scala
index 2bfc1e8..09db6d6 100644
--- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/package.scala
+++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/package.scala
@@ -20,4 +20,4 @@ package org.apache.spark.streaming
 /**
  * Spark Integration for Kafka 0.10
  */
-package object kafka
+package object kafka010 //scalastyle:ignore

http://git-wip-us.apache.org/repos/asf/spark/blob/1932bb68/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaConsumerStrategySuite.java
----------------------------------------------------------------------
diff --git a/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaConsumerStrategySuite.java b/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaConsumerStrategySuite.java
index 8d7c05b..ac8d64b 100644
--- a/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaConsumerStrategySuite.java
+++ b/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaConsumerStrategySuite.java
@@ -44,37 +44,39 @@ public class JavaConsumerStrategySuite implements 
Serializable {
     kafkaParams.put("bootstrap.servers", "not used");
     final scala.collection.Map<String, Object> sKafkaParams =
       JavaConverters.mapAsScalaMapConverter(kafkaParams).asScala();
-    final Map<TopicPartition, Object> offsets = new HashMap<>();
+    final Map<TopicPartition, Long> offsets = new HashMap<>();
     offsets.put(tp1, 23L);
     final scala.collection.Map<TopicPartition, Object> sOffsets =
-      JavaConverters.mapAsScalaMapConverter(offsets).asScala();
+      JavaConverters.mapAsScalaMapConverter(offsets).asScala().mapValues(
+        new scala.runtime.AbstractFunction1<Long, Object>() {
+          @Override
+          public Object apply(Long x) {
+            return (Object) x;
+          }
+        }
+      );
 
-    // make sure constructors can be called from java
-    // final ConsumerStrategy<String, String> sub0 =          // does not 
compile in Scala 2.10
-    //   Subscribe.<String, String>apply(topics, kafkaParams, offsets);
     final ConsumerStrategy<String, String> sub1 =
-      Subscribe.<String, String>apply(sTopics, sKafkaParams, sOffsets);
+      ConsumerStrategies.<String, String>Subscribe(sTopics, sKafkaParams, 
sOffsets);
     final ConsumerStrategy<String, String> sub2 =
-      Subscribe.<String, String>apply(sTopics, sKafkaParams);
+      ConsumerStrategies.<String, String>Subscribe(sTopics, sKafkaParams);
     final ConsumerStrategy<String, String> sub3 =
-      Subscribe.<String, String>create(topics, kafkaParams, offsets);
+      ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams, 
offsets);
     final ConsumerStrategy<String, String> sub4 =
-      Subscribe.<String, String>create(topics, kafkaParams);
+      ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams);
 
     Assert.assertEquals(
       sub1.executorKafkaParams().get("bootstrap.servers"),
       sub3.executorKafkaParams().get("bootstrap.servers"));
 
-    // final ConsumerStrategy<String, String> asn0 =          // does not 
compile in Scala 2.10
-    //   Assign.<String, String>apply(parts, kafkaParams, offsets);
     final ConsumerStrategy<String, String> asn1 =
-      Assign.<String, String>apply(sParts, sKafkaParams, sOffsets);
+      ConsumerStrategies.<String, String>Assign(sParts, sKafkaParams, 
sOffsets);
     final ConsumerStrategy<String, String> asn2 =
-      Assign.<String, String>apply(sParts, sKafkaParams);
+      ConsumerStrategies.<String, String>Assign(sParts, sKafkaParams);
     final ConsumerStrategy<String, String> asn3 =
-      Assign.<String, String>create(parts, kafkaParams, offsets);
+      ConsumerStrategies.<String, String>Assign(parts, kafkaParams, offsets);
     final ConsumerStrategy<String, String> asn4 =
-      Assign.<String, String>create(parts, kafkaParams);
+      ConsumerStrategies.<String, String>Assign(parts, kafkaParams);
 
     Assert.assertEquals(
       asn1.executorKafkaParams().get("bootstrap.servers"),

http://git-wip-us.apache.org/repos/asf/spark/blob/1932bb68/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaDirectKafkaStreamSuite.java
----------------------------------------------------------------------
diff --git a/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaDirectKafkaStreamSuite.java b/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaDirectKafkaStreamSuite.java
index e57ede7..dc9c13b 100644
--- a/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaDirectKafkaStreamSuite.java
+++ b/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaDirectKafkaStreamSuite.java
@@ -90,8 +90,8 @@ public class JavaDirectKafkaStreamSuite implements 
Serializable {
 
     JavaInputDStream<ConsumerRecord<String, String>> istream1 = 
KafkaUtils.createDirectStream(
         ssc,
-        PreferConsistent.create(),
-        Subscribe.<String, String>create(Arrays.asList(topic1), kafkaParams)
+        LocationStrategies.PreferConsistent(),
+        ConsumerStrategies.<String, String>Subscribe(Arrays.asList(topic1), 
kafkaParams)
     );
 
     JavaDStream<String> stream1 = istream1.transform(
@@ -123,8 +123,8 @@ public class JavaDirectKafkaStreamSuite implements 
Serializable {
 
     JavaInputDStream<ConsumerRecord<String, String>> istream2 = 
KafkaUtils.createDirectStream(
         ssc,
-        PreferConsistent.create(),
-        Subscribe.<String, String>create(Arrays.asList(topic2), kafkaParams2)
+        LocationStrategies.PreferConsistent(),
+        ConsumerStrategies.<String, String>Subscribe(Arrays.asList(topic2), 
kafkaParams2)
     );
 
     JavaDStream<String> stream2 = istream2.transform(

http://git-wip-us.apache.org/repos/asf/spark/blob/1932bb68/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaKafkaRDDSuite.java
----------------------------------------------------------------------
diff --git a/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaKafkaRDDSuite.java b/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaKafkaRDDSuite.java
index 548ba13..87bfe15 100644
--- a/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaKafkaRDDSuite.java
+++ b/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaKafkaRDDSuite.java
@@ -20,6 +20,7 @@ package org.apache.spark.streaming.kafka010;
 import java.io.Serializable;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Random;
 
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.TopicPartition;
@@ -65,6 +66,8 @@ public class JavaKafkaRDDSuite implements Serializable {
     String topic1 = "topic1";
     String topic2 = "topic2";
 
+    Random random = new Random();
+
     createTopicAndSendData(topic1);
     createTopicAndSendData(topic2);
 
@@ -72,6 +75,8 @@ public class JavaKafkaRDDSuite implements Serializable {
     kafkaParams.put("bootstrap.servers", kafkaTestUtils.brokerAddress());
     kafkaParams.put("key.deserializer", StringDeserializer.class);
     kafkaParams.put("value.deserializer", StringDeserializer.class);
+    kafkaParams.put("group.id", "java-test-consumer-" + random.nextInt() +
+      "-" + System.currentTimeMillis());
 
     OffsetRange[] offsetRanges = {
       OffsetRange.create(topic1, 0, 0, 1),
@@ -96,14 +101,14 @@ public class JavaKafkaRDDSuite implements Serializable {
         sc,
         kafkaParams,
         offsetRanges,
-        PreferFixed.create(leaders)
+        LocationStrategies.PreferFixed(leaders)
     ).map(handler);
 
     JavaRDD<String> rdd2 = KafkaUtils.<String, String>createRDD(
         sc,
         kafkaParams,
         offsetRanges,
-        PreferConsistent.create()
+        LocationStrategies.PreferConsistent()
     ).map(handler);
 
     // just making sure the java user apis work; the scala tests handle logic 
corner cases

http://git-wip-us.apache.org/repos/asf/spark/blob/1932bb68/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaLocationStrategySuite.java
----------------------------------------------------------------------
diff --git a/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaLocationStrategySuite.java b/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaLocationStrategySuite.java
index 7873c09..41ccb0e 100644
--- a/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaLocationStrategySuite.java
+++ b/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaLocationStrategySuite.java
@@ -41,18 +41,19 @@ public class JavaLocationStrategySuite implements 
Serializable {
       JavaConverters.mapAsScalaMapConverter(hosts).asScala();
 
     // make sure constructors can be called from java
-    final LocationStrategy c1 = PreferConsistent.create();
-    final LocationStrategy c2 = PreferConsistent$.MODULE$;
-    Assert.assertEquals(c1, c2);
+    final LocationStrategy c1 = LocationStrategies.PreferConsistent();
+    final LocationStrategy c2 = LocationStrategies.PreferConsistent();
+    Assert.assertSame(c1, c2);
 
-    final LocationStrategy c3 = PreferBrokers.create();
-    final LocationStrategy c4 = PreferBrokers$.MODULE$;
-    Assert.assertEquals(c3, c4);
+    final LocationStrategy c3 = LocationStrategies.PreferBrokers();
+    final LocationStrategy c4 = LocationStrategies.PreferBrokers();
+    Assert.assertSame(c3, c4);
 
-    final LocationStrategy c5 = PreferFixed.create(hosts);
-    final LocationStrategy c6 = PreferFixed.apply(sHosts);
-    Assert.assertEquals(c5, c6);
+    Assert.assertNotSame(c1, c3);
 
+    final LocationStrategy c5 = LocationStrategies.PreferFixed(hosts);
+    final LocationStrategy c6 = LocationStrategies.PreferFixed(sHosts);
+    Assert.assertEquals(c5, c6);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/1932bb68/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala
index 776d11a..0a53259 100644
--- a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala
+++ b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.streaming.kafka010
 
 import java.io.File
+import java.lang.{ Long => JLong }
 import java.util.{ Arrays, HashMap => JHashMap, Map => JMap }
 import java.util.concurrent.atomic.AtomicLong
 import java.util.concurrent.ConcurrentLinkedQueue
@@ -93,7 +94,7 @@ class DirectKafkaStreamSuite
     kp
   }
 
-  val preferredHosts = PreferConsistent
+  val preferredHosts = LocationStrategies.PreferConsistent
 
   test("basic stream receiving with multiple topics and smallest starting 
offset") {
     val topics = List("basic1", "basic2", "basic3")
@@ -108,7 +109,9 @@ class DirectKafkaStreamSuite
     ssc = new StreamingContext(sparkConf, Milliseconds(200))
     val stream = withClue("Error creating direct stream") {
       KafkaUtils.createDirectStream[String, String](
-        ssc, preferredHosts, Subscribe[String, String](topics, 
kafkaParams.asScala))
+        ssc,
+        preferredHosts,
+        ConsumerStrategies.Subscribe[String, String](topics, 
kafkaParams.asScala))
     }
     val allReceived = new ConcurrentLinkedQueue[(String, String)]()
 
@@ -178,7 +181,9 @@ class DirectKafkaStreamSuite
     ssc = new StreamingContext(sparkConf, Milliseconds(200))
     val stream = withClue("Error creating direct stream") {
       val s = new DirectKafkaInputDStream[String, String](
-        ssc, preferredHosts, Subscribe[String, String](List(topic), 
kafkaParams.asScala))
+        ssc,
+        preferredHosts,
+        ConsumerStrategies.Subscribe[String, String](List(topic), 
kafkaParams.asScala))
       s.consumer.poll(0)
       assert(
         s.consumer.position(topicPartition) >= offsetBeforeStart,
@@ -225,8 +230,10 @@ class DirectKafkaStreamSuite
     // Setup context and kafka stream with largest offset
     ssc = new StreamingContext(sparkConf, Milliseconds(200))
     val stream = withClue("Error creating direct stream") {
-      val s = new DirectKafkaInputDStream[String, String](ssc, preferredHosts,
-        Assign[String, String](
+      val s = new DirectKafkaInputDStream[String, String](
+        ssc,
+        preferredHosts,
+        ConsumerStrategies.Assign[String, String](
           List(topicPartition),
           kafkaParams.asScala,
           Map(topicPartition -> 11L)))
@@ -267,7 +274,9 @@ class DirectKafkaStreamSuite
     ssc = new StreamingContext(sparkConf, Milliseconds(100))
     val kafkaStream = withClue("Error creating direct stream") {
       KafkaUtils.createDirectStream[String, String](
-        ssc, preferredHosts, Subscribe[String, String](List(topic), 
kafkaParams.asScala))
+        ssc,
+        preferredHosts,
+        ConsumerStrategies.Subscribe[String, String](List(topic), 
kafkaParams.asScala))
     }
     val keyedStream = kafkaStream.map { r => "key" -> r.value.toInt }
     val stateStream = keyedStream.updateStateByKey { (values: Seq[Int], state: 
Option[Int]) =>
@@ -360,7 +369,9 @@ class DirectKafkaStreamSuite
     ssc = new StreamingContext(sparkConf, Milliseconds(100))
     withClue("Error creating direct stream") {
       val kafkaStream = KafkaUtils.createDirectStream[String, String](
-        ssc, preferredHosts, Subscribe[String, String](List(topic), 
kafkaParams.asScala))
+        ssc,
+        preferredHosts,
+        ConsumerStrategies.Subscribe[String, String](List(topic), 
kafkaParams.asScala))
       kafkaStream.foreachRDD { (rdd: RDD[ConsumerRecord[String, String]], 
time: Time) =>
         val offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
         val data = rdd.map(_.value).collect()
@@ -412,7 +423,9 @@ class DirectKafkaStreamSuite
 
     val stream = withClue("Error creating direct stream") {
       KafkaUtils.createDirectStream[String, String](
-        ssc, preferredHosts, Subscribe[String, String](List(topic), 
kafkaParams.asScala))
+        ssc,
+        preferredHosts,
+        ConsumerStrategies.Subscribe[String, String](List(topic), 
kafkaParams.asScala))
     }
 
     val allReceived = new ConcurrentLinkedQueue[(String, String)]
@@ -486,7 +499,9 @@ class DirectKafkaStreamSuite
 
     val kafkaStream = withClue("Error creating direct stream") {
       new DirectKafkaInputDStream[String, String](
-        ssc, preferredHosts, Subscribe[String, String](List(topic), 
kafkaParams.asScala)) {
+        ssc,
+        preferredHosts,
+        ConsumerStrategies.Subscribe[String, String](List(topic), 
kafkaParams.asScala)) {
         override protected[streaming] val rateController =
           Some(new DirectKafkaRateController(id, estimator))
       }.map(r => (r.key, r.value))
@@ -552,7 +567,7 @@ class DirectKafkaStreamSuite
       preferredHosts,
       new ConsumerStrategy[String, String] {
         def executorKafkaParams = ekp
-        def onStart(currentOffsets: Map[TopicPartition, Long]): 
Consumer[String, String] = {
+        def onStart(currentOffsets: JMap[TopicPartition, JLong]): 
Consumer[String, String] = {
           val consumer = new KafkaConsumer[String, String](kafkaParams)
           val tps = List(new TopicPartition(topic, 0), new 
TopicPartition(topic, 1))
           consumer.assign(Arrays.asList(tps: _*))
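The last hunk shows the new onStart signature; a hypothetical custom ConsumerStrategy written against it could look roughly like this (the class and topic names are invented for illustration):

import java.{ lang => jl, util => ju }
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.{ Consumer, KafkaConsumer }
import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.ConsumerStrategy

// Always assigns partition 0 of a single topic, resuming from checkpointed offsets.
class SinglePartitionStrategy(topic: String, kafkaParams: ju.Map[String, Object])
    extends ConsumerStrategy[String, String] {

  override def executorKafkaParams: ju.Map[String, Object] = kafkaParams

  override def onStart(
      currentOffsets: ju.Map[TopicPartition, jl.Long]): Consumer[String, String] = {
    val consumer = new KafkaConsumer[String, String](kafkaParams)
    consumer.assign(ju.Arrays.asList(new TopicPartition(topic, 0)))
    // Seek to any offsets restored from a checkpoint.
    currentOffsets.asScala.foreach { case (tp, offset) =>
      consumer.seek(tp, offset)
    }
    consumer
  }
}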

http://git-wip-us.apache.org/repos/asf/spark/blob/1932bb68/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala
index 3d2546d..be373af 100644
--- a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala
+++ b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala
@@ -62,7 +62,7 @@ class KafkaRDDSuite extends SparkFunSuite with 
BeforeAndAfterAll {
     "group.id" -> 
s"test-consumer-${Random.nextInt}-${System.currentTimeMillis}"
   ).asJava
 
-  private val preferredHosts = PreferConsistent
+  private val preferredHosts = LocationStrategies.PreferConsistent
 
   test("basic usage") {
     val topic = s"topicbasic-${Random.nextInt}-${System.currentTimeMillis}"

