Repository: spark Updated Branches: refs/heads/branch-2.0 e3703c411 -> 56e1e2f17
http://git-wip-us.apache.org/repos/asf/spark/blob/56e1e2f1/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java ---------------------------------------------------------------------- diff --git a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java deleted file mode 100644 index c41b629..0000000 --- a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java +++ /dev/null @@ -1,156 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.streaming.kafka; - -import java.io.Serializable; -import java.util.HashMap; -import java.util.Map; - -import scala.Tuple2; - -import kafka.common.TopicAndPartition; -import kafka.message.MessageAndMetadata; -import kafka.serializer.StringDecoder; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; - -import org.apache.spark.SparkConf; -import org.apache.spark.api.java.JavaRDD; -import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.api.java.function.Function; - -public class JavaKafkaRDDSuite implements Serializable { - private transient JavaSparkContext sc = null; - private transient KafkaTestUtils kafkaTestUtils = null; - - @Before - public void setUp() { - kafkaTestUtils = new KafkaTestUtils(); - kafkaTestUtils.setup(); - SparkConf sparkConf = new SparkConf() - .setMaster("local[4]").setAppName(this.getClass().getSimpleName()); - sc = new JavaSparkContext(sparkConf); - } - - @After - public void tearDown() { - if (sc != null) { - sc.stop(); - sc = null; - } - - if (kafkaTestUtils != null) { - kafkaTestUtils.teardown(); - kafkaTestUtils = null; - } - } - - @Test - public void testKafkaRDD() throws InterruptedException { - String topic1 = "topic1"; - String topic2 = "topic2"; - - createTopicAndSendData(topic1); - createTopicAndSendData(topic2); - - Map<String, String> kafkaParams = new HashMap<>(); - kafkaParams.put("metadata.broker.list", kafkaTestUtils.brokerAddress()); - - OffsetRange[] offsetRanges = { - OffsetRange.create(topic1, 0, 0, 1), - OffsetRange.create(topic2, 0, 0, 1) - }; - - Map<TopicAndPartition, Broker> emptyLeaders = new HashMap<>(); - Map<TopicAndPartition, Broker> leaders = new HashMap<>(); - String[] hostAndPort = kafkaTestUtils.brokerAddress().split(":"); - Broker broker = Broker.create(hostAndPort[0], Integer.parseInt(hostAndPort[1])); - leaders.put(new TopicAndPartition(topic1, 0), broker); - leaders.put(new TopicAndPartition(topic2, 0), broker); - - JavaRDD<String> rdd1 = KafkaUtils.createRDD( - sc, - String.class, - String.class, - StringDecoder.class, - StringDecoder.class, - kafkaParams, - offsetRanges - ).map( - new Function<Tuple2<String, String>, String>() { - @Override - public String call(Tuple2<String, String> kv) { - return kv._2(); - } - } - ); - - JavaRDD<String> rdd2 = KafkaUtils.createRDD( - sc, - String.class, - String.class, - StringDecoder.class, - StringDecoder.class, - String.class, - kafkaParams, - offsetRanges, - emptyLeaders, - new Function<MessageAndMetadata<String, String>, String>() { - @Override - public String call(MessageAndMetadata<String, String> msgAndMd) { - return msgAndMd.message(); - } - } - ); - - JavaRDD<String> rdd3 = KafkaUtils.createRDD( - sc, - String.class, - String.class, - StringDecoder.class, - StringDecoder.class, - String.class, - kafkaParams, - offsetRanges, - leaders, - new Function<MessageAndMetadata<String, String>, String>() { - @Override - public String call(MessageAndMetadata<String, String> msgAndMd) { - return msgAndMd.message(); - } - } - ); - - // just making sure the java user apis work; the scala tests handle logic corner cases - long count1 = rdd1.count(); - long count2 = rdd2.count(); - long count3 = rdd3.count(); - Assert.assertTrue(count1 > 0); - Assert.assertEquals(count1, count2); - Assert.assertEquals(count1, count3); - } - - private String[] createTopicAndSendData(String topic) { - String[] data = { topic + "-1", topic + "-2", topic + "-3"}; - kafkaTestUtils.createTopic(topic, 1); - kafkaTestUtils.sendMessages(topic, data); - return data; - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/56e1e2f1/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java ---------------------------------------------------------------------- diff --git a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java deleted file mode 100644 index 868df64..0000000 --- a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java +++ /dev/null @@ -1,135 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.streaming.kafka; - -import java.io.Serializable; -import java.util.*; - -import scala.Tuple2; - -import kafka.serializer.StringDecoder; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; - -import org.apache.spark.SparkConf; -import org.apache.spark.api.java.JavaPairRDD; -import org.apache.spark.api.java.function.Function; -import org.apache.spark.api.java.function.VoidFunction; -import org.apache.spark.storage.StorageLevel; -import org.apache.spark.streaming.Duration; -import org.apache.spark.streaming.api.java.JavaDStream; -import org.apache.spark.streaming.api.java.JavaPairDStream; -import org.apache.spark.streaming.api.java.JavaStreamingContext; - -public class JavaKafkaStreamSuite implements Serializable { - private transient JavaStreamingContext ssc = null; - private transient Random random = new Random(); - private transient KafkaTestUtils kafkaTestUtils = null; - - @Before - public void setUp() { - kafkaTestUtils = new KafkaTestUtils(); - kafkaTestUtils.setup(); - SparkConf sparkConf = new SparkConf() - .setMaster("local[4]").setAppName(this.getClass().getSimpleName()); - ssc = new JavaStreamingContext(sparkConf, new Duration(500)); - } - - @After - public void tearDown() { - if (ssc != null) { - ssc.stop(); - ssc = null; - } - - if (kafkaTestUtils != null) { - kafkaTestUtils.teardown(); - kafkaTestUtils = null; - } - } - - @Test - public void testKafkaStream() throws InterruptedException { - String topic = "topic1"; - Map<String, Integer> topics = new HashMap<>(); - topics.put(topic, 1); - - Map<String, Integer> sent = new HashMap<>(); - sent.put("a", 5); - sent.put("b", 3); - sent.put("c", 10); - - kafkaTestUtils.createTopic(topic, 1); - kafkaTestUtils.sendMessages(topic, sent); - - Map<String, String> kafkaParams = new HashMap<>(); - kafkaParams.put("zookeeper.connect", kafkaTestUtils.zkAddress()); - kafkaParams.put("group.id", "test-consumer-" + random.nextInt(10000)); - kafkaParams.put("auto.offset.reset", "smallest"); - - JavaPairDStream<String, String> stream = KafkaUtils.createStream(ssc, - String.class, - String.class, - StringDecoder.class, - StringDecoder.class, - kafkaParams, - topics, - StorageLevel.MEMORY_ONLY_SER()); - - final Map<String, Long> result = Collections.synchronizedMap(new HashMap<String, Long>()); - - JavaDStream<String> words = stream.map( - new Function<Tuple2<String, String>, String>() { - @Override - public String call(Tuple2<String, String> tuple2) { - return tuple2._2(); - } - } - ); - - words.countByValue().foreachRDD(new VoidFunction<JavaPairRDD<String, Long>>() { - @Override - public void call(JavaPairRDD<String, Long> rdd) { - List<Tuple2<String, Long>> ret = rdd.collect(); - for (Tuple2<String, Long> r : ret) { - if (result.containsKey(r._1())) { - result.put(r._1(), result.get(r._1()) + r._2()); - } else { - result.put(r._1(), r._2()); - } - } - } - } - ); - - ssc.start(); - - long startTime = System.currentTimeMillis(); - boolean sizeMatches = false; - while (!sizeMatches && System.currentTimeMillis() - startTime < 20000) { - sizeMatches = sent.size() == result.size(); - Thread.sleep(200); - } - Assert.assertEquals(sent.size(), result.size()); - for (Map.Entry<String, Integer> e : sent.entrySet()) { - Assert.assertEquals(e.getValue().intValue(), result.get(e.getKey()).intValue()); - } - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/56e1e2f1/external/kafka/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/external/kafka/src/test/resources/log4j.properties b/external/kafka/src/test/resources/log4j.properties deleted file mode 100644 index fd51f8f..0000000 --- a/external/kafka/src/test/resources/log4j.properties +++ /dev/null @@ -1,28 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -# Set everything to be logged to the file target/unit-tests.log -log4j.rootCategory=INFO, file -log4j.appender.file=org.apache.log4j.FileAppender -log4j.appender.file.append=true -log4j.appender.file.file=target/unit-tests.log -log4j.appender.file.layout=org.apache.log4j.PatternLayout -log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n - -# Ignore messages below warning level from Jetty, because it's a bit verbose -log4j.logger.org.spark_project.jetty=WARN - http://git-wip-us.apache.org/repos/asf/spark/blob/56e1e2f1/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala ---------------------------------------------------------------------- diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala deleted file mode 100644 index cb782d2..0000000 --- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala +++ /dev/null @@ -1,531 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.streaming.kafka - -import java.io.File -import java.util.Arrays -import java.util.concurrent.atomic.AtomicLong -import java.util.concurrent.ConcurrentLinkedQueue - -import scala.collection.JavaConverters._ -import scala.concurrent.duration._ -import scala.language.postfixOps - -import kafka.common.TopicAndPartition -import kafka.message.MessageAndMetadata -import kafka.serializer.StringDecoder -import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll} -import org.scalatest.concurrent.Eventually - -import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite} -import org.apache.spark.internal.Logging -import org.apache.spark.rdd.RDD -import org.apache.spark.streaming.{Milliseconds, StreamingContext, Time} -import org.apache.spark.streaming.dstream.DStream -import org.apache.spark.streaming.kafka.KafkaCluster.LeaderOffset -import org.apache.spark.streaming.scheduler._ -import org.apache.spark.streaming.scheduler.rate.RateEstimator -import org.apache.spark.util.Utils - -class DirectKafkaStreamSuite - extends SparkFunSuite - with BeforeAndAfter - with BeforeAndAfterAll - with Eventually - with Logging { - val sparkConf = new SparkConf() - .setMaster("local[4]") - .setAppName(this.getClass.getSimpleName) - - private var sc: SparkContext = _ - private var ssc: StreamingContext = _ - private var testDir: File = _ - - private var kafkaTestUtils: KafkaTestUtils = _ - - override def beforeAll { - kafkaTestUtils = new KafkaTestUtils - kafkaTestUtils.setup() - } - - override def afterAll { - if (kafkaTestUtils != null) { - kafkaTestUtils.teardown() - kafkaTestUtils = null - } - } - - after { - if (ssc != null) { - ssc.stop() - sc = null - } - if (sc != null) { - sc.stop() - } - if (testDir != null) { - Utils.deleteRecursively(testDir) - } - } - - - test("basic stream receiving with multiple topics and smallest starting offset") { - val topics = Set("basic1", "basic2", "basic3") - val data = Map("a" -> 7, "b" -> 9) - topics.foreach { t => - kafkaTestUtils.createTopic(t) - kafkaTestUtils.sendMessages(t, data) - } - val totalSent = data.values.sum * topics.size - val kafkaParams = Map( - "metadata.broker.list" -> kafkaTestUtils.brokerAddress, - "auto.offset.reset" -> "smallest" - ) - - ssc = new StreamingContext(sparkConf, Milliseconds(200)) - val stream = withClue("Error creating direct stream") { - KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder]( - ssc, kafkaParams, topics) - } - - val allReceived = new ConcurrentLinkedQueue[(String, String)]() - - // hold a reference to the current offset ranges, so it can be used downstream - var offsetRanges = Array[OffsetRange]() - - stream.transform { rdd => - // Get the offset ranges in the RDD - offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges - rdd - }.foreachRDD { rdd => - for (o <- offsetRanges) { - logInfo(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}") - } - val collected = rdd.mapPartitionsWithIndex { (i, iter) => - // For each partition, get size of the range in the partition, - // and the number of items in the partition - val off = offsetRanges(i) - val all = iter.toSeq - val partSize = all.size - val rangeSize = off.untilOffset - off.fromOffset - Iterator((partSize, rangeSize)) - }.collect - - // Verify whether number of elements in each partition - // matches with the corresponding offset range - collected.foreach { case (partSize, rangeSize) => - assert(partSize === rangeSize, "offset ranges are wrong") - } - } - stream.foreachRDD { rdd => allReceived.addAll(Arrays.asList(rdd.collect(): _*)) } - ssc.start() - eventually(timeout(20000.milliseconds), interval(200.milliseconds)) { - assert(allReceived.size === totalSent, - "didn't get expected number of messages, messages:\n" + - allReceived.asScala.mkString("\n")) - } - ssc.stop() - } - - test("receiving from largest starting offset") { - val topic = "largest" - val topicPartition = TopicAndPartition(topic, 0) - val data = Map("a" -> 10) - kafkaTestUtils.createTopic(topic) - val kafkaParams = Map( - "metadata.broker.list" -> kafkaTestUtils.brokerAddress, - "auto.offset.reset" -> "largest" - ) - val kc = new KafkaCluster(kafkaParams) - def getLatestOffset(): Long = { - kc.getLatestLeaderOffsets(Set(topicPartition)).right.get(topicPartition).offset - } - - // Send some initial messages before starting context - kafkaTestUtils.sendMessages(topic, data) - eventually(timeout(10 seconds), interval(20 milliseconds)) { - assert(getLatestOffset() > 3) - } - val offsetBeforeStart = getLatestOffset() - - // Setup context and kafka stream with largest offset - ssc = new StreamingContext(sparkConf, Milliseconds(200)) - val stream = withClue("Error creating direct stream") { - KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder]( - ssc, kafkaParams, Set(topic)) - } - assert( - stream.asInstanceOf[DirectKafkaInputDStream[_, _, _, _, _]] - .fromOffsets(topicPartition) >= offsetBeforeStart, - "Start offset not from latest" - ) - - val collectedData = new ConcurrentLinkedQueue[String]() - stream.map { _._2 }.foreachRDD { rdd => collectedData.addAll(Arrays.asList(rdd.collect(): _*)) } - ssc.start() - val newData = Map("b" -> 10) - kafkaTestUtils.sendMessages(topic, newData) - eventually(timeout(10 seconds), interval(50 milliseconds)) { - collectedData.contains("b") - } - assert(!collectedData.contains("a")) - } - - - test("creating stream by offset") { - val topic = "offset" - val topicPartition = TopicAndPartition(topic, 0) - val data = Map("a" -> 10) - kafkaTestUtils.createTopic(topic) - val kafkaParams = Map( - "metadata.broker.list" -> kafkaTestUtils.brokerAddress, - "auto.offset.reset" -> "largest" - ) - val kc = new KafkaCluster(kafkaParams) - def getLatestOffset(): Long = { - kc.getLatestLeaderOffsets(Set(topicPartition)).right.get(topicPartition).offset - } - - // Send some initial messages before starting context - kafkaTestUtils.sendMessages(topic, data) - eventually(timeout(10 seconds), interval(20 milliseconds)) { - assert(getLatestOffset() >= 10) - } - val offsetBeforeStart = getLatestOffset() - - // Setup context and kafka stream with largest offset - ssc = new StreamingContext(sparkConf, Milliseconds(200)) - val stream = withClue("Error creating direct stream") { - KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, String]( - ssc, kafkaParams, Map(topicPartition -> 11L), - (m: MessageAndMetadata[String, String]) => m.message()) - } - assert( - stream.asInstanceOf[DirectKafkaInputDStream[_, _, _, _, _]] - .fromOffsets(topicPartition) >= offsetBeforeStart, - "Start offset not from latest" - ) - - val collectedData = new ConcurrentLinkedQueue[String]() - stream.foreachRDD { rdd => collectedData.addAll(Arrays.asList(rdd.collect(): _*)) } - ssc.start() - val newData = Map("b" -> 10) - kafkaTestUtils.sendMessages(topic, newData) - eventually(timeout(10 seconds), interval(50 milliseconds)) { - collectedData.contains("b") - } - assert(!collectedData.contains("a")) - } - - // Test to verify the offset ranges can be recovered from the checkpoints - test("offset recovery") { - val topic = "recovery" - kafkaTestUtils.createTopic(topic) - testDir = Utils.createTempDir() - - val kafkaParams = Map( - "metadata.broker.list" -> kafkaTestUtils.brokerAddress, - "auto.offset.reset" -> "smallest" - ) - - // Send data to Kafka and wait for it to be received - def sendDataAndWaitForReceive(data: Seq[Int]) { - val strings = data.map { _.toString} - kafkaTestUtils.sendMessages(topic, strings.map { _ -> 1}.toMap) - eventually(timeout(10 seconds), interval(50 milliseconds)) { - assert(strings.forall { DirectKafkaStreamSuite.collectedData.contains }) - } - } - - // Setup the streaming context - ssc = new StreamingContext(sparkConf, Milliseconds(100)) - val kafkaStream = withClue("Error creating direct stream") { - KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder]( - ssc, kafkaParams, Set(topic)) - } - val keyedStream = kafkaStream.map { v => "key" -> v._2.toInt } - val stateStream = keyedStream.updateStateByKey { (values: Seq[Int], state: Option[Int]) => - Some(values.sum + state.getOrElse(0)) - } - ssc.checkpoint(testDir.getAbsolutePath) - - // This is to collect the raw data received from Kafka - kafkaStream.foreachRDD { (rdd: RDD[(String, String)], time: Time) => - val data = rdd.map { _._2 }.collect() - DirectKafkaStreamSuite.collectedData.addAll(Arrays.asList(data: _*)) - } - - // This is ensure all the data is eventually receiving only once - stateStream.foreachRDD { (rdd: RDD[(String, Int)]) => - rdd.collect().headOption.foreach { x => DirectKafkaStreamSuite.total = x._2 } - } - ssc.start() - - // Send some data and wait for them to be received - for (i <- (1 to 10).grouped(4)) { - sendDataAndWaitForReceive(i) - } - - ssc.stop() - - // Verify that offset ranges were generated - // Since "offsetRangesAfterStop" will be used to compare with "recoveredOffsetRanges", we should - // collect offset ranges after stopping. Otherwise, because new RDDs keep being generated before - // stopping, we may not be able to get the latest RDDs, then "recoveredOffsetRanges" will - // contain something not in "offsetRangesAfterStop". - val offsetRangesAfterStop = getOffsetRanges(kafkaStream) - assert(offsetRangesAfterStop.size >= 1, "No offset ranges generated") - assert( - offsetRangesAfterStop.head._2.forall { _.fromOffset === 0 }, - "starting offset not zero" - ) - - logInfo("====== RESTARTING ========") - - // Recover context from checkpoints - ssc = new StreamingContext(testDir.getAbsolutePath) - val recoveredStream = ssc.graph.getInputStreams().head.asInstanceOf[DStream[(String, String)]] - - // Verify offset ranges have been recovered - val recoveredOffsetRanges = getOffsetRanges(recoveredStream) - assert(recoveredOffsetRanges.size > 0, "No offset ranges recovered") - val earlierOffsetRangesAsSets = offsetRangesAfterStop.map { x => (x._1, x._2.toSet) } - assert( - recoveredOffsetRanges.forall { or => - earlierOffsetRangesAsSets.contains((or._1, or._2.toSet)) - }, - "Recovered ranges are not the same as the ones generated\n" + - s"recoveredOffsetRanges: $recoveredOffsetRanges\n" + - s"earlierOffsetRangesAsSets: $earlierOffsetRangesAsSets" - ) - // Restart context, give more data and verify the total at the end - // If the total is write that means each records has been received only once - ssc.start() - sendDataAndWaitForReceive(11 to 20) - eventually(timeout(10 seconds), interval(50 milliseconds)) { - assert(DirectKafkaStreamSuite.total === (1 to 20).sum) - } - ssc.stop() - } - - test("Direct Kafka stream report input information") { - val topic = "report-test" - val data = Map("a" -> 7, "b" -> 9) - kafkaTestUtils.createTopic(topic) - kafkaTestUtils.sendMessages(topic, data) - - val totalSent = data.values.sum - val kafkaParams = Map( - "metadata.broker.list" -> kafkaTestUtils.brokerAddress, - "auto.offset.reset" -> "smallest" - ) - - import DirectKafkaStreamSuite._ - ssc = new StreamingContext(sparkConf, Milliseconds(200)) - val collector = new InputInfoCollector - ssc.addStreamingListener(collector) - - val stream = withClue("Error creating direct stream") { - KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder]( - ssc, kafkaParams, Set(topic)) - } - - val allReceived = new ConcurrentLinkedQueue[(String, String)] - - stream.foreachRDD { rdd => allReceived.addAll(Arrays.asList(rdd.collect(): _*)) } - ssc.start() - eventually(timeout(20000.milliseconds), interval(200.milliseconds)) { - assert(allReceived.size === totalSent, - "didn't get expected number of messages, messages:\n" + - allReceived.asScala.mkString("\n")) - - // Calculate all the record number collected in the StreamingListener. - assert(collector.numRecordsSubmitted.get() === totalSent) - assert(collector.numRecordsStarted.get() === totalSent) - assert(collector.numRecordsCompleted.get() === totalSent) - } - ssc.stop() - } - - test("maxMessagesPerPartition with backpressure disabled") { - val topic = "maxMessagesPerPartition" - val kafkaStream = getDirectKafkaStream(topic, None) - - val input = Map(TopicAndPartition(topic, 0) -> 50L, TopicAndPartition(topic, 1) -> 50L) - assert(kafkaStream.maxMessagesPerPartition(input).get == - Map(TopicAndPartition(topic, 0) -> 10L, TopicAndPartition(topic, 1) -> 10L)) - } - - test("maxMessagesPerPartition with no lag") { - val topic = "maxMessagesPerPartition" - val rateController = Some(new ConstantRateController(0, new ConstantEstimator(100), 100)) - val kafkaStream = getDirectKafkaStream(topic, rateController) - - val input = Map(TopicAndPartition(topic, 0) -> 0L, TopicAndPartition(topic, 1) -> 0L) - assert(kafkaStream.maxMessagesPerPartition(input).isEmpty) - } - - test("maxMessagesPerPartition respects max rate") { - val topic = "maxMessagesPerPartition" - val rateController = Some(new ConstantRateController(0, new ConstantEstimator(100), 1000)) - val kafkaStream = getDirectKafkaStream(topic, rateController) - - val input = Map(TopicAndPartition(topic, 0) -> 1000L, TopicAndPartition(topic, 1) -> 1000L) - assert(kafkaStream.maxMessagesPerPartition(input).get == - Map(TopicAndPartition(topic, 0) -> 10L, TopicAndPartition(topic, 1) -> 10L)) - } - - test("using rate controller") { - val topic = "backpressure" - val topicPartitions = Set(TopicAndPartition(topic, 0), TopicAndPartition(topic, 1)) - kafkaTestUtils.createTopic(topic, 2) - val kafkaParams = Map( - "metadata.broker.list" -> kafkaTestUtils.brokerAddress, - "auto.offset.reset" -> "smallest" - ) - - val batchIntervalMilliseconds = 100 - val estimator = new ConstantEstimator(100) - val messages = Map("foo" -> 200) - kafkaTestUtils.sendMessages(topic, messages) - - val sparkConf = new SparkConf() - // Safe, even with streaming, because we're using the direct API. - // Using 1 core is useful to make the test more predictable. - .setMaster("local[1]") - .setAppName(this.getClass.getSimpleName) - .set("spark.streaming.kafka.maxRatePerPartition", "100") - - // Setup the streaming context - ssc = new StreamingContext(sparkConf, Milliseconds(batchIntervalMilliseconds)) - - val kafkaStream = withClue("Error creating direct stream") { - val kc = new KafkaCluster(kafkaParams) - val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message) - val m = kc.getEarliestLeaderOffsets(topicPartitions) - .fold(e => Map.empty[TopicAndPartition, Long], m => m.mapValues(lo => lo.offset)) - - new DirectKafkaInputDStream[String, String, StringDecoder, StringDecoder, (String, String)]( - ssc, kafkaParams, m, messageHandler) { - override protected[streaming] val rateController = - Some(new DirectKafkaRateController(id, estimator)) - } - } - - val collectedData = new ConcurrentLinkedQueue[Array[String]]() - - // Used for assertion failure messages. - def dataToString: String = - collectedData.asScala.map(_.mkString("[", ",", "]")).mkString("{", ", ", "}") - - // This is to collect the raw data received from Kafka - kafkaStream.foreachRDD { (rdd: RDD[(String, String)], time: Time) => - val data = rdd.map { _._2 }.collect() - collectedData.add(data) - } - - ssc.start() - - // Try different rate limits. - // Wait for arrays of data to appear matching the rate. - Seq(100, 50, 20).foreach { rate => - collectedData.clear() // Empty this buffer on each pass. - estimator.updateRate(rate) // Set a new rate. - // Expect blocks of data equal to "rate", scaled by the interval length in secs. - val expectedSize = Math.round(rate * batchIntervalMilliseconds * 0.001) - eventually(timeout(5.seconds), interval(batchIntervalMilliseconds.milliseconds)) { - // Assert that rate estimator values are used to determine maxMessagesPerPartition. - // Funky "-" in message makes the complete assertion message read better. - assert(collectedData.asScala.exists(_.size == expectedSize), - s" - No arrays of size $expectedSize for rate $rate found in $dataToString") - } - } - - ssc.stop() - } - - /** Get the generated offset ranges from the DirectKafkaStream */ - private def getOffsetRanges[K, V]( - kafkaStream: DStream[(K, V)]): Seq[(Time, Array[OffsetRange])] = { - kafkaStream.generatedRDDs.mapValues { rdd => - rdd.asInstanceOf[KafkaRDD[K, V, _, _, (K, V)]].offsetRanges - }.toSeq.sortBy { _._1 } - } - - private def getDirectKafkaStream(topic: String, mockRateController: Option[RateController]) = { - val batchIntervalMilliseconds = 100 - - val sparkConf = new SparkConf() - .setMaster("local[1]") - .setAppName(this.getClass.getSimpleName) - .set("spark.streaming.kafka.maxRatePerPartition", "100") - - // Setup the streaming context - ssc = new StreamingContext(sparkConf, Milliseconds(batchIntervalMilliseconds)) - - val earliestOffsets = Map(TopicAndPartition(topic, 0) -> 0L, TopicAndPartition(topic, 1) -> 0L) - val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message) - new DirectKafkaInputDStream[String, String, StringDecoder, StringDecoder, (String, String)]( - ssc, Map[String, String](), earliestOffsets, messageHandler) { - override protected[streaming] val rateController = mockRateController - } - } -} - -object DirectKafkaStreamSuite { - val collectedData = new ConcurrentLinkedQueue[String]() - @volatile var total = -1L - - class InputInfoCollector extends StreamingListener { - val numRecordsSubmitted = new AtomicLong(0L) - val numRecordsStarted = new AtomicLong(0L) - val numRecordsCompleted = new AtomicLong(0L) - - override def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted): Unit = { - numRecordsSubmitted.addAndGet(batchSubmitted.batchInfo.numRecords) - } - - override def onBatchStarted(batchStarted: StreamingListenerBatchStarted): Unit = { - numRecordsStarted.addAndGet(batchStarted.batchInfo.numRecords) - } - - override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = { - numRecordsCompleted.addAndGet(batchCompleted.batchInfo.numRecords) - } - } -} - -private[streaming] class ConstantEstimator(@volatile private var rate: Long) - extends RateEstimator { - - def updateRate(newRate: Long): Unit = { - rate = newRate - } - - def compute( - time: Long, - elements: Long, - processingDelay: Long, - schedulingDelay: Long): Option[Double] = Some(rate) -} - -private[streaming] class ConstantRateController(id: Int, estimator: RateEstimator, rate: Long) - extends RateController(id, estimator) { - override def publish(rate: Long): Unit = () - override def getLatestRate(): Long = rate -} http://git-wip-us.apache.org/repos/asf/spark/blob/56e1e2f1/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaClusterSuite.scala ---------------------------------------------------------------------- diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaClusterSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaClusterSuite.scala deleted file mode 100644 index d66830c..0000000 --- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaClusterSuite.scala +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.streaming.kafka - -import scala.util.Random - -import kafka.common.TopicAndPartition -import org.scalatest.BeforeAndAfterAll - -import org.apache.spark.SparkFunSuite - -class KafkaClusterSuite extends SparkFunSuite with BeforeAndAfterAll { - private val topic = "kcsuitetopic" + Random.nextInt(10000) - private val topicAndPartition = TopicAndPartition(topic, 0) - private var kc: KafkaCluster = null - - private var kafkaTestUtils: KafkaTestUtils = _ - - override def beforeAll() { - kafkaTestUtils = new KafkaTestUtils - kafkaTestUtils.setup() - - kafkaTestUtils.createTopic(topic) - kafkaTestUtils.sendMessages(topic, Map("a" -> 1)) - kc = new KafkaCluster(Map("metadata.broker.list" -> kafkaTestUtils.brokerAddress)) - } - - override def afterAll() { - if (kafkaTestUtils != null) { - kafkaTestUtils.teardown() - kafkaTestUtils = null - } - } - - test("metadata apis") { - val leader = kc.findLeaders(Set(topicAndPartition)).right.get(topicAndPartition) - val leaderAddress = s"${leader._1}:${leader._2}" - assert(leaderAddress === kafkaTestUtils.brokerAddress, "didn't get leader") - - val parts = kc.getPartitions(Set(topic)).right.get - assert(parts(topicAndPartition), "didn't get partitions") - - val err = kc.getPartitions(Set(topic + "BAD")) - assert(err.isLeft, "getPartitions for a nonexistant topic should be an error") - } - - test("leader offset apis") { - val earliest = kc.getEarliestLeaderOffsets(Set(topicAndPartition)).right.get - assert(earliest(topicAndPartition).offset === 0, "didn't get earliest") - - val latest = kc.getLatestLeaderOffsets(Set(topicAndPartition)).right.get - assert(latest(topicAndPartition).offset === 1, "didn't get latest") - } - - test("consumer offset apis") { - val group = "kcsuitegroup" + Random.nextInt(10000) - - val offset = Random.nextInt(10000) - - val set = kc.setConsumerOffsets(group, Map(topicAndPartition -> offset)) - assert(set.isRight, "didn't set consumer offsets") - - val get = kc.getConsumerOffsets(group, Set(topicAndPartition)).right.get - assert(get(topicAndPartition) === offset, "didn't get consumer offsets") - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/56e1e2f1/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala ---------------------------------------------------------------------- diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala deleted file mode 100644 index 5e539c1..0000000 --- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala +++ /dev/null @@ -1,175 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.streaming.kafka - -import scala.util.Random - -import kafka.common.TopicAndPartition -import kafka.message.MessageAndMetadata -import kafka.serializer.StringDecoder -import org.scalatest.BeforeAndAfterAll - -import org.apache.spark._ - -class KafkaRDDSuite extends SparkFunSuite with BeforeAndAfterAll { - - private var kafkaTestUtils: KafkaTestUtils = _ - - private val sparkConf = new SparkConf().setMaster("local[4]") - .setAppName(this.getClass.getSimpleName) - private var sc: SparkContext = _ - - override def beforeAll { - sc = new SparkContext(sparkConf) - kafkaTestUtils = new KafkaTestUtils - kafkaTestUtils.setup() - } - - override def afterAll { - if (sc != null) { - sc.stop - sc = null - } - - if (kafkaTestUtils != null) { - kafkaTestUtils.teardown() - kafkaTestUtils = null - } - } - - test("basic usage") { - val topic = s"topicbasic-${Random.nextInt}" - kafkaTestUtils.createTopic(topic) - val messages = Array("the", "quick", "brown", "fox") - kafkaTestUtils.sendMessages(topic, messages) - - val kafkaParams = Map("metadata.broker.list" -> kafkaTestUtils.brokerAddress, - "group.id" -> s"test-consumer-${Random.nextInt}") - - val offsetRanges = Array(OffsetRange(topic, 0, 0, messages.size)) - - val rdd = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder]( - sc, kafkaParams, offsetRanges) - - val received = rdd.map(_._2).collect.toSet - assert(received === messages.toSet) - - // size-related method optimizations return sane results - assert(rdd.count === messages.size) - assert(rdd.countApprox(0).getFinalValue.mean === messages.size) - assert(!rdd.isEmpty) - assert(rdd.take(1).size === 1) - assert(rdd.take(1).head._2 === messages.head) - assert(rdd.take(messages.size + 10).size === messages.size) - - val emptyRdd = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder]( - sc, kafkaParams, Array(OffsetRange(topic, 0, 0, 0))) - - assert(emptyRdd.isEmpty) - - // invalid offset ranges throw exceptions - val badRanges = Array(OffsetRange(topic, 0, 0, messages.size + 1)) - intercept[SparkException] { - KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder]( - sc, kafkaParams, badRanges) - } - } - - test("iterator boundary conditions") { - // the idea is to find e.g. off-by-one errors between what kafka has available and the rdd - val topic = s"topicboundary-${Random.nextInt}" - val sent = Map("a" -> 5, "b" -> 3, "c" -> 10) - kafkaTestUtils.createTopic(topic) - - val kafkaParams = Map("metadata.broker.list" -> kafkaTestUtils.brokerAddress, - "group.id" -> s"test-consumer-${Random.nextInt}") - - val kc = new KafkaCluster(kafkaParams) - - // this is the "lots of messages" case - kafkaTestUtils.sendMessages(topic, sent) - val sentCount = sent.values.sum - - // rdd defined from leaders after sending messages, should get the number sent - val rdd = getRdd(kc, Set(topic)) - - assert(rdd.isDefined) - - val ranges = rdd.get.asInstanceOf[HasOffsetRanges].offsetRanges - val rangeCount = ranges.map(o => o.untilOffset - o.fromOffset).sum - - assert(rangeCount === sentCount, "offset range didn't include all sent messages") - assert(rdd.get.count === sentCount, "didn't get all sent messages") - - val rangesMap = ranges.map(o => TopicAndPartition(o.topic, o.partition) -> o.untilOffset).toMap - - // make sure consumer offsets are committed before the next getRdd call - kc.setConsumerOffsets(kafkaParams("group.id"), rangesMap).fold( - err => throw new Exception(err.mkString("\n")), - _ => () - ) - - // this is the "0 messages" case - val rdd2 = getRdd(kc, Set(topic)) - // shouldn't get anything, since message is sent after rdd was defined - val sentOnlyOne = Map("d" -> 1) - - kafkaTestUtils.sendMessages(topic, sentOnlyOne) - - assert(rdd2.isDefined) - assert(rdd2.get.count === 0, "got messages when there shouldn't be any") - - // this is the "exactly 1 message" case, namely the single message from sentOnlyOne above - val rdd3 = getRdd(kc, Set(topic)) - // send lots of messages after rdd was defined, they shouldn't show up - kafkaTestUtils.sendMessages(topic, Map("extra" -> 22)) - - assert(rdd3.isDefined) - assert(rdd3.get.count === sentOnlyOne.values.sum, "didn't get exactly one message") - - } - - // get an rdd from the committed consumer offsets until the latest leader offsets, - private def getRdd(kc: KafkaCluster, topics: Set[String]) = { - val groupId = kc.kafkaParams("group.id") - def consumerOffsets(topicPartitions: Set[TopicAndPartition]) = { - kc.getConsumerOffsets(groupId, topicPartitions).right.toOption.orElse( - kc.getEarliestLeaderOffsets(topicPartitions).right.toOption.map { offs => - offs.map(kv => kv._1 -> kv._2.offset) - } - ) - } - kc.getPartitions(topics).right.toOption.flatMap { topicPartitions => - consumerOffsets(topicPartitions).flatMap { from => - kc.getLatestLeaderOffsets(topicPartitions).right.toOption.map { until => - val offsetRanges = from.map { case (tp: TopicAndPartition, fromOffset: Long) => - OffsetRange(tp.topic, tp.partition, fromOffset, until(tp).offset) - }.toArray - - val leaders = until.map { case (tp: TopicAndPartition, lo: KafkaCluster.LeaderOffset) => - tp -> Broker(lo.host, lo.port) - }.toMap - - KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder, String]( - sc, kc.kafkaParams, offsetRanges, leaders, - (mmd: MessageAndMetadata[String, String]) => s"${mmd.offset} ${mmd.message}") - } - } - } - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/56e1e2f1/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala ---------------------------------------------------------------------- diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala deleted file mode 100644 index 6a35ac1..0000000 --- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala +++ /dev/null @@ -1,84 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.streaming.kafka - -import scala.collection.mutable -import scala.concurrent.duration._ -import scala.language.postfixOps -import scala.util.Random - -import kafka.serializer.StringDecoder -import org.scalatest.BeforeAndAfterAll -import org.scalatest.concurrent.Eventually - -import org.apache.spark.{SparkConf, SparkFunSuite} -import org.apache.spark.storage.StorageLevel -import org.apache.spark.streaming.{Milliseconds, StreamingContext} - -class KafkaStreamSuite extends SparkFunSuite with Eventually with BeforeAndAfterAll { - private var ssc: StreamingContext = _ - private var kafkaTestUtils: KafkaTestUtils = _ - - override def beforeAll(): Unit = { - kafkaTestUtils = new KafkaTestUtils - kafkaTestUtils.setup() - } - - override def afterAll(): Unit = { - if (ssc != null) { - ssc.stop() - ssc = null - } - - if (kafkaTestUtils != null) { - kafkaTestUtils.teardown() - kafkaTestUtils = null - } - } - - test("Kafka input stream") { - val sparkConf = new SparkConf().setMaster("local[4]").setAppName(this.getClass.getSimpleName) - ssc = new StreamingContext(sparkConf, Milliseconds(500)) - val topic = "topic1" - val sent = Map("a" -> 5, "b" -> 3, "c" -> 10) - kafkaTestUtils.createTopic(topic) - kafkaTestUtils.sendMessages(topic, sent) - - val kafkaParams = Map("zookeeper.connect" -> kafkaTestUtils.zkAddress, - "group.id" -> s"test-consumer-${Random.nextInt(10000)}", - "auto.offset.reset" -> "smallest") - - val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder]( - ssc, kafkaParams, Map(topic -> 1), StorageLevel.MEMORY_ONLY) - val result = new mutable.HashMap[String, Long]() - stream.map(_._2).countByValue().foreachRDD { r => - r.collect().foreach { kv => - result.synchronized { - val count = result.getOrElseUpdate(kv._1, 0) + kv._2 - result.put(kv._1, count) - } - } - } - - ssc.start() - - eventually(timeout(10000 milliseconds), interval(100 milliseconds)) { - assert(result.synchronized { sent === result }) - } - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/56e1e2f1/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala ---------------------------------------------------------------------- diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala deleted file mode 100644 index 7b9aee3..0000000 --- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala +++ /dev/null @@ -1,148 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.streaming.kafka - -import java.io.File - -import scala.collection.mutable -import scala.concurrent.duration._ -import scala.language.postfixOps -import scala.util.Random - -import kafka.serializer.StringDecoder -import kafka.utils.{ZKGroupTopicDirs, ZkUtils} -import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll} -import org.scalatest.concurrent.Eventually - -import org.apache.spark.{SparkConf, SparkFunSuite} -import org.apache.spark.storage.StorageLevel -import org.apache.spark.streaming.{Milliseconds, StreamingContext} -import org.apache.spark.util.Utils - -class ReliableKafkaStreamSuite extends SparkFunSuite - with BeforeAndAfterAll with BeforeAndAfter with Eventually { - - private val sparkConf = new SparkConf() - .setMaster("local[4]") - .setAppName(this.getClass.getSimpleName) - .set("spark.streaming.receiver.writeAheadLog.enable", "true") - private val data = Map("a" -> 10, "b" -> 10, "c" -> 10) - - private var kafkaTestUtils: KafkaTestUtils = _ - - private var groupId: String = _ - private var kafkaParams: Map[String, String] = _ - private var ssc: StreamingContext = _ - private var tempDirectory: File = null - - override def beforeAll(): Unit = { - kafkaTestUtils = new KafkaTestUtils - kafkaTestUtils.setup() - - groupId = s"test-consumer-${Random.nextInt(10000)}" - kafkaParams = Map( - "zookeeper.connect" -> kafkaTestUtils.zkAddress, - "group.id" -> groupId, - "auto.offset.reset" -> "smallest" - ) - - tempDirectory = Utils.createTempDir() - } - - override def afterAll(): Unit = { - Utils.deleteRecursively(tempDirectory) - - if (kafkaTestUtils != null) { - kafkaTestUtils.teardown() - kafkaTestUtils = null - } - } - - before { - ssc = new StreamingContext(sparkConf, Milliseconds(500)) - ssc.checkpoint(tempDirectory.getAbsolutePath) - } - - after { - if (ssc != null) { - ssc.stop() - ssc = null - } - } - - test("Reliable Kafka input stream with single topic") { - val topic = "test-topic" - kafkaTestUtils.createTopic(topic) - kafkaTestUtils.sendMessages(topic, data) - - // Verify whether the offset of this group/topic/partition is 0 before starting. - assert(getCommitOffset(groupId, topic, 0) === None) - - val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder]( - ssc, kafkaParams, Map(topic -> 1), StorageLevel.MEMORY_ONLY) - val result = new mutable.HashMap[String, Long]() - stream.map { case (k, v) => v }.foreachRDD { r => - val ret = r.collect() - ret.foreach { v => - val count = result.getOrElseUpdate(v, 0) + 1 - result.put(v, count) - } - } - ssc.start() - - eventually(timeout(20000 milliseconds), interval(200 milliseconds)) { - // A basic process verification for ReliableKafkaReceiver. - // Verify whether received message number is equal to the sent message number. - assert(data.size === result.size) - // Verify whether each message is the same as the data to be verified. - data.keys.foreach { k => assert(data(k) === result(k).toInt) } - // Verify the offset number whether it is equal to the total message number. - assert(getCommitOffset(groupId, topic, 0) === Some(29L)) - } - } - - test("Reliable Kafka input stream with multiple topics") { - val topics = Map("topic1" -> 1, "topic2" -> 1, "topic3" -> 1) - topics.foreach { case (t, _) => - kafkaTestUtils.createTopic(t) - kafkaTestUtils.sendMessages(t, data) - } - - // Before started, verify all the group/topic/partition offsets are 0. - topics.foreach { case (t, _) => assert(getCommitOffset(groupId, t, 0) === None) } - - // Consuming all the data sent to the broker which will potential commit the offsets internally. - val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder]( - ssc, kafkaParams, topics, StorageLevel.MEMORY_ONLY) - stream.foreachRDD(_ => Unit) - ssc.start() - - eventually(timeout(20000 milliseconds), interval(100 milliseconds)) { - // Verify the offset for each group/topic to see whether they are equal to the expected one. - topics.foreach { case (t, _) => assert(getCommitOffset(groupId, t, 0) === Some(29L)) } - } - } - - - /** Getting partition offset from Zookeeper. */ - private def getCommitOffset(groupId: String, topic: String, partition: Int): Option[Long] = { - val topicDirs = new ZKGroupTopicDirs(groupId, topic) - val zkPath = s"${topicDirs.consumerOffsetDir}/$partition" - ZkUtils.readDataMaybeNull(kafkaTestUtils.zookeeperClient, zkPath)._1.map(_.toLong) - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/56e1e2f1/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 3e783fa..d71913c 100644 --- a/pom.xml +++ b/pom.xml @@ -108,8 +108,8 @@ <module>examples</module> <module>repl</module> <module>launcher</module> - <module>external/kafka</module> - <module>external/kafka-assembly</module> + <module>external/kafka-0-8</module> + <module>external/kafka-0-8-assembly</module> </modules> <properties> http://git-wip-us.apache.org/repos/asf/spark/blob/56e1e2f1/project/MimaBuild.scala ---------------------------------------------------------------------- diff --git a/project/MimaBuild.scala b/project/MimaBuild.scala index 3dc1cea..2a989dd 100644 --- a/project/MimaBuild.scala +++ b/project/MimaBuild.scala @@ -89,7 +89,15 @@ object MimaBuild { def mimaSettings(sparkHome: File, projectRef: ProjectRef) = { val organization = "org.apache.spark" val previousSparkVersion = "1.6.0" - val fullId = "spark-" + projectRef.project + "_2.11" + // This check can be removed post-2.0 + val project = if (previousSparkVersion == "1.6.0" && + projectRef.project == "streaming-kafka-0-8" + ) { + "streaming-kafka" + } else { + projectRef.project + } + val fullId = "spark-" + project + "_2.11" mimaDefaultSettings ++ Seq(previousArtifact := Some(organization % fullId % previousSparkVersion), binaryIssueFilters ++= ignoredABIProblems(sparkHome, version.value)) http://git-wip-us.apache.org/repos/asf/spark/blob/56e1e2f1/project/SparkBuild.scala ---------------------------------------------------------------------- diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index f50f41a..d83afa0 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -46,7 +46,7 @@ object BuildCommons { val streamingProjects@Seq( streaming, streamingFlumeSink, streamingFlume, streamingKafka ) = Seq( - "streaming", "streaming-flume-sink", "streaming-flume", "streaming-kafka" + "streaming", "streaming-flume-sink", "streaming-flume", "streaming-kafka-0-8" ).map(ProjectRef(buildLocation, _)) val allProjects@Seq( @@ -62,7 +62,7 @@ object BuildCommons { "docker-integration-tests").map(ProjectRef(buildLocation, _)) val assemblyProjects@Seq(networkYarn, streamingFlumeAssembly, streamingKafkaAssembly, streamingKinesisAslAssembly) = - Seq("network-yarn", "streaming-flume-assembly", "streaming-kafka-assembly", "streaming-kinesis-asl-assembly") + Seq("network-yarn", "streaming-flume-assembly", "streaming-kafka-0-8-assembly", "streaming-kinesis-asl-assembly") .map(ProjectRef(buildLocation, _)) val copyJarsProjects@Seq(assembly, examples) = Seq("assembly", "examples") @@ -581,8 +581,8 @@ object Assembly { .getOrElse(SbtPomKeys.effectivePom.value.getProperties.get("hadoop.version").asInstanceOf[String]) }, jarName in assembly <<= (version, moduleName, hadoopVersion) map { (v, mName, hv) => - if (mName.contains("streaming-flume-assembly") || mName.contains("streaming-kafka-assembly") || mName.contains("streaming-kinesis-asl-assembly")) { - // This must match the same name used in maven (see external/kafka-assembly/pom.xml) + if (mName.contains("streaming-flume-assembly") || mName.contains("streaming-kafka-0-8-assembly") || mName.contains("streaming-kinesis-asl-assembly")) { + // This must match the same name used in maven (see external/kafka-0-8-assembly/pom.xml) s"${mName}-${v}.jar" } else { s"${mName}-${v}-hadoop${hv}.jar" http://git-wip-us.apache.org/repos/asf/spark/blob/56e1e2f1/python/pyspark/streaming/kafka.py ---------------------------------------------------------------------- diff --git a/python/pyspark/streaming/kafka.py b/python/pyspark/streaming/kafka.py index 02a8869..015ca77 100644 --- a/python/pyspark/streaming/kafka.py +++ b/python/pyspark/streaming/kafka.py @@ -208,13 +208,13 @@ ________________________________________________________________________________ 1. Include the Kafka library and its dependencies with in the spark-submit command as - $ bin/spark-submit --packages org.apache.spark:spark-streaming-kafka:%s ... + $ bin/spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8:%s ... 2. Download the JAR of the artifact from Maven Central http://search.maven.org/, - Group Id = org.apache.spark, Artifact Id = spark-streaming-kafka-assembly, Version = %s. + Group Id = org.apache.spark, Artifact Id = spark-streaming-kafka-0-8-assembly, Version = %s. Then, include the jar in the spark-submit command as - $ bin/spark-submit --jars <spark-streaming-kafka-assembly.jar> ... + $ bin/spark-submit --jars <spark-streaming-kafka-0-8-assembly.jar> ... ________________________________________________________________________________________________ http://git-wip-us.apache.org/repos/asf/spark/blob/56e1e2f1/python/pyspark/streaming/tests.py ---------------------------------------------------------------------- diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py index f27628c..360ba1e 100644 --- a/python/pyspark/streaming/tests.py +++ b/python/pyspark/streaming/tests.py @@ -1476,13 +1476,13 @@ def search_jar(dir, name_prefix): def search_kafka_assembly_jar(): SPARK_HOME = os.environ["SPARK_HOME"] - kafka_assembly_dir = os.path.join(SPARK_HOME, "external/kafka-assembly") - jars = search_jar(kafka_assembly_dir, "spark-streaming-kafka-assembly") + kafka_assembly_dir = os.path.join(SPARK_HOME, "external/kafka-0-8-assembly") + jars = search_jar(kafka_assembly_dir, "spark-streaming-kafka-0-8-assembly") if not jars: raise Exception( ("Failed to find Spark Streaming kafka assembly jar in %s. " % kafka_assembly_dir) + "You need to build Spark with " - "'build/sbt assembly/package streaming-kafka-assembly/assembly' or " + "'build/sbt assembly/package streaming-kafka-0-8-assembly/assembly' or " "'build/mvn package' before running this test.") elif len(jars) > 1: raise Exception(("Found multiple Spark Streaming Kafka assembly JARs: %s; please " --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
