Repository: spark
Updated Branches:
  refs/heads/branch-2.0 9f2eb27a4 -> f460a199e


http://git-wip-us.apache.org/repos/asf/spark/blob/f460a199/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
 
b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
new file mode 100644
index 0000000..3eb8a73
--- /dev/null
+++ 
b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
@@ -0,0 +1,339 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.io.File
+import java.lang.{Integer => JInt}
+import java.net.InetSocketAddress
+import java.util.{Map => JMap, Properties}
+import java.util.concurrent.TimeUnit
+
+import scala.collection.JavaConverters._
+import scala.language.postfixOps
+import scala.util.Random
+
+import kafka.admin.AdminUtils
+import kafka.api.Request
+import kafka.common.TopicAndPartition
+import kafka.server.{KafkaConfig, KafkaServer, OffsetCheckpoint}
+import kafka.utils.ZkUtils
+import org.apache.kafka.clients.consumer.KafkaConsumer
+import org.apache.kafka.clients.producer._
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.serialization.{StringDeserializer, 
StringSerializer}
+import org.apache.zookeeper.server.{NIOServerCnxnFactory, ZooKeeperServer}
+import org.scalatest.concurrent.Eventually._
+import org.scalatest.time.SpanSugar._
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.Utils
+import org.apache.spark.SparkConf
+
+/**
+ * This is a helper class for Kafka test suites. This has the functionality to set up
+ * and tear down local Kafka servers, and to push data using Kafka producers.
+ *
+ * The reason to put Kafka test utility class in src is to test Python related Kafka APIs.
+ */
+class KafkaTestUtils extends Logging {
+
+  // Zookeeper related configurations
+  private val zkHost = "localhost"
+  private var zkPort: Int = 0
+  private val zkConnectionTimeout = 60000
+  private val zkSessionTimeout = 6000
+
+  private var zookeeper: EmbeddedZookeeper = _
+
+  private var zkUtils: ZkUtils = _
+
+  // Kafka broker related configurations
+  private val brokerHost = "localhost"
+  private var brokerPort = 0
+  private var brokerConf: KafkaConfig = _
+
+  // Kafka broker server
+  private var server: KafkaServer = _
+
+  // Kafka producer
+  private var producer: Producer[String, String] = _
+
+  // Flag to test whether the system is correctly started
+  private var zkReady = false
+  private var brokerReady = false
+
+  /**
+   * Returns the `host:port` address of the embedded Zookeeper server.
+   *
+   * Fails an assertion unless `zkReady` is set, which `setupEmbeddedZookeeper()` does and
+   * `teardown()` undoes.
+   */
+  def zkAddress: String = {
+    assert(zkReady, "Zookeeper not setup yet or already torn down, cannot get zookeeper address")
+    s"$zkHost:$zkPort"
+  }
+
+  /**
+   * Returns the `host:port` address of the embedded Kafka broker.
+   *
+   * Fails an assertion unless `brokerReady` is set, which `setupEmbeddedKafkaServer()` does
+   * once the broker has started and `teardown()` undoes.
+   */
+  def brokerAddress: String = {
+    assert(brokerReady, "Kafka not setup yet or already torn down, cannot get broker address")
+    s"$brokerHost:$brokerPort"
+  }
+
+  /**
+   * Returns the Zookeeper client created by `setupEmbeddedZookeeper()`.
+   *
+   * Fails an assertion when `zkReady` is not set, and throws `IllegalStateException` when the
+   * flag is set but the client reference is still null.
+   */
+  def zookeeperClient: ZkUtils = {
+    assert(zkReady, "Zookeeper not setup yet or already torn down, cannot get zookeeper client")
+    Option(zkUtils).getOrElse(
+      throw new IllegalStateException("Zookeeper client is not yet initialized"))
+  }
+
+  /**
+   * Set up the Embedded Zookeeper server and get the proper Zookeeper port.
+   *
+   * The server is asked to bind to the current `zkPort` (initialised to 0); the port it
+   * actually bound to is then stored back into `zkPort` and used for the client connection.
+   */
+  private def setupEmbeddedZookeeper(): Unit = {
+    // Zookeeper server startup
+    zookeeper = new EmbeddedZookeeper(s"$zkHost:$zkPort")
+    // Get the actual zookeeper binding port
+    zkPort = zookeeper.actualPort
+    zkUtils = ZkUtils(s"$zkHost:$zkPort", zkSessionTimeout, zkConnectionTimeout, false)
+    // Only now may zkAddress / zookeeperClient be used
+    zkReady = true
+  }
+
+  // Start the embedded Kafka broker; the embedded Zookeeper must already be running
+  private def setupEmbeddedKafkaServer(): Unit = {
+    assert(zkReady, "Zookeeper should be set up beforehand")
+
+    // Callback handed to Utils.startServiceOnPort: starts a broker for the port it is given
+    val startBroker: Int => (KafkaServer, Int) = { candidatePort =>
+      // brokerConfiguration reads brokerPort, so it has to be updated before building the config
+      brokerPort = candidatePort
+      brokerConf = new KafkaConfig(brokerConfiguration, doLog = false)
+      server = new KafkaServer(brokerConf)
+      server.startup()
+      // Remember the port the broker reports as bound
+      brokerPort = server.boundPort()
+      (server, brokerPort)
+    }
+
+    Utils.startServiceOnPort(brokerPort, startBroker, new SparkConf(), "KafkaBroker")
+
+    brokerReady = true
+  }
+
+  /**
+   * Set up the whole embedded environment: Zookeeper first, then the Kafka broker.
+   *
+   * The order matters because the broker setup asserts that Zookeeper is ready.
+   */
+  def setup(): Unit = {
+    setupEmbeddedZookeeper()
+    setupEmbeddedKafkaServer()
+  }
+
+  /**
+   * Teardown the whole servers, including Kafka broker and Zookeeper.
+   *
+   * Clears the ready flags, then closes the producer, shuts down the broker, deletes the
+   * broker log directories, closes the Zookeeper client and shuts down the embedded
+   * Zookeeper, in that order. Every step is skipped when its component was never created,
+   * so calling this after a partially completed `setup()` does not raise a
+   * NullPointerException.
+   */
+  def teardown(): Unit = {
+    brokerReady = false
+    zkReady = false
+
+    if (producer != null) {
+      producer.close()
+      producer = null
+    }
+
+    if (server != null) {
+      server.shutdown()
+      server = null
+    }
+
+    // brokerConf stays null when the broker was never configured (e.g. Zookeeper setup failed)
+    if (brokerConf != null) {
+      brokerConf.logDirs.foreach { f => Utils.deleteRecursively(new File(f)) }
+      brokerConf = null
+    }
+
+    if (zkUtils != null) {
+      zkUtils.close()
+      zkUtils = null
+    }
+
+    if (zookeeper != null) {
+      zookeeper.shutdown()
+      zookeeper = null
+    }
+  }
+
+  /**
+   * Create a Kafka topic and wait until it is propagated to the whole cluster.
+   *
+   * @param topic name of the topic to create
+   * @param partitions number of partitions; metadata for each of them is awaited
+   */
+  def createTopic(topic: String, partitions: Int): Unit = {
+    AdminUtils.createTopic(zkUtils, topic, partitions, 1)
+    // wait until metadata is propagated
+    (0 until partitions).foreach { p =>
+      waitUntilMetadataIsPropagated(topic, p)
+    }
+  }
+
+  /** Returns every topic known to Zookeeper paired with its number of partitions. */
+  def getAllTopicsAndPartitionSize(): Seq[(String, Int)] = {
+    zkUtils.getPartitionsForTopics(zkUtils.getAllTopics()).mapValues(_.size).toSeq
+  }
+
+  /** Create a Kafka topic with a single partition and wait until its metadata is propagated */
+  def createTopic(topic: String): Unit = {
+    createTopic(topic, 1)
+  }
+
+  /**
+   * Delete a Kafka topic and wait until the deletion is visible on the embedded broker.
+   *
+   * The partition count is read before the delete request is issued because it is needed
+   * afterwards to verify that every partition is gone.
+   */
+  def deleteTopic(topic: String): Unit = {
+    val partitions = zkUtils.getPartitionsForTopics(Seq(topic))(topic).size
+    AdminUtils.deleteTopic(zkUtils, topic)
+    verifyTopicDeletion(zkUtils, topic, partitions, List(this.server))
+  }
+
+  /**
+   * Add new partitions to a Kafka topic.
+   *
+   * @param topic name of an existing topic
+   * @param partitions partition count passed to `AdminUtils.addPartitions`; metadata is
+   *                   awaited for partitions `0 until partitions`, i.e. it is treated as the
+   *                   total number of partitions after the change
+   */
+  def addPartitions(topic: String, partitions: Int): Unit = {
+    AdminUtils.addPartitions(zkUtils, topic, partitions)
+    // wait until metadata is propagated
+    (0 until partitions).foreach { p =>
+      waitUntilMetadataIsPropagated(topic, p)
+    }
+  }
+
+  /**
+   * Java-friendly function for sending messages to the Kafka broker.
+   *
+   * @param topic topic to send to
+   * @param messageToFreq maps each message to the number of times it is sent
+   */
+  def sendMessages(topic: String, messageToFreq: JMap[String, JInt]): Unit = {
+    // Copy the Java map into an immutable Scala Map so the Scala overload can be reused
+    sendMessages(topic, Map(messageToFreq.asScala.mapValues(_.intValue()).toSeq: _*))
+  }
+
+  /**
+   * Send the messages to the Kafka broker.
+   *
+   * @param topic topic to send to
+   * @param messageToFreq maps each message to the number of times it is sent
+   */
+  def sendMessages(topic: String, messageToFreq: Map[String, Int]): Unit = {
+    // Expand every (message, frequency) pair into `frequency` copies of the message
+    val messages = messageToFreq.flatMap { case (s, freq) => Seq.fill(freq)(s) }.toArray
+    sendMessages(topic, messages)
+  }
+
+  /**
+   * Send the array of messages to the Kafka broker.
+   *
+   * A new producer is created for the call and closed again before returning. Each send is
+   * awaited for up to 10 seconds before the next message is sent.
+   *
+   * @param topic topic to send to
+   * @param messages message values, sent in array order without a key
+   * @return each message paired with the record metadata returned for it
+   */
+  def sendMessages(topic: String, messages: Array[String]): Seq[(String, RecordMetadata)] = {
+    producer = new KafkaProducer[String, String](producerConfiguration)
+    val offsets = try {
+      messages.map { m =>
+        val metadata =
+          producer.send(new ProducerRecord[String, String](topic, m)).get(10, TimeUnit.SECONDS)
+        logInfo(s"\tSent $m to partition ${metadata.partition}, offset ${metadata.offset}")
+        (m, metadata)
+      }
+    } finally {
+      // Close the producer even when a send fails or times out
+      if (producer != null) {
+        producer.close()
+        producer = null
+      }
+    }
+    offsets
+  }
+
+  /**
+   * Returns the position at the end of every partition assigned for the given topics.
+   *
+   * A short-lived consumer is created for the lookup. It is closed in a `finally` block so
+   * that a failure in any of the consumer calls does not leak it.
+   *
+   * @param topics topics to subscribe to
+   * @return the consumer position after `seekToEnd` for each assigned partition
+   */
+  def getLatestOffsets(topics: Set[String]): Map[TopicPartition, Long] = {
+    val kc = new KafkaConsumer[String, String](consumerConfiguration)
+    logInfo("Created consumer to get latest offsets")
+    try {
+      kc.subscribe(topics.asJavaCollection)
+      // poll(0) is issued before the assignment is read
+      kc.poll(0)
+      val partitions = kc.assignment()
+      kc.pause(partitions)
+      kc.seekToEnd(partitions)
+      partitions.asScala.map(p => p -> kc.position(p)).toMap
+    } finally {
+      kc.close()
+      logInfo("Closed consumer to get latest offsets")
+    }
+  }
+
+  // Builds the properties for the embedded broker; allocates a fresh temp log dir per call
+  private def brokerConfiguration: Properties = {
+    val settings = Seq(
+      "broker.id" -> "0",
+      "host.name" -> "localhost",
+      "advertised.host.name" -> "localhost",
+      "port" -> brokerPort.toString,
+      "log.dir" -> Utils.createTempDir().getAbsolutePath,
+      "zookeeper.connect" -> zkAddress,
+      "log.flush.interval.messages" -> "1",
+      "replica.socket.timeout.ms" -> "1500",
+      "delete.topic.enable" -> "true")
+    val brokerProps = new Properties()
+    settings.foreach { case (key, value) => brokerProps.put(key, value) }
+    brokerProps
+  }
+
+  // Properties for a String/String producer talking to the embedded broker
+  private def producerConfiguration: Properties = {
+    val producerProps = new Properties()
+    val settings = Seq(
+      "bootstrap.servers" -> brokerAddress,
+      "value.serializer" -> classOf[StringSerializer].getName,
+      "key.serializer" -> classOf[StringSerializer].getName,
+      // wait for all in-sync replicas to ack sends
+      "acks" -> "all")
+    settings.foreach { case (key, value) => producerProps.put(key, value) }
+    producerProps
+  }
+
+  // Properties for a String/String consumer in a randomly named group, auto-commit disabled
+  private def consumerConfiguration: Properties = {
+    val consumerProps = new Properties()
+    val settings = Seq(
+      "bootstrap.servers" -> brokerAddress,
+      "group.id" -> s"group-KafkaTestUtils-${Random.nextInt}",
+      "value.deserializer" -> classOf[StringDeserializer].getName,
+      "key.deserializer" -> classOf[StringDeserializer].getName,
+      "enable.auto.commit" -> "false")
+    settings.foreach { case (key, value) => consumerProps.put(key, value) }
+    consumerProps
+  }
+
+  /**
+   * Blocks until the given topic is fully deleted, failing after 10 seconds.
+   *
+   * Deletion is considered complete when the Zookeeper delete-topic and topic paths are gone
+   * and, on every given server, no partition, log or cleaner checkpoint entry remains for any
+   * of the topic's partitions.
+   *
+   * @param zkUtils Zookeeper client used to check the paths
+   * @param topic name of the topic being deleted
+   * @param numPartitions number of partitions the topic had before deletion
+   * @param servers brokers whose replica managers and log managers are inspected
+   */
+  private def verifyTopicDeletion(
+      zkUtils: ZkUtils,
+      topic: String,
+      numPartitions: Int,
+      servers: Seq[KafkaServer]): Unit = {
+    import ZkUtils._
+    val topicAndPartitions = (0 until numPartitions).map(TopicAndPartition(topic, _))
+    def isDeleted(): Boolean = {
+      // wait until admin path for delete topic is deleted, signaling completion of topic deletion
+      val deletePath = !zkUtils.pathExists(getDeleteTopicPath(topic))
+      val topicPath = !zkUtils.pathExists(getTopicPath(topic))
+      // ensure that the topic-partition has been deleted from all brokers' replica managers
+      val replicaManager = servers.forall(server => topicAndPartitions.forall(tp =>
+        server.replicaManager.getPartition(tp.topic, tp.partition).isEmpty))
+      // ensure that logs from all replicas are deleted if delete topic is marked successful
+      val logManager = servers.forall(server => topicAndPartitions.forall(tp =>
+        server.getLogManager().getLog(tp).isEmpty))
+      // ensure that topic is removed from all cleaner offsets
+      val cleaner = servers.forall(server => topicAndPartitions.forall { tp =>
+        val checkpoints = server.getLogManager().logDirs.map { logDir =>
+          new OffsetCheckpoint(new File(logDir, "cleaner-offset-checkpoint")).read()
+        }
+        checkpoints.forall(checkpointsPerLogDir => !checkpointsPerLogDir.contains(tp))
+      })
+      deletePath && topicPath && replicaManager && logManager && cleaner
+    }
+    eventually(timeout(10.seconds)) {
+      assert(isDeleted(), s"$topic not deleted after timeout")
+    }
+  }
+
+  /**
+   * Blocks until the broker's metadata cache knows the given partition, failing after
+   * 10 seconds.
+   *
+   * A partition counts as propagated when the cache has an entry for it, Zookeeper reports a
+   * leader, the cached leader is a valid broker id and the in-sync replica set is non-empty.
+   */
+  private def waitUntilMetadataIsPropagated(topic: String, partition: Int): Unit = {
+    def isPropagated: Boolean =
+      server.apis.metadataCache.getPartitionInfo(topic, partition).exists { partitionState =>
+        val leaderAndInSyncReplicas = partitionState.leaderIsrAndControllerEpoch.leaderAndIsr
+
+        zkUtils.getLeaderForPartition(topic, partition).isDefined &&
+          Request.isValidBrokerId(leaderAndInSyncReplicas.leader) &&
+          leaderAndInSyncReplicas.isr.size >= 1
+      }
+    eventually(timeout(10.seconds)) {
+      assert(isPropagated, s"Partition [$topic, $partition] metadata not propagated after timeout")
+    }
+  }
+
+  /**
+   * An in-process Zookeeper server that is started as part of construction.
+   *
+   * @param zkConnect address to bind to, in `host:port` form
+   */
+  private class EmbeddedZookeeper(val zkConnect: String) {
+    // Temporary directories for the server's snapshots and logs; removed again in shutdown()
+    val snapshotDir = Utils.createTempDir()
+    val logDir = Utils.createTempDir()
+
+    val zookeeper = new ZooKeeperServer(snapshotDir, logDir, 500)
+    val (ip, port) = {
+      val splits = zkConnect.split(":")
+      (splits(0), splits(1).toInt)
+    }
+    val factory = new NIOServerCnxnFactory()
+    factory.configure(new InetSocketAddress(ip, port), 16)
+    factory.startup(zookeeper)
+
+    // Port reported by the connection factory after startup
+    val actualPort = factory.getLocalPort
+
+    /** Shuts down the connection factory and deletes the snapshot and log directories. */
+    def shutdown(): Unit = {
+      factory.shutdown()
+      Utils.deleteRecursively(snapshotDir)
+      Utils.deleteRecursively(logDir)
+    }
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/spark/blob/f460a199/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index bb4b8a0..90d5ca0 100644
--- a/pom.xml
+++ b/pom.xml
@@ -111,6 +111,7 @@
     <module>external/kafka-0-8-assembly</module>
     <module>external/kafka-0-10</module>
     <module>external/kafka-0-10-assembly</module>
+    <module>external/kafka-0-10-sql</module>
   </modules>
 
   <properties>

http://git-wip-us.apache.org/repos/asf/spark/blob/f460a199/project/SparkBuild.scala
----------------------------------------------------------------------
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 133d3b3..7f7a65f 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -39,8 +39,8 @@ object BuildCommons {
 
   private val buildLocation = file(".").getAbsoluteFile.getParentFile
 
-  val sqlProjects@Seq(catalyst, sql, hive, hiveThriftServer) = Seq(
-    "catalyst", "sql", "hive", "hive-thriftserver"
+  val sqlProjects@Seq(catalyst, sql, hive, hiveThriftServer, sqlKafka010) = 
Seq(
+    "catalyst", "sql", "hive", "hive-thriftserver", "sql-kafka-0-10"
   ).map(ProjectRef(buildLocation, _))
 
   val streamingProjects@Seq(
@@ -352,7 +352,7 @@ object SparkBuild extends PomBuild {
   val mimaProjects = allProjects.filterNot { x =>
     Seq(
       spark, hive, hiveThriftServer, catalyst, repl, networkCommon, 
networkShuffle, networkYarn,
-      unsafe, tags
+      unsafe, tags, sqlKafka010
     ).contains(x)
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/f460a199/sql/core/pom.xml
----------------------------------------------------------------------
diff --git a/sql/core/pom.xml b/sql/core/pom.xml
index 5e67226..6a4ec7a 100644
--- a/sql/core/pom.xml
+++ b/sql/core/pom.xml
@@ -133,6 +133,33 @@
     
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
     
<testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
     <plugins>
+       <!--
+            This plugin forces the generation of a jar containing the sql test classes,
+            so that the test classes of external modules can use them. The two execution profiles
+            are necessary - the first one for 'mvn package', the second one for 'mvn test-compile'.
+            Ideally, 'mvn compile' should not compile test classes and therefore should not need
+            this. However, an open Maven bug (http://jira.codehaus.org/browse/MNG-3559)
+            causes the compilation to fail if the sql test-jar is not generated. Hence, the
+            second execution profile for 'mvn test-compile'.
+      -->
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-jar-plugin</artifactId>
+        <executions>
+          <execution>
+            <goals>
+              <goal>test-jar</goal>
+            </goals>
+          </execution>
+          <execution>
+            <id>test-jar-on-test-compile</id>
+            <phase>test-compile</phase>
+            <goals>
+              <goal>test-jar</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
       <plugin>
         <groupId>org.codehaus.mojo</groupId>
         <artifactId>build-helper-maven-plugin</artifactId>

http://git-wip-us.apache.org/repos/asf/spark/blob/f460a199/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index da1ae05..8e0688d 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -116,7 +116,7 @@ class StreamExecution(
    * [[HDFSMetadataLog]]. See SPARK-14131 for more details.
    */
   val microBatchThread =
-    new UninterruptibleThread(s"stream execution thread for $name") {
+    new StreamExecutionThread(s"stream execution thread for $name") {
       override def run(): Unit = {
         // To fix call site like "run at <unknown>:0", we bridge the call site 
from the caller
         // thread to this micro batch thread
@@ -535,3 +535,9 @@ object StreamExecution {
 
   def nextId: Long = _nextId.getAndIncrement()
 }
+
+/**
+ * A special thread to run the stream query. Some code must run on a StreamExecutionThread
+ * and uses `classOf[StreamExecutionThread]` to check for it.
+ */
+abstract class StreamExecutionThread(name: String) extends UninterruptibleThread(name)

http://git-wip-us.apache.org/repos/asf/spark/blob/f460a199/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
index 1d467c6..d4c64e4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
@@ -50,11 +50,11 @@ import org.apache.spark.util.{Clock, ManualClock, 
SystemClock, Utils}
  *
  * {{{
  *  val inputData = MemoryStream[Int]
-    val mapped = inputData.toDS().map(_ + 1)
-
-    testStream(mapped)(
-      AddData(inputData, 1, 2, 3),
-      CheckAnswer(2, 3, 4))
+ *  val mapped = inputData.toDS().map(_ + 1)
+ *
+ *  testStream(mapped)(
+ *    AddData(inputData, 1, 2, 3),
+ *    CheckAnswer(2, 3, 4))
  * }}}
  *
  * Note that while we do sleep to allow the other thread to progress without 
spinning,
@@ -468,21 +468,41 @@ trait StreamTest extends QueryTest with SharedSQLContext 
with Timeouts {
     }
   }
 
+
+  /**
+   * Creates a stress test that randomly starts/stops/adds data/checks the result.
+   *
+   * Delegates to the four-argument overload with no prepare actions and an `addData` that
+   * ignores the "running" flag.
+   *
+   * @param ds a dataframe that executes + 1 on a stream of integers, returning the result
+   * @param addData an add data action that adds the given numbers to the stream, encoding them
+   *                as needed
+   * @param iterations the number of iterations to run
+   */
+  def runStressTest(
+      ds: Dataset[Int],
+      addData: Seq[Int] => StreamAction,
+      iterations: Int = 100): Unit = {
+    runStressTest(ds, Seq.empty, (data, _) => addData(data), iterations)
+  }
+
   /**
    * Creates a stress test that randomly starts/stops/adds data/checks the 
result.
    *
-   * @param ds a dataframe that executes + 1 on a stream of integers, 
returning the result.
-   * @param addData and add data action that adds the given numbers to the 
stream, encoding them
+   * @param ds a dataframe that executes + 1 on a stream of integers, 
returning the result
+   * @param prepareActions actions need to run before starting the stress test.
+   * @param addData an add data action that adds the given numbers to the 
stream, encoding them
    *                as needed
+   * @param iterations the iteration number
    */
   def runStressTest(
       ds: Dataset[Int],
-      addData: Seq[Int] => StreamAction,
-      iterations: Int = 100): Unit = {
+      prepareActions: Seq[StreamAction],
+      addData: (Seq[Int], Boolean) => StreamAction,
+      iterations: Int): Unit = {
     implicit val intEncoder = ExpressionEncoder[Int]()
     var dataPos = 0
     var running = true
     val actions = new ArrayBuffer[StreamAction]()
+    actions ++= prepareActions
 
     def addCheck() = { actions += CheckAnswer(1 to dataPos: _*) }
 
@@ -490,7 +510,7 @@ trait StreamTest extends QueryTest with SharedSQLContext 
with Timeouts {
       val numItems = Random.nextInt(10)
       val data = dataPos until (dataPos + numItems)
       dataPos += numItems
-      actions += addData(data)
+      actions += addData(data, running)
     }
 
     (1 to iterations).foreach { i =>


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to