http://git-wip-us.apache.org/repos/asf/spark/blob/703e6da1/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala
 
b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala
deleted file mode 100644
index 39abe3c..0000000
--- 
a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala
+++ /dev/null
@@ -1,302 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.kafka
-
-import java.util.Properties
-import java.util.concurrent.{ConcurrentHashMap, ThreadPoolExecutor}
-
-import scala.collection.{mutable, Map}
-import scala.reflect.{classTag, ClassTag}
-
-import kafka.common.TopicAndPartition
-import kafka.consumer.{Consumer, ConsumerConfig, ConsumerConnector, 
KafkaStream}
-import kafka.message.MessageAndMetadata
-import kafka.serializer.Decoder
-import kafka.utils.{VerifiableProperties, ZKGroupTopicDirs, 
ZKStringSerializer, ZkUtils}
-import org.I0Itec.zkclient.ZkClient
-
-import org.apache.spark.SparkEnv
-import org.apache.spark.internal.Logging
-import org.apache.spark.storage.{StorageLevel, StreamBlockId}
-import org.apache.spark.streaming.receiver.{BlockGenerator, 
BlockGeneratorListener, Receiver}
-import org.apache.spark.util.ThreadUtils
-
-/**
- * ReliableKafkaReceiver offers the ability to reliably store data into 
BlockManager without loss.
- * It is turned off by default and will be enabled when
- * spark.streaming.receiver.writeAheadLog.enable is true. The difference 
compared to KafkaReceiver
- * is that this receiver manages topic-partition/offset itself and updates the 
offset information
- * after data is reliably stored as write-ahead log. Offsets will only be 
updated when data is
- * reliably stored, so the potential data loss problem of KafkaReceiver can be 
eliminated.
- *
- * Note: ReliableKafkaReceiver will set auto.commit.enable to false to turn 
off automatic offset
- * commit mechanism in Kafka consumer. So setting this configuration manually 
within kafkaParams
- * will not take effect.
- */
-private[streaming]
-class ReliableKafkaReceiver[
-  K: ClassTag,
-  V: ClassTag,
-  U <: Decoder[_]: ClassTag,
-  T <: Decoder[_]: ClassTag](
-    kafkaParams: Map[String, String],
-    topics: Map[String, Int],
-    storageLevel: StorageLevel)
-    extends Receiver[(K, V)](storageLevel) with Logging {
-
-  private val groupId = kafkaParams("group.id")
-  private val AUTO_OFFSET_COMMIT = "auto.commit.enable"
-  private def conf = SparkEnv.get.conf
-
-  /** High level consumer to connect to Kafka. */
-  private var consumerConnector: ConsumerConnector = null
-
-  /** zkClient to connect to Zookeeper to commit the offsets. */
-  private var zkClient: ZkClient = null
-
-  /**
-   * A HashMap to manage the offset for each topic/partition, this HashMap is 
called in
-   * synchronized block, so mutable HashMap will not meet concurrency issue.
-   */
-  private var topicPartitionOffsetMap: mutable.HashMap[TopicAndPartition, 
Long] = null
-
-  /** A concurrent HashMap to store the stream block id and related offset 
snapshot. */
-  private var blockOffsetMap: ConcurrentHashMap[StreamBlockId, 
Map[TopicAndPartition, Long]] = null
-
-  /**
-   * Manage the BlockGenerator in receiver itself for better managing block 
store and offset
-   * commit.
-   */
-  private var blockGenerator: BlockGenerator = null
-
-  /** Thread pool running the handlers for receiving message from multiple 
topics and partitions. */
-  private var messageHandlerThreadPool: ThreadPoolExecutor = null
-
-  override def onStart(): Unit = {
-    logInfo(s"Starting Kafka Consumer Stream with group: $groupId")
-
-    // Initialize the topic-partition / offset hash map.
-    topicPartitionOffsetMap = new mutable.HashMap[TopicAndPartition, Long]
-
-    // Initialize the stream block id / offset snapshot hash map.
-    blockOffsetMap = new ConcurrentHashMap[StreamBlockId, 
Map[TopicAndPartition, Long]]()
-
-    // Initialize the block generator for storing Kafka message.
-    blockGenerator = supervisor.createBlockGenerator(new GeneratedBlockHandler)
-
-    if (kafkaParams.contains(AUTO_OFFSET_COMMIT) && 
kafkaParams(AUTO_OFFSET_COMMIT) == "true") {
-      logWarning(s"$AUTO_OFFSET_COMMIT should be set to false in 
ReliableKafkaReceiver, " +
-        "otherwise we will manually set it to false to turn off auto offset 
commit in Kafka")
-    }
-
-    val props = new Properties()
-    kafkaParams.foreach(param => props.put(param._1, param._2))
-    // Manually set "auto.commit.enable" to "false" no matter user explicitly 
set it to true,
-    // we have to make sure this property is set to false to turn off auto 
commit mechanism in
-    // Kafka.
-    props.setProperty(AUTO_OFFSET_COMMIT, "false")
-
-    val consumerConfig = new ConsumerConfig(props)
-
-    assert(!consumerConfig.autoCommitEnable)
-
-    logInfo(s"Connecting to Zookeeper: ${consumerConfig.zkConnect}")
-    consumerConnector = Consumer.create(consumerConfig)
-    logInfo(s"Connected to Zookeeper: ${consumerConfig.zkConnect}")
-
-    zkClient = new ZkClient(consumerConfig.zkConnect, 
consumerConfig.zkSessionTimeoutMs,
-      consumerConfig.zkConnectionTimeoutMs, ZKStringSerializer)
-
-    messageHandlerThreadPool = ThreadUtils.newDaemonFixedThreadPool(
-      topics.values.sum, "KafkaMessageHandler")
-
-    blockGenerator.start()
-
-    val keyDecoder = 
classTag[U].runtimeClass.getConstructor(classOf[VerifiableProperties])
-      .newInstance(consumerConfig.props)
-      .asInstanceOf[Decoder[K]]
-
-    val valueDecoder = 
classTag[T].runtimeClass.getConstructor(classOf[VerifiableProperties])
-      .newInstance(consumerConfig.props)
-      .asInstanceOf[Decoder[V]]
-
-    val topicMessageStreams = consumerConnector.createMessageStreams(
-      topics, keyDecoder, valueDecoder)
-
-    topicMessageStreams.values.foreach { streams =>
-      streams.foreach { stream =>
-        messageHandlerThreadPool.submit(new MessageHandler(stream))
-      }
-    }
-  }
-
-  override def onStop(): Unit = {
-    if (messageHandlerThreadPool != null) {
-      messageHandlerThreadPool.shutdown()
-      messageHandlerThreadPool = null
-    }
-
-    if (consumerConnector != null) {
-      consumerConnector.shutdown()
-      consumerConnector = null
-    }
-
-    if (zkClient != null) {
-      zkClient.close()
-      zkClient = null
-    }
-
-    if (blockGenerator != null) {
-      blockGenerator.stop()
-      blockGenerator = null
-    }
-
-    if (topicPartitionOffsetMap != null) {
-      topicPartitionOffsetMap.clear()
-      topicPartitionOffsetMap = null
-    }
-
-    if (blockOffsetMap != null) {
-      blockOffsetMap.clear()
-      blockOffsetMap = null
-    }
-  }
-
-  /** Store a Kafka message and the associated metadata as a tuple. */
-  private def storeMessageAndMetadata(
-      msgAndMetadata: MessageAndMetadata[K, V]): Unit = {
-    val topicAndPartition = TopicAndPartition(msgAndMetadata.topic, 
msgAndMetadata.partition)
-    val data = (msgAndMetadata.key, msgAndMetadata.message)
-    val metadata = (topicAndPartition, msgAndMetadata.offset)
-    blockGenerator.addDataWithCallback(data, metadata)
-  }
-
-  /** Update stored offset */
-  private def updateOffset(topicAndPartition: TopicAndPartition, offset: 
Long): Unit = {
-    topicPartitionOffsetMap.put(topicAndPartition, offset)
-  }
-
-  /**
-   * Remember the current offsets for each topic and partition. This is called 
when a block is
-   * generated.
-   */
-  private def rememberBlockOffsets(blockId: StreamBlockId): Unit = {
-    // Get a snapshot of current offset map and store with related block id.
-    val offsetSnapshot = topicPartitionOffsetMap.toMap
-    blockOffsetMap.put(blockId, offsetSnapshot)
-    topicPartitionOffsetMap.clear()
-  }
-
-  /**
-   * Store the ready-to-be-stored block and commit the related offsets to 
zookeeper. This method
-   * will try a fixed number of times to push the block. If the push fails, 
the receiver is stopped.
-   */
-  private def storeBlockAndCommitOffset(
-      blockId: StreamBlockId, arrayBuffer: mutable.ArrayBuffer[_]): Unit = {
-    var count = 0
-    var pushed = false
-    var exception: Exception = null
-    while (!pushed && count <= 3) {
-      try {
-        store(arrayBuffer.asInstanceOf[mutable.ArrayBuffer[(K, V)]])
-        pushed = true
-      } catch {
-        case ex: Exception =>
-          count += 1
-          exception = ex
-      }
-    }
-    if (pushed) {
-      Option(blockOffsetMap.get(blockId)).foreach(commitOffset)
-      blockOffsetMap.remove(blockId)
-    } else {
-      stop("Error while storing block into Spark", exception)
-    }
-  }
-
-  /**
-   * Commit the offset of Kafka's topic/partition, the commit mechanism follow 
Kafka 0.8.x's
-   * metadata schema in Zookeeper.
-   */
-  private def commitOffset(offsetMap: Map[TopicAndPartition, Long]): Unit = {
-    if (zkClient == null) {
-      val thrown = new IllegalStateException("Zookeeper client is unexpectedly 
null")
-      stop("Zookeeper client is not initialized before commit offsets to ZK", 
thrown)
-      return
-    }
-
-    for ((topicAndPart, offset) <- offsetMap) {
-      try {
-        val topicDirs = new ZKGroupTopicDirs(groupId, topicAndPart.topic)
-        val zkPath = 
s"${topicDirs.consumerOffsetDir}/${topicAndPart.partition}"
-
-        ZkUtils.updatePersistentPath(zkClient, zkPath, offset.toString)
-      } catch {
-        case e: Exception =>
-          logWarning(s"Exception during commit offset $offset for topic" +
-            s"${topicAndPart.topic}, partition ${topicAndPart.partition}", e)
-      }
-
-      logInfo(s"Committed offset $offset for topic ${topicAndPart.topic}, " +
-        s"partition ${topicAndPart.partition}")
-    }
-  }
-
-  /** Class to handle received Kafka message. */
-  private final class MessageHandler(stream: KafkaStream[K, V]) extends 
Runnable {
-    override def run(): Unit = {
-      while (!isStopped) {
-        try {
-          val streamIterator = stream.iterator()
-          while (streamIterator.hasNext) {
-            storeMessageAndMetadata(streamIterator.next)
-          }
-        } catch {
-          case e: Exception =>
-            reportError("Error handling message", e)
-        }
-      }
-    }
-  }
-
-  /** Class to handle blocks generated by the block generator. */
-  private final class GeneratedBlockHandler extends BlockGeneratorListener {
-
-    def onAddData(data: Any, metadata: Any): Unit = {
-      // Update the offset of the data that was added to the generator
-      if (metadata != null) {
-        val (topicAndPartition, offset) = 
metadata.asInstanceOf[(TopicAndPartition, Long)]
-        updateOffset(topicAndPartition, offset)
-      }
-    }
-
-    def onGenerateBlock(blockId: StreamBlockId): Unit = {
-      // Remember the offsets of topics/partitions when a block has been 
generated
-      rememberBlockOffsets(blockId)
-    }
-
-    def onPushBlock(blockId: StreamBlockId, arrayBuffer: 
mutable.ArrayBuffer[_]): Unit = {
-      // Store block and commit the blocks offset
-      storeBlockAndCommitOffset(blockId, arrayBuffer)
-    }
-
-    def onError(message: String, throwable: Throwable): Unit = {
-      reportError(message, throwable)
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/703e6da1/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/package-info.java
----------------------------------------------------------------------
diff --git 
a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/package-info.java
 
b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/package-info.java
deleted file mode 100644
index 2e5ab0f..0000000
--- 
a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Kafka receiver for spark streaming.
- */
-package org.apache.spark.streaming.kafka;

http://git-wip-us.apache.org/repos/asf/spark/blob/703e6da1/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/package.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/package.scala
 
b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/package.scala
deleted file mode 100644
index 47c5187..0000000
--- 
a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/package.scala
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming
-
-/**
- * Kafka receiver for spark streaming,
- */
-package object kafka

http://git-wip-us.apache.org/repos/asf/spark/blob/703e6da1/external/kafka-0-8/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java
----------------------------------------------------------------------
diff --git 
a/external/kafka-0-8/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java
 
b/external/kafka-0-8/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java
deleted file mode 100644
index 71404a7..0000000
--- 
a/external/kafka-0-8/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java
+++ /dev/null
@@ -1,170 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.kafka;
-
-import java.io.Serializable;
-import java.util.*;
-import java.util.concurrent.atomic.AtomicReference;
-
-import scala.Tuple2;
-
-import kafka.common.TopicAndPartition;
-import kafka.message.MessageAndMetadata;
-import kafka.serializer.StringDecoder;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.function.Function;
-import org.apache.spark.api.java.function.VoidFunction;
-import org.apache.spark.streaming.Durations;
-import org.apache.spark.streaming.api.java.JavaDStream;
-import org.apache.spark.streaming.api.java.JavaStreamingContext;
-
-public class JavaDirectKafkaStreamSuite implements Serializable {
-  private transient JavaStreamingContext ssc = null;
-  private transient KafkaTestUtils kafkaTestUtils = null;
-
-  @Before
-  public void setUp() {
-    kafkaTestUtils = new KafkaTestUtils();
-    kafkaTestUtils.setup();
-    SparkConf sparkConf = new SparkConf()
-      .setMaster("local[4]").setAppName(this.getClass().getSimpleName());
-    ssc = new JavaStreamingContext(sparkConf, Durations.milliseconds(200));
-  }
-
-  @After
-  public void tearDown() {
-    if (ssc != null) {
-      ssc.stop();
-      ssc = null;
-    }
-
-    if (kafkaTestUtils != null) {
-      kafkaTestUtils.teardown();
-      kafkaTestUtils = null;
-    }
-  }
-
-  @Test
-  public void testKafkaStream() throws InterruptedException {
-    final String topic1 = "topic1";
-    final String topic2 = "topic2";
-    // hold a reference to the current offset ranges, so it can be used 
downstream
-    final AtomicReference<OffsetRange[]> offsetRanges = new 
AtomicReference<>();
-
-    String[] topic1data = createTopicAndSendData(topic1);
-    String[] topic2data = createTopicAndSendData(topic2);
-
-    Set<String> sent = new HashSet<>();
-    sent.addAll(Arrays.asList(topic1data));
-    sent.addAll(Arrays.asList(topic2data));
-
-    Map<String, String> kafkaParams = new HashMap<>();
-    kafkaParams.put("metadata.broker.list", kafkaTestUtils.brokerAddress());
-    kafkaParams.put("auto.offset.reset", "smallest");
-
-    JavaDStream<String> stream1 = KafkaUtils.createDirectStream(
-        ssc,
-        String.class,
-        String.class,
-        StringDecoder.class,
-        StringDecoder.class,
-        kafkaParams,
-        topicToSet(topic1)
-    ).transformToPair(
-        // Make sure you can get offset ranges from the rdd
-        new Function<JavaPairRDD<String, String>, JavaPairRDD<String, 
String>>() {
-          @Override
-          public JavaPairRDD<String, String> call(JavaPairRDD<String, String> 
rdd) {
-            OffsetRange[] offsets = ((HasOffsetRanges) 
rdd.rdd()).offsetRanges();
-            offsetRanges.set(offsets);
-            Assert.assertEquals(topic1, offsets[0].topic());
-            return rdd;
-          }
-        }
-    ).map(
-        new Function<Tuple2<String, String>, String>() {
-          @Override
-          public String call(Tuple2<String, String> kv) {
-            return kv._2();
-          }
-        }
-    );
-
-    JavaDStream<String> stream2 = KafkaUtils.createDirectStream(
-        ssc,
-        String.class,
-        String.class,
-        StringDecoder.class,
-        StringDecoder.class,
-        String.class,
-        kafkaParams,
-        topicOffsetToMap(topic2, 0L),
-        new Function<MessageAndMetadata<String, String>, String>() {
-          @Override
-          public String call(MessageAndMetadata<String, String> msgAndMd) {
-            return msgAndMd.message();
-          }
-        }
-    );
-    JavaDStream<String> unifiedStream = stream1.union(stream2);
-
-    final Set<String> result = Collections.synchronizedSet(new 
HashSet<String>());
-    unifiedStream.foreachRDD(new VoidFunction<JavaRDD<String>>() {
-          @Override
-          public void call(JavaRDD<String> rdd) {
-            result.addAll(rdd.collect());
-          }
-        }
-    );
-    ssc.start();
-    long startTime = System.currentTimeMillis();
-    boolean matches = false;
-    while (!matches && System.currentTimeMillis() - startTime < 20000) {
-      matches = sent.size() == result.size();
-      Thread.sleep(50);
-    }
-    Assert.assertEquals(sent, result);
-    ssc.stop();
-  }
-
-  private static Set<String> topicToSet(String topic) {
-    Set<String> topicSet = new HashSet<>();
-    topicSet.add(topic);
-    return topicSet;
-  }
-
-  private static Map<TopicAndPartition, Long> topicOffsetToMap(String topic, 
Long offsetToStart) {
-    Map<TopicAndPartition, Long> topicMap = new HashMap<>();
-    topicMap.put(new TopicAndPartition(topic, 0), offsetToStart);
-    return topicMap;
-  }
-
-  private  String[] createTopicAndSendData(String topic) {
-    String[] data = { topic + "-1", topic + "-2", topic + "-3"};
-    kafkaTestUtils.createTopic(topic, 1);
-    kafkaTestUtils.sendMessages(topic, data);
-    return data;
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/703e6da1/external/kafka-0-8/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java
----------------------------------------------------------------------
diff --git 
a/external/kafka-0-8/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java
 
b/external/kafka-0-8/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java
deleted file mode 100644
index c41b629..0000000
--- 
a/external/kafka-0-8/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java
+++ /dev/null
@@ -1,156 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.kafka;
-
-import java.io.Serializable;
-import java.util.HashMap;
-import java.util.Map;
-
-import scala.Tuple2;
-
-import kafka.common.TopicAndPartition;
-import kafka.message.MessageAndMetadata;
-import kafka.serializer.StringDecoder;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.function.Function;
-
-public class JavaKafkaRDDSuite implements Serializable {
-  private transient JavaSparkContext sc = null;
-  private transient KafkaTestUtils kafkaTestUtils = null;
-
-  @Before
-  public void setUp() {
-    kafkaTestUtils = new KafkaTestUtils();
-    kafkaTestUtils.setup();
-    SparkConf sparkConf = new SparkConf()
-      .setMaster("local[4]").setAppName(this.getClass().getSimpleName());
-    sc = new JavaSparkContext(sparkConf);
-  }
-
-  @After
-  public void tearDown() {
-    if (sc != null) {
-      sc.stop();
-      sc = null;
-    }
-
-    if (kafkaTestUtils != null) {
-      kafkaTestUtils.teardown();
-      kafkaTestUtils = null;
-    }
-  }
-
-  @Test
-  public void testKafkaRDD() throws InterruptedException {
-    String topic1 = "topic1";
-    String topic2 = "topic2";
-
-    createTopicAndSendData(topic1);
-    createTopicAndSendData(topic2);
-
-    Map<String, String> kafkaParams = new HashMap<>();
-    kafkaParams.put("metadata.broker.list", kafkaTestUtils.brokerAddress());
-
-    OffsetRange[] offsetRanges = {
-      OffsetRange.create(topic1, 0, 0, 1),
-      OffsetRange.create(topic2, 0, 0, 1)
-    };
-
-    Map<TopicAndPartition, Broker> emptyLeaders = new HashMap<>();
-    Map<TopicAndPartition, Broker> leaders = new HashMap<>();
-    String[] hostAndPort = kafkaTestUtils.brokerAddress().split(":");
-    Broker broker = Broker.create(hostAndPort[0], 
Integer.parseInt(hostAndPort[1]));
-    leaders.put(new TopicAndPartition(topic1, 0), broker);
-    leaders.put(new TopicAndPartition(topic2, 0), broker);
-
-    JavaRDD<String> rdd1 = KafkaUtils.createRDD(
-        sc,
-        String.class,
-        String.class,
-        StringDecoder.class,
-        StringDecoder.class,
-        kafkaParams,
-        offsetRanges
-    ).map(
-        new Function<Tuple2<String, String>, String>() {
-          @Override
-          public String call(Tuple2<String, String> kv) {
-            return kv._2();
-          }
-        }
-    );
-
-    JavaRDD<String> rdd2 = KafkaUtils.createRDD(
-        sc,
-        String.class,
-        String.class,
-        StringDecoder.class,
-        StringDecoder.class,
-        String.class,
-        kafkaParams,
-        offsetRanges,
-        emptyLeaders,
-        new Function<MessageAndMetadata<String, String>, String>() {
-          @Override
-          public String call(MessageAndMetadata<String, String> msgAndMd) {
-            return msgAndMd.message();
-          }
-        }
-    );
-
-    JavaRDD<String> rdd3 = KafkaUtils.createRDD(
-        sc,
-        String.class,
-        String.class,
-        StringDecoder.class,
-        StringDecoder.class,
-        String.class,
-        kafkaParams,
-        offsetRanges,
-        leaders,
-        new Function<MessageAndMetadata<String, String>, String>() {
-          @Override
-          public String call(MessageAndMetadata<String, String> msgAndMd) {
-            return msgAndMd.message();
-          }
-        }
-    );
-
-    // just making sure the java user apis work; the scala tests handle logic 
corner cases
-    long count1 = rdd1.count();
-    long count2 = rdd2.count();
-    long count3 = rdd3.count();
-    Assert.assertTrue(count1 > 0);
-    Assert.assertEquals(count1, count2);
-    Assert.assertEquals(count1, count3);
-  }
-
-  private  String[] createTopicAndSendData(String topic) {
-    String[] data = { topic + "-1", topic + "-2", topic + "-3"};
-    kafkaTestUtils.createTopic(topic, 1);
-    kafkaTestUtils.sendMessages(topic, data);
-    return data;
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/703e6da1/external/kafka-0-8/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
----------------------------------------------------------------------
diff --git 
a/external/kafka-0-8/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
 
b/external/kafka-0-8/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
deleted file mode 100644
index 98fe38e..0000000
--- 
a/external/kafka-0-8/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.kafka;
-
-import java.io.Serializable;
-import java.util.*;
-
-import scala.Tuple2;
-
-import kafka.serializer.StringDecoder;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.function.Function;
-import org.apache.spark.api.java.function.VoidFunction;
-import org.apache.spark.storage.StorageLevel;
-import org.apache.spark.streaming.Duration;
-import org.apache.spark.streaming.api.java.JavaDStream;
-import org.apache.spark.streaming.api.java.JavaPairDStream;
-import org.apache.spark.streaming.api.java.JavaStreamingContext;
-
-public class JavaKafkaStreamSuite implements Serializable {
-  private transient JavaStreamingContext ssc = null;
-  private transient Random random = new Random();
-  private transient KafkaTestUtils kafkaTestUtils = null;
-
-  @Before
-  public void setUp() {
-    kafkaTestUtils = new KafkaTestUtils();
-    kafkaTestUtils.setup();
-    SparkConf sparkConf = new SparkConf()
-      .setMaster("local[4]").setAppName(this.getClass().getSimpleName());
-    ssc = new JavaStreamingContext(sparkConf, new Duration(500));
-  }
-
-  @After
-  public void tearDown() {
-    if (ssc != null) {
-      ssc.stop();
-      ssc = null;
-    }
-
-    if (kafkaTestUtils != null) {
-      kafkaTestUtils.teardown();
-      kafkaTestUtils = null;
-    }
-  }
-
-  @Test
-  public void testKafkaStream() throws InterruptedException {
-    String topic = "topic1";
-    Map<String, Integer> topics = new HashMap<>();
-    topics.put(topic, 1);
-
-    Map<String, Integer> sent = new HashMap<>();
-    sent.put("a", 5);
-    sent.put("b", 3);
-    sent.put("c", 10);
-
-    kafkaTestUtils.createTopic(topic, 1);
-    kafkaTestUtils.sendMessages(topic, sent);
-
-    Map<String, String> kafkaParams = new HashMap<>();
-    kafkaParams.put("zookeeper.connect", kafkaTestUtils.zkAddress());
-    kafkaParams.put("group.id", "test-consumer-" + random.nextInt(10000));
-    kafkaParams.put("auto.offset.reset", "smallest");
-
-    JavaPairDStream<String, String> stream = KafkaUtils.createStream(ssc,
-      String.class,
-      String.class,
-      StringDecoder.class,
-      StringDecoder.class,
-      kafkaParams,
-      topics,
-      StorageLevel.MEMORY_ONLY_SER());
-
-    final Map<String, Long> result = Collections.synchronizedMap(new 
HashMap<String, Long>());
-
-    JavaDStream<String> words = stream.map(
-      new Function<Tuple2<String, String>, String>() {
-        @Override
-        public String call(Tuple2<String, String> tuple2) {
-          return tuple2._2();
-        }
-      }
-    );
-
-    words.countByValue().foreachRDD(new VoidFunction<JavaPairRDD<String, 
Long>>() {
-        @Override
-        public void call(JavaPairRDD<String, Long> rdd) {
-          List<Tuple2<String, Long>> ret = rdd.collect();
-          for (Tuple2<String, Long> r : ret) {
-            if (result.containsKey(r._1())) {
-              result.put(r._1(), result.get(r._1()) + r._2());
-            } else {
-              result.put(r._1(), r._2());
-            }
-          }
-        }
-      }
-    );
-
-    ssc.start();
-
-    long startTime = System.currentTimeMillis();
-    AssertionError lastError = null;
-    while (System.currentTimeMillis() - startTime < 20000) {
-      try {
-        Assert.assertEquals(sent.size(), result.size());
-        for (Map.Entry<String, Integer> e : sent.entrySet()) {
-          Assert.assertEquals(e.getValue().intValue(), 
result.get(e.getKey()).intValue());
-        }
-        return;
-      } catch (AssertionError e) {
-        lastError = e;
-      }
-      Thread.sleep(200);
-    }
-    if (lastError != null) {
-      throw lastError;
-    } else {
-      Assert.fail("timeout");
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/703e6da1/external/kafka-0-8/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/external/kafka-0-8/src/test/resources/log4j.properties 
b/external/kafka-0-8/src/test/resources/log4j.properties
deleted file mode 100644
index fd51f8f..0000000
--- a/external/kafka-0-8/src/test/resources/log4j.properties
+++ /dev/null
@@ -1,28 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-# Set everything to be logged to the file target/unit-tests.log
-log4j.rootCategory=INFO, file
-log4j.appender.file=org.apache.log4j.FileAppender
-log4j.appender.file.append=true
-log4j.appender.file.file=target/unit-tests.log
-log4j.appender.file.layout=org.apache.log4j.PatternLayout
-log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p 
%c{1}: %m%n
-
-# Ignore messages below warning level from Jetty, because it's a bit verbose
-log4j.logger.org.spark_project.jetty=WARN
-

http://git-wip-us.apache.org/repos/asf/spark/blob/703e6da1/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
 
b/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
deleted file mode 100644
index 3fd37f4..0000000
--- 
a/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
+++ /dev/null
@@ -1,636 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.kafka
-
-import java.io.File
-import java.util.{ Arrays, UUID }
-import java.util.concurrent.ConcurrentLinkedQueue
-import java.util.concurrent.atomic.AtomicLong
-
-import scala.collection.JavaConverters._
-import scala.concurrent.duration._
-import scala.language.postfixOps
-
-import kafka.common.TopicAndPartition
-import kafka.message.MessageAndMetadata
-import kafka.serializer.StringDecoder
-import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
-import org.scalatest.concurrent.Eventually
-
-import org.apache.spark.{SparkConf, SparkFunSuite}
-import org.apache.spark.internal.Logging
-import org.apache.spark.rdd.RDD
-import org.apache.spark.streaming.{Milliseconds, StreamingContext, Time}
-import org.apache.spark.streaming.dstream.DStream
-import org.apache.spark.streaming.scheduler._
-import org.apache.spark.streaming.scheduler.rate.RateEstimator
-import org.apache.spark.util.Utils
-
-class DirectKafkaStreamSuite
-  extends SparkFunSuite
-  with BeforeAndAfter
-  with BeforeAndAfterAll
-  with Eventually
-  with Logging {
-  val sparkConf = new SparkConf()
-    .setMaster("local[4]")
-    .setAppName(this.getClass.getSimpleName)
-
-  private var ssc: StreamingContext = _
-  private var testDir: File = _
-
-  private var kafkaTestUtils: KafkaTestUtils = _
-
-  override def beforeAll {
-    super.beforeAll()
-    kafkaTestUtils = new KafkaTestUtils
-    kafkaTestUtils.setup()
-  }
-
-  override def afterAll {
-    try {
-      if (kafkaTestUtils != null) {
-        kafkaTestUtils.teardown()
-        kafkaTestUtils = null
-      }
-    } finally {
-      super.afterAll()
-    }
-  }
-
-  after {
-    if (ssc != null) {
-      ssc.stop(stopSparkContext = true)
-    }
-    if (testDir != null) {
-      Utils.deleteRecursively(testDir)
-    }
-  }
-
-
-  test("basic stream receiving with multiple topics and smallest starting 
offset") {
-    val topics = Set("basic1", "basic2", "basic3")
-    val data = Map("a" -> 7, "b" -> 9)
-    topics.foreach { t =>
-      kafkaTestUtils.createTopic(t)
-      kafkaTestUtils.sendMessages(t, data)
-    }
-    val totalSent = data.values.sum * topics.size
-    val kafkaParams = Map(
-      "metadata.broker.list" -> kafkaTestUtils.brokerAddress,
-      "auto.offset.reset" -> "smallest"
-    )
-
-    ssc = new StreamingContext(sparkConf, Milliseconds(200))
-    val stream = withClue("Error creating direct stream") {
-      KafkaUtils.createDirectStream[String, String, StringDecoder, 
StringDecoder](
-        ssc, kafkaParams, topics)
-    }
-
-    val allReceived = new ConcurrentLinkedQueue[(String, String)]()
-
-    // hold a reference to the current offset ranges, so it can be used 
downstream
-    var offsetRanges = Array[OffsetRange]()
-
-    stream.transform { rdd =>
-      // Get the offset ranges in the RDD
-      offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
-      rdd
-    }.foreachRDD { rdd =>
-      for (o <- offsetRanges) {
-        logInfo(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
-      }
-      val collected = rdd.mapPartitionsWithIndex { (i, iter) =>
-      // For each partition, get size of the range in the partition,
-      // and the number of items in the partition
-        val off = offsetRanges(i)
-        val all = iter.toSeq
-        val partSize = all.size
-        val rangeSize = off.untilOffset - off.fromOffset
-        Iterator((partSize, rangeSize))
-      }.collect
-
-      // Verify whether number of elements in each partition
-      // matches with the corresponding offset range
-      collected.foreach { case (partSize, rangeSize) =>
-        assert(partSize === rangeSize, "offset ranges are wrong")
-      }
-    }
-    stream.foreachRDD { rdd => allReceived.addAll(Arrays.asList(rdd.collect(): 
_*)) }
-    ssc.start()
-    eventually(timeout(20000.milliseconds), interval(200.milliseconds)) {
-      assert(allReceived.size === totalSent,
-        "didn't get expected number of messages, messages:\n" +
-          allReceived.asScala.mkString("\n"))
-    }
-    ssc.stop()
-  }
-
-  test("receiving from largest starting offset") {
-    val topic = "largest"
-    val topicPartition = TopicAndPartition(topic, 0)
-    val data = Map("a" -> 10)
-    kafkaTestUtils.createTopic(topic)
-    val kafkaParams = Map(
-      "metadata.broker.list" -> kafkaTestUtils.brokerAddress,
-      "auto.offset.reset" -> "largest"
-    )
-    val kc = new KafkaCluster(kafkaParams)
-    def getLatestOffset(): Long = {
-      
kc.getLatestLeaderOffsets(Set(topicPartition)).right.get(topicPartition).offset
-    }
-
-    // Send some initial messages before starting context
-    kafkaTestUtils.sendMessages(topic, data)
-    eventually(timeout(10 seconds), interval(20 milliseconds)) {
-      assert(getLatestOffset() > 3)
-    }
-    val offsetBeforeStart = getLatestOffset()
-
-    // Setup context and kafka stream with largest offset
-    ssc = new StreamingContext(sparkConf, Milliseconds(200))
-    val stream = withClue("Error creating direct stream") {
-      KafkaUtils.createDirectStream[String, String, StringDecoder, 
StringDecoder](
-        ssc, kafkaParams, Set(topic))
-    }
-    assert(
-      stream.asInstanceOf[DirectKafkaInputDStream[_, _, _, _, _]]
-        .fromOffsets(topicPartition) >= offsetBeforeStart,
-      "Start offset not from latest"
-    )
-
-    val collectedData = new ConcurrentLinkedQueue[String]()
-    stream.map { _._2 }.foreachRDD { rdd => 
collectedData.addAll(Arrays.asList(rdd.collect(): _*)) }
-    ssc.start()
-    val newData = Map("b" -> 10)
-    kafkaTestUtils.sendMessages(topic, newData)
-    eventually(timeout(10 seconds), interval(50 milliseconds)) {
-      collectedData.contains("b")
-    }
-    assert(!collectedData.contains("a"))
-    ssc.stop()
-  }
-
-
-  test("creating stream by offset") {
-    val topic = "offset"
-    val topicPartition = TopicAndPartition(topic, 0)
-    val data = Map("a" -> 10)
-    kafkaTestUtils.createTopic(topic)
-    val kafkaParams = Map(
-      "metadata.broker.list" -> kafkaTestUtils.brokerAddress,
-      "auto.offset.reset" -> "largest"
-    )
-    val kc = new KafkaCluster(kafkaParams)
-    def getLatestOffset(): Long = {
-      
kc.getLatestLeaderOffsets(Set(topicPartition)).right.get(topicPartition).offset
-    }
-
-    // Send some initial messages before starting context
-    kafkaTestUtils.sendMessages(topic, data)
-    eventually(timeout(10 seconds), interval(20 milliseconds)) {
-      assert(getLatestOffset() >= 10)
-    }
-    val offsetBeforeStart = getLatestOffset()
-
-    // Setup context and kafka stream with largest offset
-    ssc = new StreamingContext(sparkConf, Milliseconds(200))
-    val stream = withClue("Error creating direct stream") {
-      KafkaUtils.createDirectStream[String, String, StringDecoder, 
StringDecoder, String](
-        ssc, kafkaParams, Map(topicPartition -> 11L),
-        (m: MessageAndMetadata[String, String]) => m.message())
-    }
-    assert(
-      stream.asInstanceOf[DirectKafkaInputDStream[_, _, _, _, _]]
-        .fromOffsets(topicPartition) >= offsetBeforeStart,
-      "Start offset not from latest"
-    )
-
-    val collectedData = new ConcurrentLinkedQueue[String]()
-    stream.foreachRDD { rdd => 
collectedData.addAll(Arrays.asList(rdd.collect(): _*)) }
-    ssc.start()
-    val newData = Map("b" -> 10)
-    kafkaTestUtils.sendMessages(topic, newData)
-    eventually(timeout(10 seconds), interval(50 milliseconds)) {
-      collectedData.contains("b")
-    }
-    assert(!collectedData.contains("a"))
-    ssc.stop()
-  }
-
-  // Test to verify the offset ranges can be recovered from the checkpoints
-  test("offset recovery") {
-    val topic = "recovery"
-    kafkaTestUtils.createTopic(topic)
-    testDir = Utils.createTempDir()
-
-    val kafkaParams = Map(
-      "metadata.broker.list" -> kafkaTestUtils.brokerAddress,
-      "auto.offset.reset" -> "smallest"
-    )
-
-    // Send data to Kafka and wait for it to be received
-    def sendData(data: Seq[Int]) {
-      val strings = data.map { _.toString}
-      kafkaTestUtils.sendMessages(topic, strings.map { _ -> 1}.toMap)
-    }
-
-    // Setup the streaming context
-    ssc = new StreamingContext(sparkConf, Milliseconds(100))
-    val kafkaStream = withClue("Error creating direct stream") {
-      KafkaUtils.createDirectStream[String, String, StringDecoder, 
StringDecoder](
-        ssc, kafkaParams, Set(topic))
-    }
-    val keyedStream = kafkaStream.map { v => "key" -> v._2.toInt }
-    val stateStream = keyedStream.updateStateByKey { (values: Seq[Int], state: 
Option[Int]) =>
-      Some(values.sum + state.getOrElse(0))
-    }
-    ssc.checkpoint(testDir.getAbsolutePath)
-
-    // This is ensure all the data is eventually receiving only once
-    stateStream.foreachRDD { (rdd: RDD[(String, Int)]) =>
-      rdd.collect().headOption.foreach { x =>
-        DirectKafkaStreamSuite.total.set(x._2)
-      }
-    }
-    ssc.start()
-
-    // Send some data
-    for (i <- (1 to 10).grouped(4)) {
-      sendData(i)
-    }
-
-    eventually(timeout(20 seconds), interval(50 milliseconds)) {
-      assert(DirectKafkaStreamSuite.total.get === (1 to 10).sum)
-    }
-
-    ssc.stop()
-
-    // Verify that offset ranges were generated
-    // Since "offsetRangesAfterStop" will be used to compare with 
"recoveredOffsetRanges", we should
-    // collect offset ranges after stopping. Otherwise, because new RDDs keep 
being generated before
-    // stopping, we may not be able to get the latest RDDs, then 
"recoveredOffsetRanges" will
-    // contain something not in "offsetRangesAfterStop".
-    val offsetRangesAfterStop = getOffsetRanges(kafkaStream)
-    assert(offsetRangesAfterStop.size >= 1, "No offset ranges generated")
-    assert(
-      offsetRangesAfterStop.head._2.forall { _.fromOffset === 0 },
-      "starting offset not zero"
-    )
-
-    logInfo("====== RESTARTING ========")
-
-    // Recover context from checkpoints
-    ssc = new StreamingContext(testDir.getAbsolutePath)
-    val recoveredStream = 
ssc.graph.getInputStreams().head.asInstanceOf[DStream[(String, String)]]
-
-    // Verify offset ranges have been recovered
-    val recoveredOffsetRanges = getOffsetRanges(recoveredStream).map { x => 
(x._1, x._2.toSet) }
-    assert(recoveredOffsetRanges.size > 0, "No offset ranges recovered")
-    val earlierOffsetRanges = offsetRangesAfterStop.map { x => (x._1, 
x._2.toSet) }
-    assert(
-      recoveredOffsetRanges.forall { or =>
-        earlierOffsetRanges.contains((or._1, or._2))
-      },
-      "Recovered ranges are not the same as the ones generated\n" +
-        s"recoveredOffsetRanges: $recoveredOffsetRanges\n" +
-        s"earlierOffsetRanges: $earlierOffsetRanges"
-    )
-    // Restart context, give more data and verify the total at the end
-    // If the total is write that means each records has been received only 
once
-    ssc.start()
-    for (i <- (11 to 20).grouped(4)) {
-      sendData(i)
-    }
-
-    eventually(timeout(20 seconds), interval(50 milliseconds)) {
-      assert(DirectKafkaStreamSuite.total.get === (1 to 20).sum)
-    }
-    ssc.stop()
-  }
-
-  test("Direct Kafka stream report input information") {
-    val topic = "report-test"
-    val data = Map("a" -> 7, "b" -> 9)
-    kafkaTestUtils.createTopic(topic)
-    kafkaTestUtils.sendMessages(topic, data)
-
-    val totalSent = data.values.sum
-    val kafkaParams = Map(
-      "metadata.broker.list" -> kafkaTestUtils.brokerAddress,
-      "auto.offset.reset" -> "smallest"
-    )
-
-    import DirectKafkaStreamSuite._
-    ssc = new StreamingContext(sparkConf, Milliseconds(200))
-    val collector = new InputInfoCollector
-    ssc.addStreamingListener(collector)
-
-    val stream = withClue("Error creating direct stream") {
-      KafkaUtils.createDirectStream[String, String, StringDecoder, 
StringDecoder](
-        ssc, kafkaParams, Set(topic))
-    }
-
-    val allReceived = new ConcurrentLinkedQueue[(String, String)]
-
-    stream.foreachRDD { rdd => allReceived.addAll(Arrays.asList(rdd.collect(): 
_*)) }
-    ssc.start()
-    eventually(timeout(20000.milliseconds), interval(200.milliseconds)) {
-      assert(allReceived.size === totalSent,
-        "didn't get expected number of messages, messages:\n" +
-          allReceived.asScala.mkString("\n"))
-
-      // Calculate all the record number collected in the StreamingListener.
-      assert(collector.numRecordsSubmitted.get() === totalSent)
-      assert(collector.numRecordsStarted.get() === totalSent)
-      assert(collector.numRecordsCompleted.get() === totalSent)
-    }
-    ssc.stop()
-  }
-
-  test("maxMessagesPerPartition with backpressure disabled") {
-    val topic = "maxMessagesPerPartition"
-    val kafkaStream = getDirectKafkaStream(topic, None)
-
-    val input = Map(TopicAndPartition(topic, 0) -> 50L, 
TopicAndPartition(topic, 1) -> 50L)
-    assert(kafkaStream.maxMessagesPerPartition(input).get ==
-      Map(TopicAndPartition(topic, 0) -> 10L, TopicAndPartition(topic, 1) -> 
10L))
-  }
-
-  test("maxMessagesPerPartition with no lag") {
-    val topic = "maxMessagesPerPartition"
-    val rateController = Some(new ConstantRateController(0, new 
ConstantEstimator(100), 100))
-    val kafkaStream = getDirectKafkaStream(topic, rateController)
-
-    val input = Map(TopicAndPartition(topic, 0) -> 0L, 
TopicAndPartition(topic, 1) -> 0L)
-    assert(kafkaStream.maxMessagesPerPartition(input).isEmpty)
-  }
-
-  test("maxMessagesPerPartition respects max rate") {
-    val topic = "maxMessagesPerPartition"
-    val rateController = Some(new ConstantRateController(0, new 
ConstantEstimator(100), 1000))
-    val kafkaStream = getDirectKafkaStream(topic, rateController)
-
-    val input = Map(TopicAndPartition(topic, 0) -> 1000L, 
TopicAndPartition(topic, 1) -> 1000L)
-    assert(kafkaStream.maxMessagesPerPartition(input).get ==
-      Map(TopicAndPartition(topic, 0) -> 10L, TopicAndPartition(topic, 1) -> 
10L))
-  }
-
-  test("using rate controller") {
-    val topic = "backpressure"
-    val topicPartitions = Set(TopicAndPartition(topic, 0), 
TopicAndPartition(topic, 1))
-    kafkaTestUtils.createTopic(topic, 2)
-    val kafkaParams = Map(
-      "metadata.broker.list" -> kafkaTestUtils.brokerAddress,
-      "auto.offset.reset" -> "smallest"
-    )
-
-    val batchIntervalMilliseconds = 100
-    val estimator = new ConstantEstimator(100)
-    val messages = Map("foo" -> 200)
-    kafkaTestUtils.sendMessages(topic, messages)
-
-    val sparkConf = new SparkConf()
-      // Safe, even with streaming, because we're using the direct API.
-      // Using 1 core is useful to make the test more predictable.
-      .setMaster("local[1]")
-      .setAppName(this.getClass.getSimpleName)
-      .set("spark.streaming.kafka.maxRatePerPartition", "100")
-
-    // Setup the streaming context
-    ssc = new StreamingContext(sparkConf, 
Milliseconds(batchIntervalMilliseconds))
-
-    val kafkaStream = withClue("Error creating direct stream") {
-      val kc = new KafkaCluster(kafkaParams)
-      val messageHandler = (mmd: MessageAndMetadata[String, String]) => 
(mmd.key, mmd.message)
-      val m = kc.getEarliestLeaderOffsets(topicPartitions)
-        .fold(e => Map.empty[TopicAndPartition, Long], m => m.mapValues(lo => 
lo.offset))
-
-      new DirectKafkaInputDStream[String, String, StringDecoder, 
StringDecoder, (String, String)](
-          ssc, kafkaParams, m, messageHandler) {
-        override protected[streaming] val rateController =
-          Some(new DirectKafkaRateController(id, estimator))
-      }
-    }
-
-    val collectedData = new ConcurrentLinkedQueue[Array[String]]()
-
-    // Used for assertion failure messages.
-    def dataToString: String =
-      collectedData.asScala.map(_.mkString("[", ",", "]")).mkString("{", ", ", 
"}")
-
-    // This is to collect the raw data received from Kafka
-    kafkaStream.foreachRDD { (rdd: RDD[(String, String)], time: Time) =>
-      val data = rdd.map { _._2 }.collect()
-      collectedData.add(data)
-    }
-
-    ssc.start()
-
-    // Try different rate limits.
-    // Wait for arrays of data to appear matching the rate.
-    Seq(100, 50, 20).foreach { rate =>
-      collectedData.clear()       // Empty this buffer on each pass.
-      estimator.updateRate(rate)  // Set a new rate.
-      // Expect blocks of data equal to "rate", scaled by the interval length 
in secs.
-      val expectedSize = Math.round(rate * batchIntervalMilliseconds * 0.001)
-      eventually(timeout(5.seconds), 
interval(batchIntervalMilliseconds.milliseconds)) {
-        // Assert that rate estimator values are used to determine 
maxMessagesPerPartition.
-        // Funky "-" in message makes the complete assertion message read 
better.
-        assert(collectedData.asScala.exists(_.size == expectedSize),
-          s" - No arrays of size $expectedSize for rate $rate found in 
$dataToString")
-      }
-    }
-
-    ssc.stop()
-  }
-
-  test("use backpressure.initialRate with backpressure") {
-    backpressureTest(maxRatePerPartition = 1000, initialRate = 500, 
maxMessagesPerPartition = 250)
-  }
-
-  test("backpressure.initialRate should honor maxRatePerPartition") {
-    backpressureTest(maxRatePerPartition = 300, initialRate = 1000, 
maxMessagesPerPartition = 150)
-  }
-
-  private def backpressureTest(
-      maxRatePerPartition: Int,
-      initialRate: Int,
-      maxMessagesPerPartition: Int) = {
-
-    val topic = UUID.randomUUID().toString
-    val topicPartitions = Set(TopicAndPartition(topic, 0))
-    kafkaTestUtils.createTopic(topic, 1)
-    val kafkaParams = Map(
-      "metadata.broker.list" -> kafkaTestUtils.brokerAddress,
-      "auto.offset.reset" -> "smallest"
-    )
-
-    val sparkConf = new SparkConf()
-      // Safe, even with streaming, because we're using the direct API.
-      // Using 1 core is useful to make the test more predictable.
-      .setMaster("local[1]")
-      .setAppName(this.getClass.getSimpleName)
-      .set("spark.streaming.backpressure.enabled", "true")
-      .set("spark.streaming.backpressure.initialRate", initialRate.toString)
-      .set("spark.streaming.kafka.maxRatePerPartition", 
maxRatePerPartition.toString)
-
-    val messages = Map("foo" -> 5000)
-    kafkaTestUtils.sendMessages(topic, messages)
-
-    ssc = new StreamingContext(sparkConf, Milliseconds(500))
-
-    val kafkaStream = withClue("Error creating direct stream") {
-      val kc = new KafkaCluster(kafkaParams)
-      val messageHandler = (mmd: MessageAndMetadata[String, String]) => 
(mmd.key, mmd.message)
-      val m = kc.getEarliestLeaderOffsets(topicPartitions)
-        .fold(e => Map.empty[TopicAndPartition, Long], m => m.mapValues(lo => 
lo.offset))
-
-      new DirectKafkaInputDStream[String, String, StringDecoder, 
StringDecoder, (String, String)](
-        ssc, kafkaParams, m, messageHandler)
-    }
-    kafkaStream.start()
-
-    val input = Map(new TopicAndPartition(topic, 0) -> 1000L)
-
-    assert(kafkaStream.maxMessagesPerPartition(input).get ==
-      Map(new TopicAndPartition(topic, 0) -> maxMessagesPerPartition))
-
-    kafkaStream.stop()
-  }
-
-  test("maxMessagesPerPartition with zero offset and rate equal to one") {
-    val topic = "backpressure"
-    val kafkaParams = Map(
-      "metadata.broker.list" -> kafkaTestUtils.brokerAddress,
-      "auto.offset.reset" -> "smallest"
-    )
-
-    val batchIntervalMilliseconds = 60000
-    val sparkConf = new SparkConf()
-      // Safe, even with streaming, because we're using the direct API.
-      // Using 1 core is useful to make the test more predictable.
-      .setMaster("local[1]")
-      .setAppName(this.getClass.getSimpleName)
-      .set("spark.streaming.kafka.maxRatePerPartition", "100")
-
-    // Setup the streaming context
-    ssc = new StreamingContext(sparkConf, 
Milliseconds(batchIntervalMilliseconds))
-    val estimatedRate = 1L
-    val kafkaStream = withClue("Error creating direct stream") {
-      val messageHandler = (mmd: MessageAndMetadata[String, String]) => 
(mmd.key, mmd.message)
-      val fromOffsets = Map(
-        TopicAndPartition(topic, 0) -> 0L,
-        TopicAndPartition(topic, 1) -> 0L,
-        TopicAndPartition(topic, 2) -> 0L,
-        TopicAndPartition(topic, 3) -> 0L
-      )
-      new DirectKafkaInputDStream[String, String, StringDecoder, 
StringDecoder, (String, String)](
-        ssc, kafkaParams, fromOffsets, messageHandler) {
-        override protected[streaming] val rateController =
-          Some(new DirectKafkaRateController(id, null) {
-            override def getLatestRate() = estimatedRate
-          })
-      }
-    }
-
-    val offsets = Map(
-      TopicAndPartition(topic, 0) -> 0L,
-      TopicAndPartition(topic, 1) -> 100L,
-      TopicAndPartition(topic, 2) -> 200L,
-      TopicAndPartition(topic, 3) -> 300L
-    )
-    val result = kafkaStream.maxMessagesPerPartition(offsets)
-    val expected = Map(
-      TopicAndPartition(topic, 0) -> 1L,
-      TopicAndPartition(topic, 1) -> 10L,
-      TopicAndPartition(topic, 2) -> 20L,
-      TopicAndPartition(topic, 3) -> 30L
-    )
-    assert(result.contains(expected), s"Number of messages per partition must 
be at least 1")
-  }
-
-  /** Get the generated offset ranges from the DirectKafkaStream */
-  private def getOffsetRanges[K, V](
-      kafkaStream: DStream[(K, V)]): Seq[(Time, Array[OffsetRange])] = {
-    kafkaStream.generatedRDDs.mapValues { rdd =>
-      rdd.asInstanceOf[KafkaRDD[K, V, _, _, (K, V)]].offsetRanges
-    }.toSeq.sortBy { _._1 }
-  }
-
-  private def getDirectKafkaStream(topic: String, mockRateController: 
Option[RateController]) = {
-    val batchIntervalMilliseconds = 100
-
-    val sparkConf = new SparkConf()
-      .setMaster("local[1]")
-      .setAppName(this.getClass.getSimpleName)
-      .set("spark.streaming.kafka.maxRatePerPartition", "100")
-
-    // Setup the streaming context
-    ssc = new StreamingContext(sparkConf, 
Milliseconds(batchIntervalMilliseconds))
-
-    val earliestOffsets = Map(TopicAndPartition(topic, 0) -> 0L, 
TopicAndPartition(topic, 1) -> 0L)
-    val messageHandler = (mmd: MessageAndMetadata[String, String]) => 
(mmd.key, mmd.message)
-    new DirectKafkaInputDStream[String, String, StringDecoder, StringDecoder, 
(String, String)](
-      ssc, Map[String, String](), earliestOffsets, messageHandler) {
-      override protected[streaming] val rateController = mockRateController
-    }
-  }
-}
-
-object DirectKafkaStreamSuite {
-  val total = new AtomicLong(-1L)
-
-  class InputInfoCollector extends StreamingListener {
-    val numRecordsSubmitted = new AtomicLong(0L)
-    val numRecordsStarted = new AtomicLong(0L)
-    val numRecordsCompleted = new AtomicLong(0L)
-
-    override def onBatchSubmitted(batchSubmitted: 
StreamingListenerBatchSubmitted): Unit = {
-      numRecordsSubmitted.addAndGet(batchSubmitted.batchInfo.numRecords)
-    }
-
-    override def onBatchStarted(batchStarted: StreamingListenerBatchStarted): 
Unit = {
-      numRecordsStarted.addAndGet(batchStarted.batchInfo.numRecords)
-    }
-
-    override def onBatchCompleted(batchCompleted: 
StreamingListenerBatchCompleted): Unit = {
-      numRecordsCompleted.addAndGet(batchCompleted.batchInfo.numRecords)
-    }
-  }
-}
-
-private[streaming] class ConstantEstimator(@volatile private var rate: Long)
-  extends RateEstimator {
-
-  def updateRate(newRate: Long): Unit = {
-    rate = newRate
-  }
-
-  def compute(
-      time: Long,
-      elements: Long,
-      processingDelay: Long,
-      schedulingDelay: Long): Option[Double] = Some(rate)
-}
-
-private[streaming] class ConstantRateController(id: Int, estimator: 
RateEstimator, rate: Long)
-  extends RateController(id, estimator) {
-  override def publish(rate: Long): Unit = ()
-  override def getLatestRate(): Long = rate
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/703e6da1/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/KafkaClusterSuite.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/KafkaClusterSuite.scala
 
b/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/KafkaClusterSuite.scala
deleted file mode 100644
index 73d5285..0000000
--- 
a/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/KafkaClusterSuite.scala
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.kafka
-
-import scala.util.Random
-
-import kafka.common.TopicAndPartition
-import org.scalatest.BeforeAndAfterAll
-
-import org.apache.spark.SparkFunSuite
-
-class KafkaClusterSuite extends SparkFunSuite with BeforeAndAfterAll {
-  private val topic = "kcsuitetopic" + Random.nextInt(10000)
-  private val topicAndPartition = TopicAndPartition(topic, 0)
-  private var kc: KafkaCluster = null
-
-  private var kafkaTestUtils: KafkaTestUtils = _
-
-  override def beforeAll() {
-    super.beforeAll()
-    kafkaTestUtils = new KafkaTestUtils
-    kafkaTestUtils.setup()
-
-    kafkaTestUtils.createTopic(topic)
-    kafkaTestUtils.sendMessages(topic, Map("a" -> 1))
-    kc = new KafkaCluster(Map("metadata.broker.list" -> 
kafkaTestUtils.brokerAddress))
-  }
-
-  override def afterAll() {
-    try {
-      if (kafkaTestUtils != null) {
-        kafkaTestUtils.teardown()
-        kafkaTestUtils = null
-      }
-    } finally {
-      super.afterAll()
-    }
-  }
-
-  test("metadata apis") {
-    val leader = 
kc.findLeaders(Set(topicAndPartition)).right.get(topicAndPartition)
-    val leaderAddress = s"${leader._1}:${leader._2}"
-    assert(leaderAddress === kafkaTestUtils.brokerAddress, "didn't get leader")
-
-    val parts = kc.getPartitions(Set(topic)).right.get
-    assert(parts(topicAndPartition), "didn't get partitions")
-
-    val err = kc.getPartitions(Set(topic + "BAD"))
-    assert(err.isLeft, "getPartitions for a nonexistant topic should be an 
error")
-  }
-
-  test("leader offset apis") {
-    val earliest = 
kc.getEarliestLeaderOffsets(Set(topicAndPartition)).right.get
-    assert(earliest(topicAndPartition).offset === 0, "didn't get earliest")
-
-    val latest = kc.getLatestLeaderOffsets(Set(topicAndPartition)).right.get
-    assert(latest(topicAndPartition).offset === 1, "didn't get latest")
-  }
-
-  test("consumer offset apis") {
-    val group = "kcsuitegroup" + Random.nextInt(10000)
-
-    val offset = Random.nextInt(10000)
-
-    val set = kc.setConsumerOffsets(group, Map(topicAndPartition -> offset))
-    assert(set.isRight, "didn't set consumer offsets")
-
-    val get = kc.getConsumerOffsets(group, Set(topicAndPartition)).right.get
-    assert(get(topicAndPartition) === offset, "didn't get consumer offsets")
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/703e6da1/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala
 
b/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala
deleted file mode 100644
index 72f9541..0000000
--- 
a/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala
+++ /dev/null
@@ -1,182 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.kafka
-
-import scala.util.Random
-
-import kafka.common.TopicAndPartition
-import kafka.message.MessageAndMetadata
-import kafka.serializer.StringDecoder
-import org.scalatest.BeforeAndAfterAll
-
-import org.apache.spark._
-
-class KafkaRDDSuite extends SparkFunSuite with BeforeAndAfterAll {
-
-  private var kafkaTestUtils: KafkaTestUtils = _
-
-  private val sparkConf = new SparkConf().setMaster("local[4]")
-    .setAppName(this.getClass.getSimpleName)
-  private var sc: SparkContext = _
-
-  override def beforeAll {
-    super.beforeAll()
-    sc = new SparkContext(sparkConf)
-    kafkaTestUtils = new KafkaTestUtils
-    kafkaTestUtils.setup()
-  }
-
-  override def afterAll {
-    try {
-      try {
-        if (sc != null) {
-          sc.stop
-          sc = null
-        }
-      } finally {
-        if (kafkaTestUtils != null) {
-          kafkaTestUtils.teardown()
-          kafkaTestUtils = null
-        }
-      }
-    } finally {
-      super.afterAll()
-    }
-  }
-
-  test("basic usage") {
-    val topic = s"topicbasic-${Random.nextInt}-${System.currentTimeMillis}"
-    kafkaTestUtils.createTopic(topic)
-    val messages = Array("the", "quick", "brown", "fox")
-    kafkaTestUtils.sendMessages(topic, messages)
-
-    val kafkaParams = Map("metadata.broker.list" -> 
kafkaTestUtils.brokerAddress,
-      "group.id" -> 
s"test-consumer-${Random.nextInt}-${System.currentTimeMillis}")
-
-    val offsetRanges = Array(OffsetRange(topic, 0, 0, messages.size))
-
-    val rdd = KafkaUtils.createRDD[String, String, StringDecoder, 
StringDecoder](
-      sc, kafkaParams, offsetRanges)
-
-    val received = rdd.map(_._2).collect.toSet
-    assert(received === messages.toSet)
-
-    // size-related method optimizations return sane results
-    assert(rdd.count === messages.size)
-    assert(rdd.countApprox(0).getFinalValue.mean === messages.size)
-    assert(!rdd.isEmpty)
-    assert(rdd.take(1).size === 1)
-    assert(rdd.take(1).head._2 === messages.head)
-    assert(rdd.take(messages.size + 10).size === messages.size)
-
-    val emptyRdd = KafkaUtils.createRDD[String, String, StringDecoder, 
StringDecoder](
-      sc, kafkaParams, Array(OffsetRange(topic, 0, 0, 0)))
-
-    assert(emptyRdd.isEmpty)
-
-    // invalid offset ranges throw exceptions
-    val badRanges = Array(OffsetRange(topic, 0, 0, messages.size + 1))
-    intercept[SparkException] {
-      KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](
-        sc, kafkaParams, badRanges)
-    }
-  }
-
-  test("iterator boundary conditions") {
-    // the idea is to find e.g. off-by-one errors between what kafka has 
available and the rdd
-    val topic = s"topicboundary-${Random.nextInt}-${System.currentTimeMillis}"
-    val sent = Map("a" -> 5, "b" -> 3, "c" -> 10)
-    kafkaTestUtils.createTopic(topic)
-
-    val kafkaParams = Map("metadata.broker.list" -> 
kafkaTestUtils.brokerAddress,
-      "group.id" -> 
s"test-consumer-${Random.nextInt}-${System.currentTimeMillis}")
-
-    val kc = new KafkaCluster(kafkaParams)
-
-    // this is the "lots of messages" case
-    kafkaTestUtils.sendMessages(topic, sent)
-    val sentCount = sent.values.sum
-
-    // rdd defined from leaders after sending messages, should get the number 
sent
-    val rdd = getRdd(kc, Set(topic))
-
-    assert(rdd.isDefined)
-
-    val ranges = rdd.get.asInstanceOf[HasOffsetRanges].offsetRanges
-    val rangeCount = ranges.map(o => o.untilOffset - o.fromOffset).sum
-
-    assert(rangeCount === sentCount, "offset range didn't include all sent 
messages")
-    assert(rdd.get.count === sentCount, "didn't get all sent messages")
-
-    val rangesMap = ranges.map(o => TopicAndPartition(o.topic, o.partition) -> 
o.untilOffset).toMap
-
-    // make sure consumer offsets are committed before the next getRdd call
-    kc.setConsumerOffsets(kafkaParams("group.id"), rangesMap).fold(
-      err => throw new Exception(err.mkString("\n")),
-      _ => ()
-    )
-
-    // this is the "0 messages" case
-    val rdd2 = getRdd(kc, Set(topic))
-    // shouldn't get anything, since message is sent after rdd was defined
-    val sentOnlyOne = Map("d" -> 1)
-
-    kafkaTestUtils.sendMessages(topic, sentOnlyOne)
-
-    assert(rdd2.isDefined)
-    assert(rdd2.get.count === 0, "got messages when there shouldn't be any")
-
-    // this is the "exactly 1 message" case, namely the single message from 
sentOnlyOne above
-    val rdd3 = getRdd(kc, Set(topic))
-    // send lots of messages after rdd was defined, they shouldn't show up
-    kafkaTestUtils.sendMessages(topic, Map("extra" -> 22))
-
-    assert(rdd3.isDefined)
-    assert(rdd3.get.count === sentOnlyOne.values.sum, "didn't get exactly one 
message")
-
-  }
-
-  // get an rdd from the committed consumer offsets until the latest leader 
offsets,
-  private def getRdd(kc: KafkaCluster, topics: Set[String]) = {
-    val groupId = kc.kafkaParams("group.id")
-    def consumerOffsets(topicPartitions: Set[TopicAndPartition]) = {
-      kc.getConsumerOffsets(groupId, topicPartitions).right.toOption.orElse(
-        kc.getEarliestLeaderOffsets(topicPartitions).right.toOption.map { offs 
=>
-          offs.map(kv => kv._1 -> kv._2.offset)
-        }
-      )
-    }
-    kc.getPartitions(topics).right.toOption.flatMap { topicPartitions =>
-      consumerOffsets(topicPartitions).flatMap { from =>
-        kc.getLatestLeaderOffsets(topicPartitions).right.toOption.map { until 
=>
-          val offsetRanges = from.map { case (tp: TopicAndPartition, 
fromOffset: Long) =>
-              OffsetRange(tp.topic, tp.partition, fromOffset, until(tp).offset)
-          }.toArray
-
-          val leaders = until.map { case (tp: TopicAndPartition, lo: 
KafkaCluster.LeaderOffset) =>
-              tp -> Broker(lo.host, lo.port)
-          }.toMap
-
-          KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder, 
String](
-            sc, kc.kafkaParams, offsetRanges, leaders,
-            (mmd: MessageAndMetadata[String, String]) => s"${mmd.offset} 
${mmd.message}")
-        }
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/703e6da1/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
 
b/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
deleted file mode 100644
index ed130f5..0000000
--- 
a/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.kafka
-
-import scala.collection.mutable
-import scala.concurrent.duration._
-import scala.language.postfixOps
-import scala.util.Random
-
-import kafka.serializer.StringDecoder
-import org.scalatest.BeforeAndAfterAll
-import org.scalatest.concurrent.Eventually
-
-import org.apache.spark.{SparkConf, SparkFunSuite}
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.{Milliseconds, StreamingContext}
-
-class KafkaStreamSuite extends SparkFunSuite with Eventually with 
BeforeAndAfterAll {
-  private var ssc: StreamingContext = _
-  private var kafkaTestUtils: KafkaTestUtils = _
-
-  override def beforeAll(): Unit = {
-    super.beforeAll()
-    kafkaTestUtils = new KafkaTestUtils
-    kafkaTestUtils.setup()
-  }
-
-  override def afterAll(): Unit = {
-    try {
-      try {
-        if (ssc != null) {
-          ssc.stop()
-          ssc = null
-        }
-      } finally {
-        if (kafkaTestUtils != null) {
-          kafkaTestUtils.teardown()
-          kafkaTestUtils = null
-        }
-      }
-    } finally {
-      super.afterAll()
-    }
-  }
-
-  test("Kafka input stream") {
-    val sparkConf = new 
SparkConf().setMaster("local[4]").setAppName(this.getClass.getSimpleName)
-    ssc = new StreamingContext(sparkConf, Milliseconds(500))
-    val topic = "topic1"
-    val sent = Map("a" -> 5, "b" -> 3, "c" -> 10)
-    kafkaTestUtils.createTopic(topic)
-    kafkaTestUtils.sendMessages(topic, sent)
-
-    val kafkaParams = Map("zookeeper.connect" -> kafkaTestUtils.zkAddress,
-      "group.id" -> s"test-consumer-${Random.nextInt(10000)}",
-      "auto.offset.reset" -> "smallest")
-
-    val stream = KafkaUtils.createStream[String, String, StringDecoder, 
StringDecoder](
-      ssc, kafkaParams, Map(topic -> 1), StorageLevel.MEMORY_ONLY)
-    val result = new mutable.HashMap[String, Long]()
-    stream.map(_._2).countByValue().foreachRDD { r =>
-      r.collect().foreach { kv =>
-        result.synchronized {
-          val count = result.getOrElseUpdate(kv._1, 0) + kv._2
-          result.put(kv._1, count)
-        }
-      }
-    }
-
-    ssc.start()
-
-    eventually(timeout(10000 milliseconds), interval(100 milliseconds)) {
-      assert(result.synchronized { sent === result })
-    }
-    ssc.stop()
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/703e6da1/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala
 
b/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala
deleted file mode 100644
index 5da5ea4..0000000
--- 
a/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala
+++ /dev/null
@@ -1,153 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.kafka
-
-import java.io.File
-
-import scala.collection.mutable
-import scala.concurrent.duration._
-import scala.language.postfixOps
-import scala.util.Random
-
-import kafka.serializer.StringDecoder
-import kafka.utils.{ZKGroupTopicDirs, ZkUtils}
-import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
-import org.scalatest.concurrent.Eventually
-
-import org.apache.spark.{SparkConf, SparkFunSuite}
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.{Milliseconds, StreamingContext}
-import org.apache.spark.util.Utils
-
-class ReliableKafkaStreamSuite extends SparkFunSuite
-    with BeforeAndAfterAll with BeforeAndAfter with Eventually {
-
-  private val sparkConf = new SparkConf()
-    .setMaster("local[4]")
-    .setAppName(this.getClass.getSimpleName)
-    .set("spark.streaming.receiver.writeAheadLog.enable", "true")
-  private val data = Map("a" -> 10, "b" -> 10, "c" -> 10)
-
-  private var kafkaTestUtils: KafkaTestUtils = _
-
-  private var groupId: String = _
-  private var kafkaParams: Map[String, String] = _
-  private var ssc: StreamingContext = _
-  private var tempDirectory: File = null
-
-  override def beforeAll(): Unit = {
-    super.beforeAll()
-    kafkaTestUtils = new KafkaTestUtils
-    kafkaTestUtils.setup()
-
-    groupId = s"test-consumer-${Random.nextInt(10000)}"
-    kafkaParams = Map(
-      "zookeeper.connect" -> kafkaTestUtils.zkAddress,
-      "group.id" -> groupId,
-      "auto.offset.reset" -> "smallest"
-    )
-
-    tempDirectory = Utils.createTempDir()
-  }
-
-  override def afterAll(): Unit = {
-    try {
-      Utils.deleteRecursively(tempDirectory)
-
-      if (kafkaTestUtils != null) {
-        kafkaTestUtils.teardown()
-        kafkaTestUtils = null
-      }
-    } finally {
-      super.afterAll()
-    }
-  }
-
-  before {
-    ssc = new StreamingContext(sparkConf, Milliseconds(500))
-    ssc.checkpoint(tempDirectory.getAbsolutePath)
-  }
-
-  after {
-    if (ssc != null) {
-      ssc.stop(stopSparkContext = true)
-      ssc = null
-    }
-  }
-
-  test("Reliable Kafka input stream with single topic") {
-    val topic = "test-topic"
-    kafkaTestUtils.createTopic(topic)
-    kafkaTestUtils.sendMessages(topic, data)
-
-    // Verify whether the offset of this group/topic/partition is 0 before 
starting.
-    assert(getCommitOffset(groupId, topic, 0) === None)
-
-    val stream = KafkaUtils.createStream[String, String, StringDecoder, 
StringDecoder](
-      ssc, kafkaParams, Map(topic -> 1), StorageLevel.MEMORY_ONLY)
-    val result = new mutable.HashMap[String, Long]()
-    stream.map { case (k, v) => v }.foreachRDD { r =>
-        val ret = r.collect()
-        ret.foreach { v =>
-          val count = result.getOrElseUpdate(v, 0) + 1
-          result.put(v, count)
-        }
-      }
-    ssc.start()
-
-    eventually(timeout(20000 milliseconds), interval(200 milliseconds)) {
-      // A basic process verification for ReliableKafkaReceiver.
-      // Verify whether received message number is equal to the sent message 
number.
-      assert(data.size === result.size)
-      // Verify whether each message is the same as the data to be verified.
-      data.keys.foreach { k => assert(data(k) === result(k).toInt) }
-      // Verify the offset number whether it is equal to the total message 
number.
-      assert(getCommitOffset(groupId, topic, 0) === Some(29L))
-    }
-  }
-
-  test("Reliable Kafka input stream with multiple topics") {
-    val topics = Map("topic1" -> 1, "topic2" -> 1, "topic3" -> 1)
-    topics.foreach { case (t, _) =>
-      kafkaTestUtils.createTopic(t)
-      kafkaTestUtils.sendMessages(t, data)
-    }
-
-    // Before started, verify all the group/topic/partition offsets are 0.
-    topics.foreach { case (t, _) => assert(getCommitOffset(groupId, t, 0) === 
None) }
-
-    // Consuming all the data sent to the broker which will potential commit 
the offsets internally.
-    val stream = KafkaUtils.createStream[String, String, StringDecoder, 
StringDecoder](
-      ssc, kafkaParams, topics, StorageLevel.MEMORY_ONLY)
-    stream.foreachRDD(_ => Unit)
-    ssc.start()
-
-    eventually(timeout(20000 milliseconds), interval(100 milliseconds)) {
-      // Verify the offset for each group/topic to see whether they are equal 
to the expected one.
-      topics.foreach { case (t, _) => assert(getCommitOffset(groupId, t, 0) 
=== Some(29L)) }
-    }
-  }
-
-
-  /** Getting partition offset from Zookeeper. */
-  private def getCommitOffset(groupId: String, topic: String, partition: Int): 
Option[Long] = {
-    val topicDirs = new ZKGroupTopicDirs(groupId, topic)
-    val zkPath = s"${topicDirs.consumerOffsetDir}/$partition"
-    ZkUtils.readDataMaybeNull(kafkaTestUtils.zookeeperClient, 
zkPath)._1.map(_.toLong)
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/703e6da1/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 7ce7c9f..58a2841 100644
--- a/pom.xml
+++ b/pom.xml
@@ -2675,14 +2675,6 @@
     </profile>
 
     <profile>
-      <id>kafka-0-8</id>
-      <modules>
-        <module>external/kafka-0-8</module>
-        <module>external/kafka-0-8-assembly</module>
-      </modules>
-    </profile>
-
-    <profile>
       <id>test-java-home</id>
       <activation>
         <property><name>env.JAVA_HOME</name></property>

http://git-wip-us.apache.org/repos/asf/spark/blob/703e6da1/project/SparkBuild.scala
----------------------------------------------------------------------
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 8b01b90..a0aaef2 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -55,14 +55,14 @@ object BuildCommons {
   ).map(ProjectRef(buildLocation, _)) ++ sqlProjects ++ streamingProjects
 
   val optionallyEnabledProjects@Seq(kubernetes, mesos, yarn,
-    streamingKafka, sparkGangliaLgpl, streamingKinesisAsl,
+    sparkGangliaLgpl, streamingKinesisAsl,
     dockerIntegrationTests, hadoopCloud, kubernetesIntegrationTests) =
     Seq("kubernetes", "mesos", "yarn",
-      "streaming-kafka-0-8", "ganglia-lgpl", "streaming-kinesis-asl",
+      "ganglia-lgpl", "streaming-kinesis-asl",
       "docker-integration-tests", "hadoop-cloud", 
"kubernetes-integration-tests").map(ProjectRef(buildLocation, _))
 
-  val assemblyProjects@Seq(networkYarn, streamingKafkaAssembly, 
streamingKafka010Assembly, streamingKinesisAslAssembly) =
-    Seq("network-yarn", "streaming-kafka-0-8-assembly", 
"streaming-kafka-0-10-assembly", "streaming-kinesis-asl-assembly")
+  val assemblyProjects@Seq(networkYarn, streamingKafka010Assembly, 
streamingKinesisAslAssembly) =
+    Seq("network-yarn", "streaming-kafka-0-10-assembly", 
"streaming-kinesis-asl-assembly")
       .map(ProjectRef(buildLocation, _))
 
   val copyJarsProjects@Seq(assembly, examples) = Seq("assembly", "examples")
@@ -580,10 +580,8 @@ object Assembly {
         
.getOrElse(SbtPomKeys.effectivePom.value.getProperties.get("hadoop.version").asInstanceOf[String])
     },
     jarName in assembly := {
-      if (moduleName.value.contains("streaming-kafka-0-8-assembly")
-        || moduleName.value.contains("streaming-kafka-0-10-assembly")
+      if (moduleName.value.contains("streaming-kafka-0-10-assembly")
         || moduleName.value.contains("streaming-kinesis-asl-assembly")) {
-        // This must match the same name used in maven (see 
external/kafka-0-8-assembly/pom.xml)
         s"${moduleName.value}-${version.value}.jar"
       } else {
         
s"${moduleName.value}-${version.value}-hadoop${hadoopVersion.value}.jar"

http://git-wip-us.apache.org/repos/asf/spark/blob/703e6da1/python/docs/pyspark.streaming.rst
----------------------------------------------------------------------
diff --git a/python/docs/pyspark.streaming.rst 
b/python/docs/pyspark.streaming.rst
index 9c25628..f7df643 100644
--- a/python/docs/pyspark.streaming.rst
+++ b/python/docs/pyspark.streaming.rst
@@ -9,13 +9,6 @@ Module contents
     :undoc-members:
     :show-inheritance:
 
-pyspark.streaming.kafka module
-------------------------------
-.. automodule:: pyspark.streaming.kafka
-    :members:
-    :undoc-members:
-    :show-inheritance:
-
 pyspark.streaming.kinesis module
 --------------------------------
 .. automodule:: pyspark.streaming.kinesis

http://git-wip-us.apache.org/repos/asf/spark/blob/703e6da1/python/pyspark/streaming/dstream.py
----------------------------------------------------------------------
diff --git a/python/pyspark/streaming/dstream.py 
b/python/pyspark/streaming/dstream.py
index 946601e..c253e5c 100644
--- a/python/pyspark/streaming/dstream.py
+++ b/python/pyspark/streaming/dstream.py
@@ -45,7 +45,7 @@ class DStream(object):
     for more details on RDDs).
 
     DStreams can either be created from live data (such as, data from TCP
-    sockets, Kafka, etc.) using a L{StreamingContext} or it can be
+    sockets, etc.) using a L{StreamingContext} or it can be
     generated by transforming existing DStreams using operations such as
     `map`, `window` and `reduceByKeyAndWindow`. While a Spark Streaming
     program is running, each DStream periodically generates a RDD, either
@@ -626,7 +626,6 @@ class TransformedDStream(DStream):
 
         # Using type() to avoid folding the functions and compacting the 
DStreams which is not
         # not strictly an object of TransformedDStream.
-        # Changed here is to avoid bug in KafkaTransformedDStream when calling 
offsetRanges().
         if (type(prev) is TransformedDStream and
                 not prev.is_cached and not prev.is_checkpointed):
             prev_func = prev.func


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to