Repository: spark Updated Branches: refs/heads/branch-2.0 9f2eb27a4 -> f460a199e
http://git-wip-us.apache.org/repos/asf/spark/blob/f460a199/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala ---------------------------------------------------------------------- diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala new file mode 100644 index 0000000..3eb8a73 --- /dev/null +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala @@ -0,0 +1,339 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.kafka010 + +import java.io.File +import java.lang.{Integer => JInt} +import java.net.InetSocketAddress +import java.util.{Map => JMap, Properties} +import java.util.concurrent.TimeUnit + +import scala.collection.JavaConverters._ +import scala.language.postfixOps +import scala.util.Random + +import kafka.admin.AdminUtils +import kafka.api.Request +import kafka.common.TopicAndPartition +import kafka.server.{KafkaConfig, KafkaServer, OffsetCheckpoint} +import kafka.utils.ZkUtils +import org.apache.kafka.clients.consumer.KafkaConsumer +import org.apache.kafka.clients.producer._ +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer} +import org.apache.zookeeper.server.{NIOServerCnxnFactory, ZooKeeperServer} +import org.scalatest.concurrent.Eventually._ +import org.scalatest.time.SpanSugar._ + +import org.apache.spark.internal.Logging +import org.apache.spark.util.Utils +import org.apache.spark.SparkConf + +/** + * This is a helper class for Kafka test suites. This has the functionality to set up + * and tear down local Kafka servers, and to push data using Kafka producers. + * + * The reason to put Kafka test utility class in src is to test Python related Kafka APIs. + */ +class KafkaTestUtils extends Logging { + + // Zookeeper related configurations + private val zkHost = "localhost" + private var zkPort: Int = 0 + private val zkConnectionTimeout = 60000 + private val zkSessionTimeout = 6000 + + private var zookeeper: EmbeddedZookeeper = _ + + private var zkUtils: ZkUtils = _ + + // Kafka broker related configurations + private val brokerHost = "localhost" + private var brokerPort = 0 + private var brokerConf: KafkaConfig = _ + + // Kafka broker server + private var server: KafkaServer = _ + + // Kafka producer + private var producer: Producer[String, String] = _ + + // Flag to test whether the system is correctly started + private var zkReady = false + private var brokerReady = false + + def zkAddress: String = { + assert(zkReady, "Zookeeper not setup yet or already torn down, cannot get zookeeper address") + s"$zkHost:$zkPort" + } + + def brokerAddress: String = { + assert(brokerReady, "Kafka not setup yet or already torn down, cannot get broker address") + s"$brokerHost:$brokerPort" + } + + def zookeeperClient: ZkUtils = { + assert(zkReady, "Zookeeper not setup yet or already torn down, cannot get zookeeper client") + Option(zkUtils).getOrElse( + throw new IllegalStateException("Zookeeper client is not yet initialized")) + } + + // Set up the Embedded Zookeeper server and get the proper Zookeeper port + private def setupEmbeddedZookeeper(): Unit = { + // Zookeeper server startup + zookeeper = new EmbeddedZookeeper(s"$zkHost:$zkPort") + // Get the actual zookeeper binding port + zkPort = zookeeper.actualPort + zkUtils = ZkUtils(s"$zkHost:$zkPort", zkSessionTimeout, zkConnectionTimeout, false) + zkReady = true + } + + // Set up the Embedded Kafka server + private def setupEmbeddedKafkaServer(): Unit = { + assert(zkReady, "Zookeeper should be set up beforehand") + + // Kafka broker startup + Utils.startServiceOnPort(brokerPort, port => { + brokerPort = port + brokerConf = new KafkaConfig(brokerConfiguration, doLog = false) + server = new KafkaServer(brokerConf) + server.startup() + brokerPort = server.boundPort() + (server, brokerPort) + }, new SparkConf(), "KafkaBroker") + + brokerReady = true + } + + /** setup the whole embedded servers, including Zookeeper and Kafka brokers */ + def setup(): Unit = { + setupEmbeddedZookeeper() + setupEmbeddedKafkaServer() + } + + /** Teardown the whole servers, including Kafka broker and Zookeeper */ + def teardown(): Unit = { + brokerReady = false + zkReady = false + + if (producer != null) { + producer.close() + producer = null + } + + if (server != null) { + server.shutdown() + server = null + } + + brokerConf.logDirs.foreach { f => Utils.deleteRecursively(new File(f)) } + + if (zkUtils != null) { + zkUtils.close() + zkUtils = null + } + + if (zookeeper != null) { + zookeeper.shutdown() + zookeeper = null + } + } + + /** Create a Kafka topic and wait until it is propagated to the whole cluster */ + def createTopic(topic: String, partitions: Int): Unit = { + AdminUtils.createTopic(zkUtils, topic, partitions, 1) + // wait until metadata is propagated + (0 until partitions).foreach { p => + waitUntilMetadataIsPropagated(topic, p) + } + } + + def getAllTopicsAndPartitionSize(): Seq[(String, Int)] = { + zkUtils.getPartitionsForTopics(zkUtils.getAllTopics()).mapValues(_.size).toSeq + } + + /** Create a Kafka topic and wait until it is propagated to the whole cluster */ + def createTopic(topic: String): Unit = { + createTopic(topic, 1) + } + + /** Delete a Kafka topic and wait until it is propagated to the whole cluster */ + def deleteTopic(topic: String): Unit = { + val partitions = zkUtils.getPartitionsForTopics(Seq(topic))(topic).size + AdminUtils.deleteTopic(zkUtils, topic) + verifyTopicDeletion(zkUtils, topic, partitions, List(this.server)) + } + + /** Add new paritions to a Kafka topic */ + def addPartitions(topic: String, partitions: Int): Unit = { + AdminUtils.addPartitions(zkUtils, topic, partitions) + // wait until metadata is propagated + (0 until partitions).foreach { p => + waitUntilMetadataIsPropagated(topic, p) + } + } + + /** Java-friendly function for sending messages to the Kafka broker */ + def sendMessages(topic: String, messageToFreq: JMap[String, JInt]): Unit = { + sendMessages(topic, Map(messageToFreq.asScala.mapValues(_.intValue()).toSeq: _*)) + } + + /** Send the messages to the Kafka broker */ + def sendMessages(topic: String, messageToFreq: Map[String, Int]): Unit = { + val messages = messageToFreq.flatMap { case (s, freq) => Seq.fill(freq)(s) }.toArray + sendMessages(topic, messages) + } + + /** Send the array of messages to the Kafka broker */ + def sendMessages(topic: String, messages: Array[String]): Seq[(String, RecordMetadata)] = { + producer = new KafkaProducer[String, String](producerConfiguration) + val offsets = try { + messages.map { m => + val metadata = + producer.send(new ProducerRecord[String, String](topic, m)).get(10, TimeUnit.SECONDS) + logInfo(s"\tSent $m to partition ${metadata.partition}, offset ${metadata.offset}") + (m, metadata) + } + } finally { + if (producer != null) { + producer.close() + producer = null + } + } + offsets + } + + def getLatestOffsets(topics: Set[String]): Map[TopicPartition, Long] = { + val kc = new KafkaConsumer[String, String](consumerConfiguration) + logInfo("Created consumer to get latest offsets") + kc.subscribe(topics.asJavaCollection) + kc.poll(0) + val partitions = kc.assignment() + kc.pause(partitions) + kc.seekToEnd(partitions) + val offsets = partitions.asScala.map(p => p -> kc.position(p)).toMap + kc.close() + logInfo("Closed consumer to get latest offsets") + offsets + } + + private def brokerConfiguration: Properties = { + val props = new Properties() + props.put("broker.id", "0") + props.put("host.name", "localhost") + props.put("advertised.host.name", "localhost") + props.put("port", brokerPort.toString) + props.put("log.dir", Utils.createTempDir().getAbsolutePath) + props.put("zookeeper.connect", zkAddress) + props.put("log.flush.interval.messages", "1") + props.put("replica.socket.timeout.ms", "1500") + props.put("delete.topic.enable", "true") + props + } + + private def producerConfiguration: Properties = { + val props = new Properties() + props.put("bootstrap.servers", brokerAddress) + props.put("value.serializer", classOf[StringSerializer].getName) + props.put("key.serializer", classOf[StringSerializer].getName) + // wait for all in-sync replicas to ack sends + props.put("acks", "all") + props + } + + private def consumerConfiguration: Properties = { + val props = new Properties() + props.put("bootstrap.servers", brokerAddress) + props.put("group.id", "group-KafkaTestUtils-" + Random.nextInt) + props.put("value.deserializer", classOf[StringDeserializer].getName) + props.put("key.deserializer", classOf[StringDeserializer].getName) + props.put("enable.auto.commit", "false") + props + } + + private def verifyTopicDeletion( + zkUtils: ZkUtils, + topic: String, + numPartitions: Int, + servers: Seq[KafkaServer]) { + import ZkUtils._ + val topicAndPartitions = (0 until numPartitions).map(TopicAndPartition(topic, _)) + def isDeleted(): Boolean = { + // wait until admin path for delete topic is deleted, signaling completion of topic deletion + val deletePath = !zkUtils.pathExists(getDeleteTopicPath(topic)) + val topicPath = !zkUtils.pathExists(getTopicPath(topic)) + // ensure that the topic-partition has been deleted from all brokers' replica managers + val replicaManager = servers.forall(server => topicAndPartitions.forall(tp => + server.replicaManager.getPartition(tp.topic, tp.partition) == None)) + // ensure that logs from all replicas are deleted if delete topic is marked successful + val logManager = servers.forall(server => topicAndPartitions.forall(tp => + server.getLogManager().getLog(tp).isEmpty)) + // ensure that topic is removed from all cleaner offsets + val cleaner = servers.forall(server => topicAndPartitions.forall { tp => + val checkpoints = server.getLogManager().logDirs.map { logDir => + new OffsetCheckpoint(new File(logDir, "cleaner-offset-checkpoint")).read() + } + checkpoints.forall(checkpointsPerLogDir => !checkpointsPerLogDir.contains(tp)) + }) + deletePath && topicPath && replicaManager && logManager && cleaner + } + eventually(timeout(10.seconds)) { + assert(isDeleted, s"$topic not deleted after timeout") + } + } + + private def waitUntilMetadataIsPropagated(topic: String, partition: Int): Unit = { + def isPropagated = server.apis.metadataCache.getPartitionInfo(topic, partition) match { + case Some(partitionState) => + val leaderAndInSyncReplicas = partitionState.leaderIsrAndControllerEpoch.leaderAndIsr + + zkUtils.getLeaderForPartition(topic, partition).isDefined && + Request.isValidBrokerId(leaderAndInSyncReplicas.leader) && + leaderAndInSyncReplicas.isr.size >= 1 + + case _ => + false + } + eventually(timeout(10.seconds)) { + assert(isPropagated, s"Partition [$topic, $partition] metadata not propagated after timeout") + } + } + + private class EmbeddedZookeeper(val zkConnect: String) { + val snapshotDir = Utils.createTempDir() + val logDir = Utils.createTempDir() + + val zookeeper = new ZooKeeperServer(snapshotDir, logDir, 500) + val (ip, port) = { + val splits = zkConnect.split(":") + (splits(0), splits(1).toInt) + } + val factory = new NIOServerCnxnFactory() + factory.configure(new InetSocketAddress(ip, port), 16) + factory.startup(zookeeper) + + val actualPort = factory.getLocalPort + + def shutdown() { + factory.shutdown() + Utils.deleteRecursively(snapshotDir) + Utils.deleteRecursively(logDir) + } + } +} + http://git-wip-us.apache.org/repos/asf/spark/blob/f460a199/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index bb4b8a0..90d5ca0 100644 --- a/pom.xml +++ b/pom.xml @@ -111,6 +111,7 @@ <module>external/kafka-0-8-assembly</module> <module>external/kafka-0-10</module> <module>external/kafka-0-10-assembly</module> + <module>external/kafka-0-10-sql</module> </modules> <properties> http://git-wip-us.apache.org/repos/asf/spark/blob/f460a199/project/SparkBuild.scala ---------------------------------------------------------------------- diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index 133d3b3..7f7a65f 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -39,8 +39,8 @@ object BuildCommons { private val buildLocation = file(".").getAbsoluteFile.getParentFile - val sqlProjects@Seq(catalyst, sql, hive, hiveThriftServer) = Seq( - "catalyst", "sql", "hive", "hive-thriftserver" + val sqlProjects@Seq(catalyst, sql, hive, hiveThriftServer, sqlKafka010) = Seq( + "catalyst", "sql", "hive", "hive-thriftserver", "sql-kafka-0-10" ).map(ProjectRef(buildLocation, _)) val streamingProjects@Seq( @@ -352,7 +352,7 @@ object SparkBuild extends PomBuild { val mimaProjects = allProjects.filterNot { x => Seq( spark, hive, hiveThriftServer, catalyst, repl, networkCommon, networkShuffle, networkYarn, - unsafe, tags + unsafe, tags, sqlKafka010 ).contains(x) } http://git-wip-us.apache.org/repos/asf/spark/blob/f460a199/sql/core/pom.xml ---------------------------------------------------------------------- diff --git a/sql/core/pom.xml b/sql/core/pom.xml index 5e67226..6a4ec7a 100644 --- a/sql/core/pom.xml +++ b/sql/core/pom.xml @@ -133,6 +133,33 @@ <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory> <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory> <plugins> + <!-- + This plugin forces the generation of jar containing sql test classes, + so that the tests classes of external modules can use them. The two execution profiles + are necessary - first one for 'mvn package', second one for 'mvn test-compile'. Ideally, + 'mvn compile' should not compile test classes and therefore should not need this. + However, an open Maven bug (http://jira.codehaus.org/browse/MNG-3559) + causes the compilation to fail if catalyst test-jar is not generated. Hence, the + second execution profile for 'mvn test-compile'. + --> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-jar-plugin</artifactId> + <executions> + <execution> + <goals> + <goal>test-jar</goal> + </goals> + </execution> + <execution> + <id>test-jar-on-test-compile</id> + <phase>test-compile</phase> + <goals> + <goal>test-jar</goal> + </goals> + </execution> + </executions> + </plugin> <plugin> <groupId>org.codehaus.mojo</groupId> <artifactId>build-helper-maven-plugin</artifactId> http://git-wip-us.apache.org/repos/asf/spark/blob/f460a199/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index da1ae05..8e0688d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -116,7 +116,7 @@ class StreamExecution( * [[HDFSMetadataLog]]. See SPARK-14131 for more details. */ val microBatchThread = - new UninterruptibleThread(s"stream execution thread for $name") { + new StreamExecutionThread(s"stream execution thread for $name") { override def run(): Unit = { // To fix call site like "run at <unknown>:0", we bridge the call site from the caller // thread to this micro batch thread @@ -535,3 +535,9 @@ object StreamExecution { def nextId: Long = _nextId.getAndIncrement() } + +/** + * A special thread to run the stream query. Some codes require to run in the StreamExecutionThread + * and will use `classOf[StreamExecutionThread]` to check. + */ +abstract class StreamExecutionThread(name: String) extends UninterruptibleThread(name) http://git-wip-us.apache.org/repos/asf/spark/blob/f460a199/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala index 1d467c6..d4c64e4 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala @@ -50,11 +50,11 @@ import org.apache.spark.util.{Clock, ManualClock, SystemClock, Utils} * * {{{ * val inputData = MemoryStream[Int] - val mapped = inputData.toDS().map(_ + 1) - - testStream(mapped)( - AddData(inputData, 1, 2, 3), - CheckAnswer(2, 3, 4)) + * val mapped = inputData.toDS().map(_ + 1) + * + * testStream(mapped)( + * AddData(inputData, 1, 2, 3), + * CheckAnswer(2, 3, 4)) * }}} * * Note that while we do sleep to allow the other thread to progress without spinning, @@ -468,21 +468,41 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts { } } + + /** + * Creates a stress test that randomly starts/stops/adds data/checks the result. + * + * @param ds a dataframe that executes + 1 on a stream of integers, returning the result + * @param addData an add data action that adds the given numbers to the stream, encoding them + * as needed + * @param iterations the iteration number + */ + def runStressTest( + ds: Dataset[Int], + addData: Seq[Int] => StreamAction, + iterations: Int = 100): Unit = { + runStressTest(ds, Seq.empty, (data, running) => addData(data), iterations) + } + /** * Creates a stress test that randomly starts/stops/adds data/checks the result. * - * @param ds a dataframe that executes + 1 on a stream of integers, returning the result. - * @param addData and add data action that adds the given numbers to the stream, encoding them + * @param ds a dataframe that executes + 1 on a stream of integers, returning the result + * @param prepareActions actions need to run before starting the stress test. + * @param addData an add data action that adds the given numbers to the stream, encoding them * as needed + * @param iterations the iteration number */ def runStressTest( ds: Dataset[Int], - addData: Seq[Int] => StreamAction, - iterations: Int = 100): Unit = { + prepareActions: Seq[StreamAction], + addData: (Seq[Int], Boolean) => StreamAction, + iterations: Int): Unit = { implicit val intEncoder = ExpressionEncoder[Int]() var dataPos = 0 var running = true val actions = new ArrayBuffer[StreamAction]() + actions ++= prepareActions def addCheck() = { actions += CheckAnswer(1 to dataPos: _*) } @@ -490,7 +510,7 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts { val numItems = Random.nextInt(10) val data = dataPos until (dataPos + numItems) dataPos += numItems - actions += addData(data) + actions += addData(data, running) } (1 to iterations).foreach { i => --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
