Repository: spark
Updated Branches:
  refs/heads/master 22ba21348 -> 12a20c144


[SPARK-10820][SQL] Support for the continuous execution of structured queries

This is a follow up to 9aadcffabd226557174f3ff566927f873c71672e that extends 
Spark SQL to allow users to _repeatedly_ optimize and execute structured 
queries.  A `ContinuousQuery` can be expressed using SQL, DataFrames or 
Datasets.  The purpose of this PR is only to add some initial infrastructure 
which will be extended in subsequent PRs.

## User-facing API

- `sqlContext.streamFrom` and `df.streamTo` return builder objects that are 
analogous to the `read/write` interfaces already available to executing queries 
in a batch-oriented fashion.
- `ContinuousQuery` provides an interface for interacting with a query that is 
currently executing in the background.

## Internal Interfaces
 - `StreamExecution` - executes streaming queries in micro-batches

The following are currently internal, but public APIs will be provided in a 
future release.
 - `Source` - an interface for providers of continually arriving data.  A 
source must have a notion of an `Offset` that monotonically tracks what data 
has arrived.  For fault tolerance, a source must be able to replay data given a 
start offset.
 - `Sink` - an interface that accepts the results of a continuously executing 
query.  Also responsible for tracking the offset that should be resumed from in 
the case of a failure.

## Testing
 - `MemoryStream` and `MemorySink` - simple implementations of source and sink 
that keep all data in memory and have methods for simulating durability failures
 - `StreamTest` - a framework for performing actions and checking invariants on 
a continuous query

Author: Michael Armbrust <[email protected]>
Author: Tathagata Das <[email protected]>
Author: Josh Rosen <[email protected]>

Closes #11006 from marmbrus/structured-streaming.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/12a20c14
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/12a20c14
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/12a20c14

Branch: refs/heads/master
Commit: 12a20c144f14e80ef120ddcfb0b455a805a2da23
Parents: 22ba213
Author: Michael Armbrust <[email protected]>
Authored: Tue Feb 2 10:13:54 2016 -0800
Committer: Michael Armbrust <[email protected]>
Committed: Tue Feb 2 10:13:54 2016 -0800

----------------------------------------------------------------------
 .../org/apache/spark/sql/ContinuousQuery.scala  |  30 ++
 .../scala/org/apache/spark/sql/DataFrame.scala  |   8 +
 .../org/apache/spark/sql/DataStreamReader.scala | 127 +++++++
 .../org/apache/spark/sql/DataStreamWriter.scala | 134 +++++++
 .../scala/org/apache/spark/sql/SQLContext.scala |   8 +
 .../datasources/ResolvedDataSource.scala        |  33 +-
 .../spark/sql/execution/streaming/Batch.scala   |  26 ++
 .../execution/streaming/CompositeOffset.scala   |  67 ++++
 .../sql/execution/streaming/LongOffset.scala    |  33 ++
 .../spark/sql/execution/streaming/Offset.scala  |  37 ++
 .../spark/sql/execution/streaming/Sink.scala    |  47 +++
 .../spark/sql/execution/streaming/Source.scala  |  36 ++
 .../execution/streaming/StreamExecution.scala   | 211 +++++++++++
 .../execution/streaming/StreamProgress.scala    |  67 ++++
 .../execution/streaming/StreamingRelation.scala |  34 ++
 .../spark/sql/execution/streaming/memory.scala  | 138 ++++++++
 .../apache/spark/sql/sources/interfaces.scala   |  21 ++
 .../scala/org/apache/spark/sql/QueryTest.scala  |  74 ++--
 .../scala/org/apache/spark/sql/StreamTest.scala | 346 +++++++++++++++++++
 .../sql/streaming/DataStreamReaderSuite.scala   | 166 +++++++++
 .../sql/streaming/MemorySourceStressSuite.scala |  33 ++
 .../spark/sql/streaming/OffsetSuite.scala       |  98 ++++++
 .../spark/sql/streaming/StreamSuite.scala       |  84 +++++
 .../spark/sql/test/SharedSQLContext.scala       |   2 +-
 24 files changed, 1828 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/12a20c14/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQuery.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQuery.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQuery.scala
new file mode 100644
index 0000000..1c2c029
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQuery.scala
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+/**
+ * A handle to a query that is executing continuously in the background as new 
data arrives.
+ */
+trait ContinuousQuery {
+
+  /**
+   * Stops the execution of this query if it is running.  This method blocks 
until the threads
+   * performing execution has stopped.
+   */
+  def stop(): Unit
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/12a20c14/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index 518f9dc..6de17e5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -1691,6 +1691,14 @@ class DataFrame private[sql](
   def write: DataFrameWriter = new DataFrameWriter(this)
 
   /**
+   * :: Experimental ::
+   * Interface for starting a streaming query that will continually output 
results to the specified
+   * external sink as new data arrives.
+   */
+  @Experimental
+  def streamTo: DataStreamWriter = new DataStreamWriter(this)
+
+  /**
    * Returns the content of the [[DataFrame]] as a RDD of JSON strings.
    * @group rdd
    * @since 1.3.0

http://git-wip-us.apache.org/repos/asf/spark/blob/12a20c14/sql/core/src/main/scala/org/apache/spark/sql/DataStreamReader.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/DataStreamReader.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/DataStreamReader.scala
new file mode 100644
index 0000000..2febc93
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataStreamReader.scala
@@ -0,0 +1,127 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.spark.sql
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.Logging
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.sql.execution.datasources.ResolvedDataSource
+import org.apache.spark.sql.execution.streaming.StreamingRelation
+import org.apache.spark.sql.types.StructType
+
+/**
+ * :: Experimental ::
+ * An interface to reading streaming data.  Use `sqlContext.streamFrom` to 
access these methods.
+ *
+ * {{{
+ *   val df = sqlContext.streamFrom
+ *    .format("...")
+ *    .open()
+ * }}}
+ */
+@Experimental
+class DataStreamReader private[sql](sqlContext: SQLContext) extends Logging {
+
+  /**
+   * Specifies the input data source format.
+   *
+   * @since 2.0.0
+   */
+  def format(source: String): DataStreamReader = {
+    this.source = source
+    this
+  }
+
+  /**
+   * Specifies the input schema. Some data streams (e.g. JSON) can infer the 
input schema
+   * automatically from data. By specifying the schema here, the underlying 
data stream can
+   * skip the schema inference step, and thus speed up data reading.
+   *
+   * @since 2.0.0
+   */
+  def schema(schema: StructType): DataStreamReader = {
+    this.userSpecifiedSchema = Option(schema)
+    this
+  }
+
+  /**
+   * Adds an input option for the underlying data stream.
+   *
+   * @since 2.0.0
+   */
+  def option(key: String, value: String): DataStreamReader = {
+    this.extraOptions += (key -> value)
+    this
+  }
+
+  /**
+   * (Scala-specific) Adds input options for the underlying data stream.
+   *
+   * @since 2.0.0
+   */
+  def options(options: scala.collection.Map[String, String]): DataStreamReader 
= {
+    this.extraOptions ++= options
+    this
+  }
+
+  /**
+   * Adds input options for the underlying data stream.
+   *
+   * @since 2.0.0
+   */
+  def options(options: java.util.Map[String, String]): DataStreamReader = {
+    this.options(options.asScala)
+    this
+  }
+
+  /**
+   * Loads streaming input in as a [[DataFrame]], for data streams that don't 
require a path (e.g.
+   * external key-value stores).
+   *
+   * @since 2.0.0
+   */
+  def open(): DataFrame = {
+    val resolved = ResolvedDataSource.createSource(
+      sqlContext,
+      userSpecifiedSchema = userSpecifiedSchema,
+      providerName = source,
+      options = extraOptions.toMap)
+    DataFrame(sqlContext, StreamingRelation(resolved))
+  }
+
+  /**
+   * Loads input in as a [[DataFrame]], for data streams that read from some 
path.
+   *
+   * @since 2.0.0
+   */
+  def open(path: String): DataFrame = {
+    option("path", path).open()
+  }
+
+  
///////////////////////////////////////////////////////////////////////////////////////
+  // Builder pattern config options
+  
///////////////////////////////////////////////////////////////////////////////////////
+
+  private var source: String = sqlContext.conf.defaultDataSourceName
+
+  private var userSpecifiedSchema: Option[StructType] = None
+
+  private var extraOptions = new scala.collection.mutable.HashMap[String, 
String]
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/12a20c14/sql/core/src/main/scala/org/apache/spark/sql/DataStreamWriter.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/DataStreamWriter.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/DataStreamWriter.scala
new file mode 100644
index 0000000..b325d48
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataStreamWriter.scala
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.sql.execution.datasources.ResolvedDataSource
+import org.apache.spark.sql.execution.streaming.StreamExecution
+
+/**
+ * :: Experimental ::
+ * Interface used to start a streaming query query execution.
+ *
+ * @since 2.0.0
+ */
+@Experimental
+final class DataStreamWriter private[sql](df: DataFrame) {
+
+  /**
+   * Specifies the underlying output data source. Built-in options include 
"parquet", "json", etc.
+   *
+   * @since 2.0.0
+   */
+  def format(source: String): DataStreamWriter = {
+    this.source = source
+    this
+  }
+
+  /**
+   * Adds an output option for the underlying data source.
+   *
+   * @since 2.0.0
+   */
+  def option(key: String, value: String): DataStreamWriter = {
+    this.extraOptions += (key -> value)
+    this
+  }
+
+  /**
+   * (Scala-specific) Adds output options for the underlying data source.
+   *
+   * @since 2.0.0
+   */
+  def options(options: scala.collection.Map[String, String]): DataStreamWriter 
= {
+    this.extraOptions ++= options
+    this
+  }
+
+  /**
+   * Adds output options for the underlying data source.
+   *
+   * @since 2.0.0
+   */
+  def options(options: java.util.Map[String, String]): DataStreamWriter = {
+    this.options(options.asScala)
+    this
+  }
+
+  /**
+   * Partitions the output by the given columns on the file system. If 
specified, the output is
+   * laid out on the file system similar to Hive's partitioning scheme.\
+   * @since 2.0.0
+   */
+  @scala.annotation.varargs
+  def partitionBy(colNames: String*): DataStreamWriter = {
+    this.partitioningColumns = colNames
+    this
+  }
+
+  /**
+   * Starts the execution of the streaming query, which will continually 
output results to the given
+   * path as new data arrives.  The returned [[ContinuousQuery]] object can be 
used to interact with
+   * the stream.
+   * @since 2.0.0
+   */
+  def start(path: String): ContinuousQuery = {
+    this.extraOptions += ("path" -> path)
+    start()
+  }
+
+  /**
+   * Starts the execution of the streaming query, which will continually 
output results to the given
+   * path as new data arrives.  The returned [[ContinuousQuery]] object can be 
used to interact with
+   * the stream.
+   *
+   * @since 2.0.0
+   */
+  def start(): ContinuousQuery = {
+    val sink = ResolvedDataSource.createSink(
+      df.sqlContext,
+      source,
+      extraOptions.toMap,
+      normalizedParCols)
+
+    new StreamExecution(df.sqlContext, df.logicalPlan, sink)
+  }
+
+  private def normalizedParCols: Seq[String] = {
+    partitioningColumns.map { col =>
+      df.logicalPlan.output
+        .map(_.name)
+        .find(df.sqlContext.analyzer.resolver(_, col))
+        .getOrElse(throw new AnalysisException(s"Partition column $col not 
found in existing " +
+            s"columns (${df.logicalPlan.output.map(_.name).mkString(", ")})"))
+    }
+  }
+
+  
///////////////////////////////////////////////////////////////////////////////////////
+  // Builder pattern config options
+  
///////////////////////////////////////////////////////////////////////////////////////
+
+  private var source: String = df.sqlContext.conf.defaultDataSourceName
+
+  private var extraOptions = new scala.collection.mutable.HashMap[String, 
String]
+
+  private var partitioningColumns: Seq[String] = Nil
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/12a20c14/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index ef993c3..13700be 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -594,6 +594,14 @@ class SQLContext private[sql](
   @Experimental
   def read: DataFrameReader = new DataFrameReader(this)
 
+
+  /**
+   * :: Experimental ::
+   * Returns a [[DataStreamReader]] than can be used to access data 
continuously as it arrives.
+   */
+  @Experimental
+  def streamFrom: DataStreamReader = new DataStreamReader(this)
+
   /**
    * :: Experimental ::
    * Creates an external table from the given path and returns the 
corresponding DataFrame.

http://git-wip-us.apache.org/repos/asf/spark/blob/12a20c14/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala
index cc8dcf5..e3065ac 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala
@@ -29,11 +29,11 @@ import org.apache.hadoop.util.StringUtils
 import org.apache.spark.Logging
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.sql.{AnalysisException, DataFrame, SaveMode, 
SQLContext}
+import org.apache.spark.sql.execution.streaming.{Sink, Source}
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.{CalendarIntervalType, StructType}
 import org.apache.spark.util.Utils
 
-
 case class ResolvedDataSource(provider: Class[_], relation: BaseRelation)
 
 
@@ -92,6 +92,37 @@ object ResolvedDataSource extends Logging {
     }
   }
 
+  def createSource(
+      sqlContext: SQLContext,
+      userSpecifiedSchema: Option[StructType],
+      providerName: String,
+      options: Map[String, String]): Source = {
+    val provider = lookupDataSource(providerName).newInstance() match {
+      case s: StreamSourceProvider => s
+      case _ =>
+        throw new UnsupportedOperationException(
+          s"Data source $providerName does not support streamed reading")
+    }
+
+    provider.createSource(sqlContext, options, userSpecifiedSchema)
+  }
+
+  def createSink(
+      sqlContext: SQLContext,
+      providerName: String,
+      options: Map[String, String],
+      partitionColumns: Seq[String]): Sink = {
+    val provider = lookupDataSource(providerName).newInstance() match {
+      case s: StreamSinkProvider => s
+      case _ =>
+        throw new UnsupportedOperationException(
+          s"Data source $providerName does not support streamed writing")
+    }
+
+    provider.createSink(sqlContext, options, partitionColumns)
+  }
+
+
   /** Create a [[ResolvedDataSource]] for reading data in. */
   def apply(
       sqlContext: SQLContext,

http://git-wip-us.apache.org/repos/asf/spark/blob/12a20c14/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Batch.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Batch.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Batch.scala
new file mode 100644
index 0000000..1f25eb8
--- /dev/null
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Batch.scala
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import org.apache.spark.sql.DataFrame
+
+/**
+ * Used to pass a batch of data through a streaming query execution along with 
an indication
+ * of progress in the stream.
+ */
+class Batch(val end: Offset, val data: DataFrame)

http://git-wip-us.apache.org/repos/asf/spark/blob/12a20c14/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompositeOffset.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompositeOffset.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompositeOffset.scala
new file mode 100644
index 0000000..d2cb20e
--- /dev/null
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompositeOffset.scala
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import scala.util.Try
+
+/**
+ * An ordered collection of offsets, used to track the progress of processing 
data from one or more
+ * [[Source]]s that are present in a streaming query. This is similar to 
simplified, single-instance
+ * vector clock that must progress linearly forward.
+ */
+case class CompositeOffset(offsets: Seq[Option[Offset]]) extends Offset {
+  /**
+   * Returns a negative integer, zero, or a positive integer as this object is 
less than, equal to,
+   * or greater than the specified object.
+   */
+  override def compareTo(other: Offset): Int = other match {
+    case otherComposite: CompositeOffset if otherComposite.offsets.size == 
offsets.size =>
+      val comparisons = offsets.zip(otherComposite.offsets).map {
+        case (Some(a), Some(b)) => a compareTo b
+        case (None, None) => 0
+        case (None, _) => -1
+        case (_, None) => 1
+      }
+      val nonZeroSigns = comparisons.map(sign).filter(_ != 0).toSet
+      nonZeroSigns.size match {
+        case 0 => 0                       // if both empty or only 0s
+        case 1 => nonZeroSigns.head       // if there are only (0s and 1s) or 
(0s and -1s)
+        case _ =>                         // there are both 1s and -1s
+          throw new IllegalArgumentException(
+            s"Invalid comparison between non-linear histories: $this <=> 
$other")
+      }
+    case _ =>
+      throw new IllegalArgumentException(s"Cannot compare $this <=> $other")
+  }
+
+  private def sign(num: Int): Int = num match {
+    case i if i < 0 => -1
+    case i if i == 0 => 0
+    case i if i > 0 => 1
+  }
+}
+
+object CompositeOffset {
+  /**
+   * Returns a [[CompositeOffset]] with a variable sequence of offsets.
+   * `nulls` in the sequence are converted to `None`s.
+   */
+  def fill(offsets: Offset*): CompositeOffset = {
+    CompositeOffset(offsets.map(Option(_)))
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/12a20c14/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/LongOffset.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/LongOffset.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/LongOffset.scala
new file mode 100644
index 0000000..008195a
--- /dev/null
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/LongOffset.scala
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+/**
+ * A simple offset for sources that produce a single linear stream of data.
+ */
+case class LongOffset(offset: Long) extends Offset {
+
+  override def compareTo(other: Offset): Int = other match {
+    case l: LongOffset => offset.compareTo(l.offset)
+    case _ =>
+      throw new IllegalArgumentException(s"Invalid comparison of $getClass 
with ${other.getClass}")
+  }
+
+  def +(increment: Long): LongOffset = new LongOffset(offset + increment)
+  def -(decrement: Long): LongOffset = new LongOffset(offset - decrement)
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/12a20c14/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Offset.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Offset.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Offset.scala
new file mode 100644
index 0000000..0f5d644
--- /dev/null
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Offset.scala
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+/**
+ * A offset is a monotonically increasing metric used to track progress in the 
computation of a
+ * stream. An [[Offset]] must be comparable, and the result of `compareTo` 
must be consistent
+ * with `equals` and `hashcode`.
+ */
+trait Offset extends Serializable {
+
+  /**
+   * Returns a negative integer, zero, or a positive integer as this object is 
less than, equal to,
+   * or greater than the specified object.
+   */
+  def compareTo(other: Offset): Int
+
+  def >(other: Offset): Boolean = compareTo(other) > 0
+  def <(other: Offset): Boolean = compareTo(other) < 0
+  def <=(other: Offset): Boolean = compareTo(other) <= 0
+  def >=(other: Offset): Boolean = compareTo(other) >= 0
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/12a20c14/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Sink.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Sink.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Sink.scala
new file mode 100644
index 0000000..1bd71b6
--- /dev/null
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Sink.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+/**
+ * An interface for systems that can collect the results of a streaming query.
+ *
+ * When new data is produced by a query, a [[Sink]] must be able to 
transactionally collect the
+ * data and update the [[Offset]]. In the case of a failure, the sink will be 
recreated
+ * and must be able to return the [[Offset]] for all of the data that is made 
durable.
+ * This contract allows Spark to process data with exactly-once semantics, 
even in the case
+ * of failures that require the computation to be restarted.
+ */
+trait Sink {
+  /**
+   * Returns the [[Offset]] for all data that is currently present in the 
sink, if any. This
+   * function will be called by Spark when restarting execution in order to 
determine at which point
+   * in the input stream computation should be resumed from.
+   */
+  def currentOffset: Option[Offset]
+
+  /**
+   * Accepts a new batch of data as well as a [[Offset]] that denotes how far 
in the input
+   * data computation has progressed to.  When computation restarts after a 
failure, it is important
+   * that a [[Sink]] returns the same [[Offset]] as the most recent batch of 
data that
+   * has been persisted durrably.  Note that this does not necessarily have to 
be the
+   * [[Offset]] for the most recent batch of data that was given to the sink.  
For example,
+   * it is valid to buffer data before persisting, as long as the [[Offset]] 
is stored
+   * transactionally as data is eventually persisted.
+   */
+  def addBatch(batch: Batch): Unit
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/12a20c14/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala
new file mode 100644
index 0000000..2592297
--- /dev/null
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import org.apache.spark.sql.types.StructType
+
+/**
+ * A source of continually arriving data for a streaming query. A [[Source]] 
must have a
+ * monotonically increasing notion of progress that can be represented as an 
[[Offset]]. Spark
+ * will regularly query each [[Source]] to see if any more data is available.
+ */
+trait Source  {
+
+  /** Returns the schema of the data from this source */
+  def schema: StructType
+
+  /**
+   * Returns the next batch of data that is available after `start`, if any is 
available.
+   */
+  def getNextBatch(start: Option[Offset]): Option[Batch]
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/12a20c14/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
new file mode 100644
index 0000000..ebebb82
--- /dev/null
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import java.lang.Thread.UncaughtExceptionHandler
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.Logging
+import org.apache.spark.sql.{ContinuousQuery, DataFrame, SQLContext}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap}
+import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
+import org.apache.spark.sql.catalyst.util._
+import org.apache.spark.sql.execution.QueryExecution
+
+/**
+ * Manages the execution of a streaming Spark SQL query that is occurring in a 
separate thread.
+ * Unlike a standard query, a streaming query executes repeatedly each time 
new data arrives at any
+ * [[Source]] present in the query plan. Whenever new data arrives, a 
[[QueryExecution]] is created
+ * and the results are committed transactionally to the given [[Sink]].
+ */
+class StreamExecution(
+    sqlContext: SQLContext,
+    private[sql] val logicalPlan: LogicalPlan,
+    val sink: Sink) extends ContinuousQuery with Logging {
+
+  /** An monitor used to wait/notify when batches complete. */
+  private val awaitBatchLock = new Object
+
+  @volatile
+  private var batchRun = false
+
+  /** Minimum amount of time in between the start of each batch. */
+  private val minBatchTime = 10
+
+  /** Tracks how much data we have processed from each input source. */
+  private[sql] val streamProgress = new StreamProgress
+
+  /** All stream sources present the query plan. */
+  private val sources =
+    logicalPlan.collect { case s: StreamingRelation => s.source }
+
+  // Start the execution at the current offsets stored in the sink. (i.e. 
avoid reprocessing data
+  // that we have already processed).
+  {
+    sink.currentOffset match {
+      case Some(c: CompositeOffset) =>
+        val storedProgress = c.offsets
+        val sources = logicalPlan collect {
+          case StreamingRelation(source, _) => source
+        }
+
+        assert(sources.size == storedProgress.size)
+        sources.zip(storedProgress).foreach { case (source, offset) =>
+          offset.foreach(streamProgress.update(source, _))
+        }
+      case None => // We are starting this stream for the first time.
+      case _ => throw new IllegalArgumentException("Expected composite offset 
from sink")
+    }
+  }
+
+  logInfo(s"Stream running at $streamProgress")
+
+  /** When false, signals to the microBatchThread that it should stop running. 
*/
+  @volatile private var shouldRun = true
+
+  /** The thread that runs the micro-batches of this stream. */
+  private[sql] val microBatchThread = new Thread("stream execution thread") {
+    override def run(): Unit = {
+      SQLContext.setActive(sqlContext)
+      while (shouldRun) {
+        attemptBatch()
+        Thread.sleep(minBatchTime) // TODO: Could be tighter
+      }
+    }
+  }
+  microBatchThread.setDaemon(true)
+  microBatchThread.setUncaughtExceptionHandler(
+    new UncaughtExceptionHandler {
+      override def uncaughtException(t: Thread, e: Throwable): Unit = {
+        streamDeathCause = e
+      }
+    })
+  microBatchThread.start()
+
+  @volatile
+  private[sql] var lastExecution: QueryExecution = null
+  @volatile
+  private[sql] var streamDeathCause: Throwable = null
+
+  /**
+   * Checks to see if any new data is present in any of the sources.  When new 
data is available,
+   * a batch is executed and passed to the sink, updating the currentOffsets.
+   */
+  private def attemptBatch(): Unit = {
+    val startTime = System.nanoTime()
+
+    // A list of offsets that need to be updated if this batch is successful.
+    // Populated while walking the tree.
+    val newOffsets = new ArrayBuffer[(Source, Offset)]
+    // A list of attributes that will need to be updated.
+    var replacements = new ArrayBuffer[(Attribute, Attribute)]
+    // Replace sources in the logical plan with data that has arrived since 
the last batch.
+    val withNewSources = logicalPlan transform {
+      case StreamingRelation(source, output) =>
+        val prevOffset = streamProgress.get(source)
+        val newBatch = source.getNextBatch(prevOffset)
+
+        newBatch.map { batch =>
+          newOffsets += ((source, batch.end))
+          val newPlan = batch.data.logicalPlan
+
+          assert(output.size == newPlan.output.size)
+          replacements ++= output.zip(newPlan.output)
+          newPlan
+        }.getOrElse {
+          LocalRelation(output)
+        }
+    }
+
+    // Rewire the plan to use the new attributes that were returned by the 
source.
+    val replacementMap = AttributeMap(replacements)
+    val newPlan = withNewSources transformAllExpressions {
+      case a: Attribute if replacementMap.contains(a) => replacementMap(a)
+    }
+
+    if (newOffsets.nonEmpty) {
+      val optimizerStart = System.nanoTime()
+
+      lastExecution = new QueryExecution(sqlContext, newPlan)
+      val executedPlan = lastExecution.executedPlan
+      val optimizerTime = (System.nanoTime() - optimizerStart).toDouble / 
1000000
+      logDebug(s"Optimized batch in ${optimizerTime}ms")
+
+      streamProgress.synchronized {
+        // Update the offsets and calculate a new composite offset
+        newOffsets.foreach(streamProgress.update)
+        val newStreamProgress = logicalPlan.collect {
+          case StreamingRelation(source, _) => streamProgress.get(source)
+        }
+        val batchOffset = CompositeOffset(newStreamProgress)
+
+        // Construct the batch and send it to the sink.
+        val nextBatch = new Batch(batchOffset, new DataFrame(sqlContext, 
newPlan))
+        sink.addBatch(nextBatch)
+      }
+
+      batchRun = true
+      awaitBatchLock.synchronized {
+        // Wake up any threads that are waiting for the stream to progress.
+        awaitBatchLock.notifyAll()
+      }
+
+      val batchTime = (System.nanoTime() - startTime).toDouble / 1000000
+      logInfo(s"Compete up to $newOffsets in ${batchTime}ms")
+    }
+
+    logDebug(s"Waiting for data, current: $streamProgress")
+  }
+
+  /**
+   * Signals to the thread executing micro-batches that it should stop running 
after the next
+   * batch. This method blocks until the thread stops running.
+   */
+  def stop(): Unit = {
+    shouldRun = false
+    if (microBatchThread.isAlive) { microBatchThread.join() }
+  }
+
+  /**
+   * Blocks the current thread until processing for data from the given 
`source` has reached at
+   * least the given `Offset`. This method is indented for use primarily when 
writing tests.
+   */
+  def awaitOffset(source: Source, newOffset: Offset): Unit = {
+    def notDone = streamProgress.synchronized {
+      !streamProgress.contains(source) || streamProgress(source) < newOffset
+    }
+
+    while (notDone) {
+      logInfo(s"Waiting until $newOffset at $source")
+      awaitBatchLock.synchronized { awaitBatchLock.wait(100) }
+    }
+    logDebug(s"Unblocked at $newOffset for $source")
+  }
+
+  override def toString: String =
+    s"""
+       |=== Streaming Query ===
+       |CurrentOffsets: $streamProgress
+       |Thread State: ${microBatchThread.getState}
+       |${if (streamDeathCause != null) stackTraceToString(streamDeathCause) 
else ""}
+       |
+       |$logicalPlan
+     """.stripMargin
+}
+

http://git-wip-us.apache.org/repos/asf/spark/blob/12a20c14/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamProgress.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamProgress.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamProgress.scala
new file mode 100644
index 0000000..0ded1d7
--- /dev/null
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamProgress.scala
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import scala.collection.mutable
+
+/**
+ * A helper class that looks like a Map[Source, Offset].
+ */
+class StreamProgress {
+  private val currentOffsets = new mutable.HashMap[Source, Offset]
+
+  private[streaming] def update(source: Source, newOffset: Offset): Unit = {
+    currentOffsets.get(source).foreach(old =>
+      assert(newOffset > old, s"Stream going backwards $newOffset -> $old"))
+    currentOffsets.put(source, newOffset)
+  }
+
+  private[streaming] def update(newOffset: (Source, Offset)): Unit =
+    update(newOffset._1, newOffset._2)
+
+  private[streaming] def apply(source: Source): Offset = currentOffsets(source)
+  private[streaming] def get(source: Source): Option[Offset] = 
currentOffsets.get(source)
+  private[streaming] def contains(source: Source): Boolean = 
currentOffsets.contains(source)
+
+  private[streaming] def ++(updates: Map[Source, Offset]): StreamProgress = {
+    val updated = new StreamProgress
+    currentOffsets.foreach(updated.update)
+    updates.foreach(updated.update)
+    updated
+  }
+
+  /**
+   * Used to create a new copy of this [[StreamProgress]]. While this class is 
currently mutable,
+   * it should be copied before being passed to user code.
+   */
+  private[streaming] def copy(): StreamProgress = {
+    val copied = new StreamProgress
+    currentOffsets.foreach(copied.update)
+    copied
+  }
+
+  override def toString: String =
+    currentOffsets.map { case (k, v) => s"$k: $v"}.mkString("{", ",", "}")
+
+  override def equals(other: Any): Boolean = other match {
+    case s: StreamProgress => currentOffsets == s.currentOffsets
+    case _ => false
+  }
+
+  override def hashCode: Int = currentOffsets.hashCode()
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/12a20c14/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala
new file mode 100644
index 0000000..e35c444
--- /dev/null
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.logical.LeafNode
+
+object StreamingRelation {
+  def apply(source: Source): StreamingRelation =
+    StreamingRelation(source, source.schema.toAttributes)
+}
+
+/**
+ * Used to link a streaming [[Source]] of data into a
+ * [[org.apache.spark.sql.catalyst.plans.logical.LogicalPlan]].
+ */
+case class StreamingRelation(source: Source, output: Seq[Attribute]) extends 
LeafNode {
+  override def toString: String = source.toString
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/12a20c14/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
new file mode 100644
index 0000000..e6a0842
--- /dev/null
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import java.util.concurrent.atomic.AtomicInteger
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.{Logging, SparkEnv}
+import org.apache.spark.sql.{DataFrame, Dataset, Encoder, Row, SQLContext}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.encoders.{encoderFor, RowEncoder}
+import org.apache.spark.sql.types.StructType
+
+object MemoryStream {
+  protected val currentBlockId = new AtomicInteger(0)
+  protected val memoryStreamId = new AtomicInteger(0)
+
+  def apply[A : Encoder](implicit sqlContext: SQLContext): MemoryStream[A] =
+    new MemoryStream[A](memoryStreamId.getAndIncrement(), sqlContext)
+}
+
+/**
+ * A [[Source]] that produces value stored in memory as they are added by the 
user.  This [[Source]]
+ * is primarily intended for use in unit tests as it can only replay data when 
the object is still
+ * available.
+ */
+case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext)
+    extends Source with Logging {
+  protected val encoder = encoderFor[A]
+  protected val logicalPlan = StreamingRelation(this)
+  protected val output = logicalPlan.output
+  protected val batches = new ArrayBuffer[Dataset[A]]
+  protected var currentOffset: LongOffset = new LongOffset(-1)
+
+  protected def blockManager = SparkEnv.get.blockManager
+
+  def schema: StructType = encoder.schema
+
+  def getCurrentOffset: Offset = currentOffset
+
+  def toDS()(implicit sqlContext: SQLContext): Dataset[A] = {
+    new Dataset(sqlContext, logicalPlan)
+  }
+
+  def toDF()(implicit sqlContext: SQLContext): DataFrame = {
+    new DataFrame(sqlContext, logicalPlan)
+  }
+
+  def addData(data: TraversableOnce[A]): Offset = {
+    import sqlContext.implicits._
+    this.synchronized {
+      currentOffset = currentOffset + 1
+      val ds = data.toVector.toDS()
+      logDebug(s"Adding ds: $ds")
+      batches.append(ds)
+      currentOffset
+    }
+  }
+
+  override def getNextBatch(start: Option[Offset]): Option[Batch] = 
synchronized {
+    val newBlocks =
+      batches.drop(
+        
start.map(_.asInstanceOf[LongOffset]).getOrElse(LongOffset(-1)).offset.toInt + 
1)
+
+    if (newBlocks.nonEmpty) {
+      logDebug(s"Running [$start, $currentOffset] on blocks 
${newBlocks.mkString(", ")}")
+      val df = newBlocks
+          .map(_.toDF())
+          .reduceOption(_ unionAll _)
+          .getOrElse(sqlContext.emptyDataFrame)
+
+      Some(new Batch(currentOffset, df))
+    } else {
+      None
+    }
+  }
+
+  override def toString: String = s"MemoryStream[${output.mkString(",")}]"
+}
+
+/**
+ * A sink that stores the results in memory. This [[Sink]] is primarily 
intended for use in unit
+ * tests and does not provide durablility.
+ */
+class MemorySink(schema: StructType) extends Sink with Logging {
+  /** An order list of batches that have been written to this [[Sink]]. */
+  private var batches = new ArrayBuffer[Batch]()
+
+  /** Used to convert an [[InternalRow]] to an external [[Row]] for comparison 
in testing. */
+  private val externalRowConverter = RowEncoder(schema)
+
+  override def currentOffset: Option[Offset] = synchronized {
+    batches.lastOption.map(_.end)
+  }
+
+  override def addBatch(nextBatch: Batch): Unit = synchronized {
+    batches.append(nextBatch)
+  }
+
+  /** Returns all rows that are stored in this [[Sink]]. */
+  def allData: Seq[Row] = synchronized {
+    batches
+        .map(_.data)
+        .reduceOption(_ unionAll _)
+        .map(_.collect().toSeq)
+        .getOrElse(Seq.empty)
+  }
+
+  /**
+   * Atomically drops the most recent `num` batches and resets the 
[[StreamProgress]] to the
+   * corresponding point in the input. This function can be used when testing 
to simulate data
+   * that has been lost due to buffering.
+   */
+  def dropBatches(num: Int): Unit = synchronized {
+    batches.dropRight(num)
+  }
+
+  override def toString: String = synchronized {
+    batches.map(b => s"${b.end}: ${b.data.collect().mkString(" 
")}").mkString("\n")
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/spark/blob/12a20c14/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
index 8911ad3..299fc6e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -35,6 +35,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import 
org.apache.spark.sql.catalyst.expressions.codegen.GenerateMutableProjection
 import org.apache.spark.sql.execution.{FileRelation, RDDConversions}
 import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.execution.streaming.{Sink, Source}
 import org.apache.spark.sql.types.{StringType, StructType}
 import org.apache.spark.util.SerializableConfiguration
 
@@ -124,6 +125,26 @@ trait SchemaRelationProvider {
 }
 
 /**
+ * Implemented by objects that can produce a streaming [[Source]] for a 
specific format or system.
+ */
+trait StreamSourceProvider {
+  def createSource(
+      sqlContext: SQLContext,
+      parameters: Map[String, String],
+      schema: Option[StructType]): Source
+}
+
+/**
+ * Implemented by objects that can produce a streaming [[Sink]] for a specific 
format or system.
+ */
+trait StreamSinkProvider {
+  def createSink(
+      sqlContext: SQLContext,
+      parameters: Map[String, String],
+      partitionColumns: Seq[String]): Sink
+}
+
+/**
  * ::Experimental::
  * Implemented by objects that produce relations for a specific kind of data 
source
  * with a given schema and partitioned columns.  When Spark SQL is given a DDL 
operation with a

http://git-wip-us.apache.org/repos/asf/spark/blob/12a20c14/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
index ce12f78..405e589 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
@@ -304,27 +304,7 @@ object QueryTest {
   def checkAnswer(df: DataFrame, expectedAnswer: Seq[Row]): Option[String] = {
     val isSorted = df.logicalPlan.collect { case s: logical.Sort => s 
}.nonEmpty
 
-    // We need to call prepareRow recursively to handle schemas with struct 
types.
-    def prepareRow(row: Row): Row = {
-      Row.fromSeq(row.toSeq.map {
-        case null => null
-        case d: java.math.BigDecimal => BigDecimal(d)
-        // Convert array to Seq for easy equality check.
-        case b: Array[_] => b.toSeq
-        case r: Row => prepareRow(r)
-        case o => o
-      })
-    }
 
-    def prepareAnswer(answer: Seq[Row]): Seq[Row] = {
-      // Converts data to types that we can do equality comparison using Scala 
collections.
-      // For BigDecimal type, the Scala type has a better definition of 
equality test (similar to
-      // Java's java.math.BigDecimal.compareTo).
-      // For binary arrays, we convert it to Seq to avoid of calling 
java.util.Arrays.equals for
-      // equality test.
-      val converted: Seq[Row] = answer.map(prepareRow)
-      if (!isSorted) converted.sortBy(_.toString()) else converted
-    }
     val sparkAnswer = try df.collect().toSeq catch {
       case e: Exception =>
         val errorMessage =
@@ -338,22 +318,56 @@ object QueryTest {
         return Some(errorMessage)
     }
 
-    if (prepareAnswer(expectedAnswer) != prepareAnswer(sparkAnswer)) {
-      val errorMessage =
+    sameRows(expectedAnswer, sparkAnswer, isSorted).map { results =>
         s"""
         |Results do not match for query:
         |${df.queryExecution}
         |== Results ==
-        |${sideBySide(
-          s"== Correct Answer - ${expectedAnswer.size} ==" +:
-            prepareAnswer(expectedAnswer).map(_.toString()),
-          s"== Spark Answer - ${sparkAnswer.size} ==" +:
-            prepareAnswer(sparkAnswer).map(_.toString())).mkString("\n")}
-      """.stripMargin
-      return Some(errorMessage)
+        |$results
+       """.stripMargin
     }
+  }
+
+
+  def prepareAnswer(answer: Seq[Row], isSorted: Boolean): Seq[Row] = {
+    // Converts data to types that we can do equality comparison using Scala 
collections.
+    // For BigDecimal type, the Scala type has a better definition of equality 
test (similar to
+    // Java's java.math.BigDecimal.compareTo).
+    // For binary arrays, we convert it to Seq to avoid of calling 
java.util.Arrays.equals for
+    // equality test.
+    val converted: Seq[Row] = answer.map(prepareRow)
+    if (!isSorted) converted.sortBy(_.toString()) else converted
+  }
 
-    return None
+  // We need to call prepareRow recursively to handle schemas with struct 
types.
+  def prepareRow(row: Row): Row = {
+    Row.fromSeq(row.toSeq.map {
+      case null => null
+      case d: java.math.BigDecimal => BigDecimal(d)
+      // Convert array to Seq for easy equality check.
+      case b: Array[_] => b.toSeq
+      case r: Row => prepareRow(r)
+      case o => o
+    })
+  }
+
+  def sameRows(
+      expectedAnswer: Seq[Row],
+      sparkAnswer: Seq[Row],
+      isSorted: Boolean = false): Option[String] = {
+    if (prepareAnswer(expectedAnswer, isSorted) != prepareAnswer(sparkAnswer, 
isSorted)) {
+      val errorMessage =
+        s"""
+         |== Results ==
+         |${sideBySide(
+        s"== Correct Answer - ${expectedAnswer.size} ==" +:
+         prepareAnswer(expectedAnswer, isSorted).map(_.toString()),
+        s"== Spark Answer - ${sparkAnswer.size} ==" +:
+         prepareAnswer(sparkAnswer, 
isSorted).map(_.toString())).mkString("\n")}
+        """.stripMargin
+      return Some(errorMessage)
+    }
+    None
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/12a20c14/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala
new file mode 100644
index 0000000..f45abbf
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala
@@ -0,0 +1,346 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.lang.Thread.UncaughtExceptionHandler
+
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+import scala.util.Random
+
+import org.scalatest.concurrent.Timeouts
+import org.scalatest.time.SpanSugar._
+
+import org.apache.spark.sql.catalyst.encoders.{encoderFor, ExpressionEncoder, 
RowEncoder}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.util._
+import org.apache.spark.sql.execution.streaming._
+
+/**
+ * A framework for implementing tests for streaming queries and sources.
+ *
+ * A test consists of a set of steps (expressed as a `StreamAction`) that are 
executed in order,
+ * blocking as necessary to let the stream catch up.  For example, the 
following adds some data to
+ * a stream, blocking until it can verify that the correct values are 
eventually produced.
+ *
+ * {{{
+ *  val inputData = MemoryStream[Int]
+    val mapped = inputData.toDS().map(_ + 1)
+
+    testStream(mapped)(
+      AddData(inputData, 1, 2, 3),
+      CheckAnswer(2, 3, 4))
+ * }}}
+ *
+ * Note that while we do sleep to allow the other thread to progress without 
spinning,
+ * `StreamAction` checks should not depend on the amount of time spent 
sleeping.  Instead they
+ * should check the actual progress of the stream before verifying the 
required test condition.
+ *
+ * Currently it is assumed that all streaming queries will eventually complete 
in 10 seconds to
+ * avoid hanging forever in the case of failures. However, individual suites 
can change this
+ * by overriding `streamingTimeout`.
+ */
+trait StreamTest extends QueryTest with Timeouts {
+
+  implicit class RichSource(s: Source) {
+    def toDF(): DataFrame = new DataFrame(sqlContext, StreamingRelation(s))
+  }
+
+  /** How long to wait for an active stream to catch up when checking a 
result. */
+  val streamingTimout = 10.seconds
+
+  /** A trait for actions that can be performed while testing a streaming 
DataFrame. */
+  trait StreamAction
+
+  /** A trait to mark actions that require the stream to be actively running. 
*/
+  trait StreamMustBeRunning
+
+  /**
+   * Adds the given data to the stream.  Subsuquent check answers will block 
until this data has
+   * been processed.
+   */
+  object AddData {
+    def apply[A](source: MemoryStream[A], data: A*): AddDataMemory[A] =
+      AddDataMemory(source, data)
+  }
+
+  /** A trait that can be extended when testing other sources. */
+  trait AddData extends StreamAction {
+    def source: Source
+
+    /**
+     * Called to trigger adding the data.  Should return the offset that will 
denote when this
+     * new data has been processed.
+     */
+    def addData(): Offset
+  }
+
+  case class AddDataMemory[A](source: MemoryStream[A], data: Seq[A]) extends 
AddData {
+    override def toString: String = s"AddData to $source: 
${data.mkString(",")}"
+
+    override def addData(): Offset = {
+      source.addData(data)
+    }
+  }
+
+  /**
+   * Checks to make sure that the current data stored in the sink matches the 
`expectedAnswer`.
+   * This operation automatically blocks untill all added data has been 
processed.
+   */
+  object CheckAnswer {
+    def apply[A : Encoder](data: A*): CheckAnswerRows = {
+      val encoder = encoderFor[A]
+      val toExternalRow = RowEncoder(encoder.schema)
+      CheckAnswerRows(data.map(d => toExternalRow.fromRow(encoder.toRow(d))))
+    }
+
+    def apply(rows: Row*): CheckAnswerRows = CheckAnswerRows(rows)
+  }
+
+  case class CheckAnswerRows(expectedAnswer: Seq[Row])
+      extends StreamAction with StreamMustBeRunning {
+    override def toString: String = s"CheckAnswer: 
${expectedAnswer.mkString(",")}"
+  }
+
+  case class DropBatches(num: Int) extends StreamAction
+
+  /** Stops the stream.  It must currently be running. */
+  case object StopStream extends StreamAction with StreamMustBeRunning
+
+  /** Starts the stream, resuming if data has already been processed.  It must 
not be running. */
+  case object StartStream extends StreamAction
+
+  /** Signals that a failure is expected and should not kill the test. */
+  case object ExpectFailure extends StreamAction
+
+  /** A helper for running actions on a Streaming Dataset. See 
`checkAnswer(DataFrame)`. */
+  def testStream(stream: Dataset[_])(actions: StreamAction*): Unit =
+    testStream(stream.toDF())(actions: _*)
+
+  /**
+   * Executes the specified actions on the the given streaming DataFrame and 
provides helpful
+   * error messages in the case of failures or incorrect answers.
+   *
+   * Note that if the stream is not explictly started before an action that 
requires it to be
+   * running then it will be automatically started before performing any other 
actions.
+   */
+  def testStream(stream: DataFrame)(actions: StreamAction*): Unit = {
+    var pos = 0
+    var currentPlan: LogicalPlan = stream.logicalPlan
+    var currentStream: StreamExecution = null
+    val awaiting = new mutable.HashMap[Source, Offset]()
+    val sink = new MemorySink(stream.schema)
+
+    @volatile
+    var streamDeathCause: Throwable = null
+
+    // If the test doesn't manually start the stream, we do it automatically 
at the beginning.
+    val startedManually =
+      
actions.takeWhile(!_.isInstanceOf[StreamMustBeRunning]).contains(StartStream)
+    val startedTest = if (startedManually) actions else StartStream +: actions
+
+    def testActions = actions.zipWithIndex.map {
+      case (a, i) =>
+        if ((pos == i && startedManually) || (pos == (i + 1) && 
!startedManually)) {
+          "=> " + a.toString
+        } else {
+          "   " + a.toString
+        }
+    }.mkString("\n")
+
+    def currentOffsets =
+      if (currentStream != null) currentStream.streamProgress.toString else 
"not started"
+
+    def threadState =
+      if (currentStream != null && currentStream.microBatchThread.isAlive) 
"alive" else "dead"
+    def testState =
+      s"""
+         |== Progress ==
+         |$testActions
+         |
+         |== Stream ==
+         |Stream state: $currentOffsets
+         |Thread state: $threadState
+         |${if (streamDeathCause != null) stackTraceToString(streamDeathCause) 
else ""}
+         |
+         |== Sink ==
+         |$sink
+         |
+         |== Plan ==
+         |${if (currentStream != null) currentStream.lastExecution else ""}
+         """
+
+    def checkState(check: Boolean, error: String) = if (!check) {
+      fail(
+        s"""
+           |Invalid State: $error
+           |$testState
+         """.stripMargin)
+    }
+
+    val testThread = Thread.currentThread()
+
+    try {
+      startedTest.foreach { action =>
+        action match {
+          case StartStream =>
+            checkState(currentStream == null, "stream already running")
+
+            currentStream = new StreamExecution(sqlContext, 
stream.logicalPlan, sink)
+            currentStream.microBatchThread.setUncaughtExceptionHandler(
+              new UncaughtExceptionHandler {
+                override def uncaughtException(t: Thread, e: Throwable): Unit 
= {
+                  streamDeathCause = e
+                  testThread.interrupt()
+                }
+              })
+
+          case StopStream =>
+            checkState(currentStream != null, "can not stop a stream that is 
not running")
+            currentStream.stop()
+            currentStream = null
+
+          case DropBatches(num) =>
+            checkState(currentStream == null, "dropping batches while running 
leads to corruption")
+            sink.dropBatches(num)
+
+          case ExpectFailure =>
+            try failAfter(streamingTimout) {
+              while (streamDeathCause == null) {
+                Thread.sleep(100)
+              }
+            } catch {
+              case _: InterruptedException =>
+              case _: org.scalatest.exceptions.TestFailedDueToTimeoutException 
=>
+                fail(
+                  s"""
+                     |Timed out while waiting for failure.
+                     |$testState
+                   """.stripMargin)
+            }
+
+            currentStream = null
+            streamDeathCause = null
+
+          case a: AddData =>
+            awaiting.put(a.source, a.addData())
+
+          case CheckAnswerRows(expectedAnswer) =>
+            checkState(currentStream != null, "stream not running")
+
+            // Block until all data added has been processed
+            awaiting.foreach { case (source, offset) =>
+              failAfter(streamingTimout) {
+                currentStream.awaitOffset(source, offset)
+              }
+            }
+
+            val allData = try sink.allData catch {
+              case e: Exception =>
+                fail(
+                  s"""
+                    |Exception while getting data from sink $e
+                    |$testState
+                  """.stripMargin)
+            }
+
+            QueryTest.sameRows(expectedAnswer, allData).foreach {
+              error => fail(
+                s"""
+                   |$error
+                   |$testState
+                 """.stripMargin)
+            }
+        }
+        pos += 1
+      }
+    } catch {
+      case _: InterruptedException if streamDeathCause != null =>
+        fail(
+          s"""
+             |Stream Thread Died
+             |$testState
+                      """.stripMargin)
+      case _: org.scalatest.exceptions.TestFailedDueToTimeoutException =>
+        fail(
+          s"""
+             |Timed out waiting for stream
+             |$testState
+                   """.stripMargin)
+    } finally {
+      if (currentStream != null && currentStream.microBatchThread.isAlive) {
+        currentStream.stop()
+      }
+    }
+  }
+
+  /**
+   * Creates a stress test that randomly starts/stops/adds data/checks the 
result.
+   *
+   * @param ds a dataframe that executes + 1 on a stream of integers, 
returning the result.
+   * @param addData and add data action that adds the given numbers to the 
stream, encoding them
+   *                as needed
+   */
+  def runStressTest(
+      ds: Dataset[Int],
+      addData: Seq[Int] => StreamAction,
+      iterations: Int = 100): Unit = {
+    implicit val intEncoder = ExpressionEncoder[Int]()
+    var dataPos = 0
+    var running = true
+    val actions = new ArrayBuffer[StreamAction]()
+
+    def addCheck() = { actions += CheckAnswer(1 to dataPos: _*) }
+
+    def addRandomData() = {
+      val numItems = Random.nextInt(10)
+      val data = dataPos until (dataPos + numItems)
+      dataPos += numItems
+      actions += addData(data)
+    }
+
+    (1 to iterations).foreach { i =>
+      val rand = Random.nextDouble()
+      if(!running) {
+        rand match {
+          case r if r < 0.7 => // AddData
+            addRandomData()
+
+          case _ => // StartStream
+            actions += StartStream
+            running = true
+        }
+      } else {
+        rand match {
+          case r if r < 0.1 =>
+            addCheck()
+
+          case r if r < 0.7 => // AddData
+            addRandomData()
+
+          case _ => // StartStream
+            actions += StopStream
+            running = false
+        }
+      }
+    }
+    if(!running) { actions += StartStream }
+    addCheck()
+    testStream(ds)(actions: _*)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/12a20c14/sql/core/src/test/scala/org/apache/spark/sql/streaming/DataStreamReaderSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/DataStreamReaderSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/DataStreamReaderSuite.scala
new file mode 100644
index 0000000..1dab6eb
--- /dev/null
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/DataStreamReaderSuite.scala
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming.test
+
+import org.apache.spark.sql.{AnalysisException, SQLContext, StreamTest}
+import org.apache.spark.sql.execution.streaming.{Batch, Offset, Sink, Source}
+import org.apache.spark.sql.sources.{StreamSinkProvider, StreamSourceProvider}
+import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
+
+object LastOptions {
+  var parameters: Map[String, String] = null
+  var schema: Option[StructType] = null
+  var partitionColumns: Seq[String] = Nil
+}
+
+/** Dummy provider: returns no-op source/sink and records options in 
[[LastOptions]]. */
+class DefaultSource extends StreamSourceProvider with StreamSinkProvider {
+  override def createSource(
+      sqlContext: SQLContext,
+      parameters: Map[String, String],
+      schema: Option[StructType]): Source = {
+    LastOptions.parameters = parameters
+    LastOptions.schema = schema
+    new Source {
+      override def getNextBatch(start: Option[Offset]): Option[Batch] = None
+      override def schema: StructType = StructType(StructField("a", 
IntegerType) :: Nil)
+    }
+  }
+
+  override def createSink(
+      sqlContext: SQLContext,
+      parameters: Map[String, String],
+      partitionColumns: Seq[String]): Sink = {
+    LastOptions.parameters = parameters
+    LastOptions.partitionColumns = partitionColumns
+    new Sink {
+      override def addBatch(batch: Batch): Unit = {}
+      override def currentOffset: Option[Offset] = None
+    }
+  }
+}
+
+class DataStreamReaderWriterSuite extends StreamTest with SharedSQLContext {
+  import testImplicits._
+
+  test("resolve default source") {
+    sqlContext.streamFrom
+      .format("org.apache.spark.sql.streaming.test")
+      .open()
+      .streamTo
+      .format("org.apache.spark.sql.streaming.test")
+      .start()
+      .stop()
+  }
+
+  test("resolve full class") {
+    sqlContext.streamFrom
+      .format("org.apache.spark.sql.streaming.test.DefaultSource")
+      .open()
+      .streamTo
+      .format("org.apache.spark.sql.streaming.test")
+      .start()
+      .stop()
+  }
+
+  test("options") {
+    val map = new java.util.HashMap[String, String]
+    map.put("opt3", "3")
+
+    val df = sqlContext.streamFrom
+        .format("org.apache.spark.sql.streaming.test")
+        .option("opt1", "1")
+        .options(Map("opt2" -> "2"))
+        .options(map)
+        .open()
+
+    assert(LastOptions.parameters("opt1") == "1")
+    assert(LastOptions.parameters("opt2") == "2")
+    assert(LastOptions.parameters("opt3") == "3")
+
+    LastOptions.parameters = null
+
+    df.streamTo
+      .format("org.apache.spark.sql.streaming.test")
+      .option("opt1", "1")
+      .options(Map("opt2" -> "2"))
+      .options(map)
+      .start()
+      .stop()
+
+    assert(LastOptions.parameters("opt1") == "1")
+    assert(LastOptions.parameters("opt2") == "2")
+    assert(LastOptions.parameters("opt3") == "3")
+  }
+
+  test("partitioning") {
+    val df = sqlContext.streamFrom
+      .format("org.apache.spark.sql.streaming.test")
+      .open()
+
+    df.streamTo
+      .format("org.apache.spark.sql.streaming.test")
+      .start()
+      .stop()
+    assert(LastOptions.partitionColumns == Nil)
+
+    df.streamTo
+      .format("org.apache.spark.sql.streaming.test")
+      .partitionBy("a")
+      .start()
+      .stop()
+    assert(LastOptions.partitionColumns == Seq("a"))
+
+
+    withSQLConf("spark.sql.caseSensitive" -> "false") {
+      df.streamTo
+        .format("org.apache.spark.sql.streaming.test")
+        .partitionBy("A")
+        .start()
+        .stop()
+      assert(LastOptions.partitionColumns == Seq("a"))
+    }
+
+    intercept[AnalysisException] {
+      df.streamTo
+        .format("org.apache.spark.sql.streaming.test")
+        .partitionBy("b")
+        .start()
+        .stop()
+    }
+  }
+
+  test("stream paths") {
+    val df = sqlContext.streamFrom
+      .format("org.apache.spark.sql.streaming.test")
+      .open("/test")
+
+    assert(LastOptions.parameters("path") == "/test")
+
+    LastOptions.parameters = null
+
+    df.streamTo
+      .format("org.apache.spark.sql.streaming.test")
+      .start("/test")
+      .stop()
+
+    assert(LastOptions.parameters("path") == "/test")
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/12a20c14/sql/core/src/test/scala/org/apache/spark/sql/streaming/MemorySourceStressSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/MemorySourceStressSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/MemorySourceStressSuite.scala
new file mode 100644
index 0000000..81760d2
--- /dev/null
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/MemorySourceStressSuite.scala
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import org.apache.spark.sql.StreamTest
+import org.apache.spark.sql.execution.streaming._
+import org.apache.spark.sql.test.SharedSQLContext
+
+class MemorySourceStressSuite extends StreamTest with SharedSQLContext {
+  import testImplicits._
+
+  test("memory stress test") {
+    val input = MemoryStream[Int]
+    val mapped = input.toDS().map(_ + 1)
+
+    runStressTest(mapped, AddData(input, _: _*))
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/12a20c14/sql/core/src/test/scala/org/apache/spark/sql/streaming/OffsetSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/OffsetSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/OffsetSuite.scala
new file mode 100644
index 0000000..9894658
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/OffsetSuite.scala
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.execution.streaming.{CompositeOffset, LongOffset, 
Offset}
+
+trait OffsetSuite extends SparkFunSuite {
+  /** Creates test to check all the comparisons of offsets given a `one` that 
is less than `two`. */
+  def compare(one: Offset, two: Offset): Unit = {
+    test(s"comparision $one <=> $two") {
+      assert(one < two)
+      assert(one <= two)
+      assert(one <= one)
+      assert(two > one)
+      assert(two >= one)
+      assert(one >= one)
+      assert(one == one)
+      assert(two == two)
+      assert(one != two)
+      assert(two != one)
+    }
+  }
+
+  /** Creates test to check that non-equality comparisons throw exception. */
+  def compareInvalid(one: Offset, two: Offset): Unit = {
+    test(s"invalid comparison $one <=> $two") {
+      intercept[IllegalArgumentException] {
+        assert(one < two)
+      }
+
+      intercept[IllegalArgumentException] {
+        assert(one <= two)
+      }
+
+      intercept[IllegalArgumentException] {
+        assert(one > two)
+      }
+
+      intercept[IllegalArgumentException] {
+        assert(one >= two)
+      }
+
+      assert(!(one == two))
+      assert(!(two == one))
+      assert(one != two)
+      assert(two != one)
+    }
+  }
+}
+
+class LongOffsetSuite extends OffsetSuite {
+  val one = LongOffset(1)
+  val two = LongOffset(2)
+  compare(one, two)
+}
+
+class CompositeOffsetSuite extends OffsetSuite {
+  compare(
+    one = CompositeOffset(Some(LongOffset(1)) :: Nil),
+    two = CompositeOffset(Some(LongOffset(2)) :: Nil))
+
+  compare(
+    one = CompositeOffset(None :: Nil),
+    two = CompositeOffset(Some(LongOffset(2)) :: Nil))
+
+  compareInvalid(                                               // sizes must 
be same
+    one = CompositeOffset(Nil),
+    two = CompositeOffset(Some(LongOffset(2)) :: Nil))
+
+  compare(
+    one = CompositeOffset.fill(LongOffset(0), LongOffset(1)),
+    two = CompositeOffset.fill(LongOffset(1), LongOffset(2)))
+
+  compare(
+    one = CompositeOffset.fill(LongOffset(1), LongOffset(1)),
+    two = CompositeOffset.fill(LongOffset(1), LongOffset(2)))
+
+  compareInvalid(
+    one = CompositeOffset.fill(LongOffset(2), LongOffset(1)),   // vector time 
inconsistent
+    two = CompositeOffset.fill(LongOffset(1), LongOffset(2)))
+}
+

http://git-wip-us.apache.org/repos/asf/spark/blob/12a20c14/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
new file mode 100644
index 0000000..fbb1792
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import org.apache.spark.sql.{Row, StreamTest}
+import org.apache.spark.sql.execution.streaming._
+import org.apache.spark.sql.test.SharedSQLContext
+
+class StreamSuite extends StreamTest with SharedSQLContext {
+
+  import testImplicits._
+
+  test("map with recovery") {
+    val inputData = MemoryStream[Int]
+    val mapped = inputData.toDS().map(_ + 1)
+
+    testStream(mapped)(
+      AddData(inputData, 1, 2, 3),
+      StartStream,
+      CheckAnswer(2, 3, 4),
+      StopStream,
+      AddData(inputData, 4, 5, 6),
+      StartStream,
+      CheckAnswer(2, 3, 4, 5, 6, 7))
+  }
+
+  test("join") {
+    // Make a table and ensure it will be broadcast.
+    val smallTable = Seq((1, "one"), (2, "two"), (4, "four")).toDF("number", 
"word")
+
+    // Join the input stream with a table.
+    val inputData = MemoryStream[Int]
+    val joined = inputData.toDS().toDF().join(smallTable, $"value" === 
$"number")
+
+    testStream(joined)(
+      AddData(inputData, 1, 2, 3),
+      CheckAnswer(Row(1, 1, "one"), Row(2, 2, "two")),
+      AddData(inputData, 4),
+      CheckAnswer(Row(1, 1, "one"), Row(2, 2, "two"), Row(4, 4, "four")))
+  }
+
+  test("union two streams") {
+    val inputData1 = MemoryStream[Int]
+    val inputData2 = MemoryStream[Int]
+
+    val unioned = inputData1.toDS().union(inputData2.toDS())
+
+    testStream(unioned)(
+      AddData(inputData1, 1, 3, 5),
+      CheckAnswer(1, 3, 5),
+      AddData(inputData2, 2, 4, 6),
+      CheckAnswer(1, 2, 3, 4, 5, 6),
+      StopStream,
+      AddData(inputData1, 7),
+      StartStream,
+      AddData(inputData2, 8),
+      CheckAnswer(1, 2, 3, 4, 5, 6, 7, 8))
+  }
+
+  test("sql queries") {
+    val inputData = MemoryStream[Int]
+    inputData.toDF().registerTempTable("stream")
+    val evens = sql("SELECT * FROM stream WHERE value % 2 = 0")
+
+    testStream(evens)(
+      AddData(inputData, 1, 2, 3, 4),
+      CheckAnswer(2, 4))
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/12a20c14/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala
index e7b3765..c341191 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala
@@ -36,7 +36,7 @@ trait SharedSQLContext extends SQLTestUtils {
   /**
    * The [[TestSQLContext]] to use for all tests in this suite.
    */
-  protected def sqlContext: SQLContext = _ctx
+  protected implicit def sqlContext: SQLContext = _ctx
 
   /**
    * Initialize the [[TestSQLContext]].


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to