Repository: spark
Updated Branches:
  refs/heads/master 5d50d4f0f -> 214adb14b
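This commit replaces the old streaming entry points used in the tests (spark.read.format(...).stream(path) and df.write.format(...).startStream(path)) with the new DataStreamReader/DataStreamWriter API: spark.readStream.format(...).load(path) and df.writeStream.format(...).start(path). The following is a minimal sketch of the renamed API as exercised by the updated tests below; the session setup, file paths and trigger interval are illustrative placeholders and are not part of the commit.

  import org.apache.spark.sql.SparkSession
  import org.apache.spark.sql.streaming.ProcessingTime

  val spark = SparkSession.builder().appName("writeStream-example").getOrCreate()

  // Reading a stream: spark.readStream...load() replaces spark.read...stream()
  val lines = spark.readStream
    .format("text")
    .load("/tmp/stream-input")                               // placeholder input directory

  // Writing a stream: df.writeStream...start() replaces df.write...startStream()
  val query = lines.writeStream
    .format("parquet")
    .option("checkpointLocation", "/tmp/stream-checkpoint")  // placeholder checkpoint directory
    .trigger(ProcessingTime("10 seconds"))
    .start("/tmp/stream-output")                             // placeholder output directory

  query.awaitTermination()

The new foreach sink follows the same pattern, df.writeStream.foreach(writer).start(), as shown in the DataStreamWriter Scaladoc below.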
http://git-wip-us.apache.org/repos/asf/spark/blob/214adb14/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala new file mode 100644 index 0000000..b035ff7 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala @@ -0,0 +1,386 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.streaming + +import scala.collection.JavaConverters._ + +import org.apache.spark.annotation.Experimental +import org.apache.spark.sql.{AnalysisException, DataFrame, Dataset, ForeachWriter} +import org.apache.spark.sql.execution.datasources.DataSource +import org.apache.spark.sql.execution.streaming.{ForeachSink, MemoryPlan, MemorySink} + +/** + * :: Experimental :: + * Interface used to write a streaming [[Dataset]] to external storage systems (e.g. file systems, + * key-value stores, etc). Use [[Dataset.writeStream]] to access this. + * + * @since 2.0.0 + */ +@Experimental +final class DataStreamWriter[T] private[sql](ds: Dataset[T]) { + + private val df = ds.toDF() + + /** + * :: Experimental :: + * Specifies how data of a streaming DataFrame/Dataset is written to a streaming sink. + * - `OutputMode.Append()`: only the new rows in the streaming DataFrame/Dataset will be + * written to the sink + * - `OutputMode.Complete()`: all the rows in the streaming DataFrame/Dataset will be written + * to the sink every time these is some updates + * + * @since 2.0.0 + */ + @Experimental + def outputMode(outputMode: OutputMode): DataStreamWriter[T] = { + this.outputMode = outputMode + this + } + + + /** + * :: Experimental :: + * Specifies how data of a streaming DataFrame/Dataset is written to a streaming sink. + * - `append`: only the new rows in the streaming DataFrame/Dataset will be written to + * the sink + * - `complete`: all the rows in the streaming DataFrame/Dataset will be written to the sink + * every time these is some updates + * + * @since 2.0.0 + */ + @Experimental + def outputMode(outputMode: String): DataStreamWriter[T] = { + this.outputMode = outputMode.toLowerCase match { + case "append" => + OutputMode.Append + case "complete" => + OutputMode.Complete + case _ => + throw new IllegalArgumentException(s"Unknown output mode $outputMode. " + + "Accepted output modes are 'append' and 'complete'") + } + this + } + + /** + * :: Experimental :: + * Set the trigger for the stream query. The default value is `ProcessingTime(0)` and it will run + * the query as fast as possible. 
+ * + * Scala Example: + * {{{ + * df.writeStream.trigger(ProcessingTime("10 seconds")) + * + * import scala.concurrent.duration._ + * df.writeStream.trigger(ProcessingTime(10.seconds)) + * }}} + * + * Java Example: + * {{{ + * df.writeStream().trigger(ProcessingTime.create("10 seconds")) + * + * import java.util.concurrent.TimeUnit + * df.writeStream().trigger(ProcessingTime.create(10, TimeUnit.SECONDS)) + * }}} + * + * @since 2.0.0 + */ + @Experimental + def trigger(trigger: Trigger): DataStreamWriter[T] = { + this.trigger = trigger + this + } + + + /** + * :: Experimental :: + * Specifies the name of the [[ContinuousQuery]] that can be started with `startStream()`. + * This name must be unique among all the currently active queries in the associated SQLContext. + * + * @since 2.0.0 + */ + @Experimental + def queryName(queryName: String): DataStreamWriter[T] = { + this.extraOptions += ("queryName" -> queryName) + this + } + + /** + * :: Experimental :: + * Specifies the underlying output data source. Built-in options include "parquet", "json", etc. + * + * @since 2.0.0 + */ + @Experimental + def format(source: String): DataStreamWriter[T] = { + this.source = source + this + } + + /** + * Partitions the output by the given columns on the file system. If specified, the output is + * laid out on the file system similar to Hive's partitioning scheme. As an example, when we + * partition a dataset by year and then month, the directory layout would look like: + * + * - year=2016/month=01/ + * - year=2016/month=02/ + * + * Partitioning is one of the most widely used techniques to optimize physical data layout. + * It provides a coarse-grained index for skipping unnecessary data reads when queries have + * predicates on the partitioned columns. In order for partitioning to work well, the number + * of distinct values in each column should typically be less than tens of thousands. + * + * This was initially applicable for Parquet but in 1.5+ covers JSON, text, ORC and avro as well. + * + * @since 1.4.0 + */ + @scala.annotation.varargs + def partitionBy(colNames: String*): DataStreamWriter[T] = { + this.partitioningColumns = Option(colNames) + this + } + + /** + * :: Experimental :: + * Adds an output option for the underlying data source. + * + * @since 2.0.0 + */ + @Experimental + def option(key: String, value: String): DataStreamWriter[T] = { + this.extraOptions += (key -> value) + this + } + + /** + * :: Experimental :: + * Adds an output option for the underlying data source. + * + * @since 2.0.0 + */ + @Experimental + def option(key: String, value: Boolean): DataStreamWriter[T] = option(key, value.toString) + + /** + * :: Experimental :: + * Adds an output option for the underlying data source. + * + * @since 2.0.0 + */ + @Experimental + def option(key: String, value: Long): DataStreamWriter[T] = option(key, value.toString) + + /** + * :: Experimental :: + * Adds an output option for the underlying data source. + * + * @since 2.0.0 + */ + @Experimental + def option(key: String, value: Double): DataStreamWriter[T] = option(key, value.toString) + + /** + * :: Experimental :: + * (Scala-specific) Adds output options for the underlying data source. + * + * @since 2.0.0 + */ + @Experimental + def options(options: scala.collection.Map[String, String]): DataStreamWriter[T] = { + this.extraOptions ++= options + this + } + + /** + * :: Experimental :: + * Adds output options for the underlying data source. 
+ * + * @since 2.0.0 + */ + @Experimental + def options(options: java.util.Map[String, String]): DataStreamWriter[T] = { + this.options(options.asScala) + this + } + + /** + * :: Experimental :: + * Starts the execution of the streaming query, which will continually output results to the given + * path as new data arrives. The returned [[ContinuousQuery]] object can be used to interact with + * the stream. + * + * @since 2.0.0 + */ + @Experimental + def start(path: String): ContinuousQuery = { + option("path", path).start() + } + + /** + * :: Experimental :: + * Starts the execution of the streaming query, which will continually output results to the given + * path as new data arrives. The returned [[ContinuousQuery]] object can be used to interact with + * the stream. + * + * @since 2.0.0 + */ + @Experimental + def start(): ContinuousQuery = { + if (source == "memory") { + assertNotPartitioned("memory") + if (extraOptions.get("queryName").isEmpty) { + throw new AnalysisException("queryName must be specified for memory sink") + } + + val sink = new MemorySink(df.schema, outputMode) + val resultDf = Dataset.ofRows(df.sparkSession, new MemoryPlan(sink)) + val query = df.sparkSession.sessionState.continuousQueryManager.startQuery( + extraOptions.get("queryName"), + extraOptions.get("checkpointLocation"), + df, + sink, + outputMode, + useTempCheckpointLocation = true, + recoverFromCheckpointLocation = false, + trigger = trigger) + resultDf.createOrReplaceTempView(query.name) + query + } else if (source == "foreach") { + assertNotPartitioned("foreach") + val sink = new ForeachSink[T](foreachWriter)(ds.exprEnc) + df.sparkSession.sessionState.continuousQueryManager.startQuery( + extraOptions.get("queryName"), + extraOptions.get("checkpointLocation"), + df, + sink, + outputMode, + useTempCheckpointLocation = true, + trigger = trigger) + } else { + val dataSource = + DataSource( + df.sparkSession, + className = source, + options = extraOptions.toMap, + partitionColumns = normalizedParCols.getOrElse(Nil)) + df.sparkSession.sessionState.continuousQueryManager.startQuery( + extraOptions.get("queryName"), + extraOptions.get("checkpointLocation"), + df, + dataSource.createSink(outputMode), + outputMode, + trigger = trigger) + } + } + + /** + * :: Experimental :: + * Starts the execution of the streaming query, which will continually send results to the given + * [[ForeachWriter]] as as new data arrives. The [[ForeachWriter]] can be used to send the data + * generated by the [[DataFrame]]/[[Dataset]] to an external system. 
+ * + * Scala example: + * {{{ + * datasetOfString.writeStream.foreach(new ForeachWriter[String] { + * + * def open(partitionId: Long, version: Long): Boolean = { + * // open connection + * } + * + * def process(record: String) = { + * // write string to connection + * } + * + * def close(errorOrNull: Throwable): Unit = { + * // close the connection + * } + * }).start() + * }}} + * + * Java example: + * {{{ + * datasetOfString.writeStream().foreach(new ForeachWriter<String>() { + * + * @Override + * public boolean open(long partitionId, long version) { + * // open connection + * } + * + * @Override + * public void process(String value) { + * // write string to connection + * } + * + * @Override + * public void close(Throwable errorOrNull) { + * // close the connection + * } + * }).start(); + * }}} + * + * @since 2.0.0 + */ + @Experimental + def foreach(writer: ForeachWriter[T]): DataStreamWriter[T] = { + this.source = "foreach" + this.foreachWriter = if (writer != null) { + ds.sparkSession.sparkContext.clean(writer) + } else { + throw new IllegalArgumentException("foreach writer cannot be null") + } + this + } + + private def normalizedParCols: Option[Seq[String]] = partitioningColumns.map { cols => + cols.map(normalize(_, "Partition")) + } + + /** + * The given column name may not be equal to any of the existing column names if we were in + * case-insensitive context. Normalize the given column name to the real one so that we don't + * need to care about case sensitivity afterwards. + */ + private def normalize(columnName: String, columnType: String): String = { + val validColumnNames = df.logicalPlan.output.map(_.name) + validColumnNames.find(df.sparkSession.sessionState.analyzer.resolver(_, columnName)) + .getOrElse(throw new AnalysisException(s"$columnType column $columnName not found in " + + s"existing columns (${validColumnNames.mkString(", ")})")) + } + + private def assertNotPartitioned(operation: String): Unit = { + if (partitioningColumns.isDefined) { + throw new AnalysisException(s"'$operation' does not support partitioning") + } + } + + /////////////////////////////////////////////////////////////////////////////////////// + // Builder pattern config options + /////////////////////////////////////////////////////////////////////////////////////// + + private var source: String = df.sparkSession.sessionState.conf.defaultDataSourceName + + private var outputMode: OutputMode = OutputMode.Append + + private var trigger: Trigger = ProcessingTime(0L) + + private var extraOptions = new scala.collection.mutable.HashMap[String, String] + + private var foreachWriter: ForeachWriter[T] = null + + private var partitioningColumns: Option[Seq[String]] = None +} http://git-wip-us.apache.org/repos/asf/spark/blob/214adb14/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ForeachSinkSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ForeachSinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ForeachSinkSuite.scala index e1fb3b9..6ff597c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ForeachSinkSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ForeachSinkSuite.scala @@ -38,9 +38,10 @@ class ForeachSinkSuite extends StreamTest with SharedSQLContext with BeforeAndAf test("foreach") { withTempDir { checkpointDir => val input = MemoryStream[Int] - val query = 
input.toDS().repartition(2).write + val query = input.toDS().repartition(2).writeStream .option("checkpointLocation", checkpointDir.getCanonicalPath) .foreach(new TestForeachWriter()) + .start() input.addData(1, 2, 3, 4) query.processAllAvailable() @@ -70,14 +71,14 @@ class ForeachSinkSuite extends StreamTest with SharedSQLContext with BeforeAndAf test("foreach with error") { withTempDir { checkpointDir => val input = MemoryStream[Int] - val query = input.toDS().repartition(1).write + val query = input.toDS().repartition(1).writeStream .option("checkpointLocation", checkpointDir.getCanonicalPath) .foreach(new TestForeachWriter() { override def process(value: Int): Unit = { super.process(value) throw new RuntimeException("error") } - }) + }).start() input.addData(1, 2, 3, 4) query.processAllAvailable() http://git-wip-us.apache.org/repos/asf/spark/blob/214adb14/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala index f81608b..ef2fcbf 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala @@ -225,12 +225,12 @@ class ContinuousQueryManagerSuite extends StreamTest with BeforeAndAfter { val metadataRoot = Utils.createTempDir(namePrefix = "streaming.checkpoint").getCanonicalPath query = - df.write + df.writeStream .format("memory") .queryName(s"query$i") .option("checkpointLocation", metadataRoot) .outputMode("append") - .startStream() + .start() .asInstanceOf[StreamExecution] } catch { case NonFatal(e) => http://git-wip-us.apache.org/repos/asf/spark/blob/214adb14/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQuerySuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQuerySuite.scala index 43a8857..ad6bc27 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQuerySuite.scala @@ -39,12 +39,12 @@ class ContinuousQuerySuite extends StreamTest with BeforeAndAfter { def startQuery(queryName: String): ContinuousQuery = { val metadataRoot = Utils.createTempDir(namePrefix = "streaming.checkpoint").getCanonicalPath - val writer = mapped.write + val writer = mapped.writeStream writer .queryName(queryName) .format("memory") .option("checkpointLocation", metadataRoot) - .startStream() + .start() } val q1 = startQuery("q1") http://git-wip-us.apache.org/repos/asf/spark/blob/214adb14/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala index bb3063d..a5acc97 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala @@ -128,10 +128,10 @@ class FileStreamSinkSuite 
extends StreamTest { try { query = - df.write - .format("parquet") + df.writeStream .option("checkpointLocation", checkpointDir) - .startStream(outputDir) + .format("parquet") + .start(outputDir) inputData.addData(1, 2, 3) @@ -162,11 +162,11 @@ class FileStreamSinkSuite extends StreamTest { query = ds.map(i => (i, i * 1000)) .toDF("id", "value") - .write - .format("parquet") + .writeStream .partitionBy("id") .option("checkpointLocation", checkpointDir) - .startStream(outputDir) + .format("parquet") + .start(outputDir) inputData.addData(1, 2, 3) failAfter(streamingTimeout) { @@ -246,13 +246,13 @@ class FileStreamSinkSuite extends StreamTest { val writer = ds.map(i => (i, i * 1000)) .toDF("id", "value") - .write + .writeStream if (format.nonEmpty) { writer.format(format.get) } query = writer .option("checkpointLocation", checkpointDir) - .startStream(outputDir) + .start(outputDir) } finally { if (query != null) { query.stop() http://git-wip-us.apache.org/repos/asf/spark/blob/214adb14/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala index f681b88..6971f93 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala @@ -107,11 +107,11 @@ class FileStreamSourceTest extends StreamTest with SharedSQLContext { schema: Option[StructType] = None): DataFrame = { val reader = if (schema.isDefined) { - spark.read.format(format).schema(schema.get) + spark.readStream.format(format).schema(schema.get) } else { - spark.read.format(format) + spark.readStream.format(format) } - reader.stream(path) + reader.load(path) } protected def getSourceFromFileStream(df: DataFrame): FileStreamSource = { @@ -153,14 +153,14 @@ class FileStreamSourceSuite extends FileStreamSourceTest { format: Option[String], path: Option[String], schema: Option[StructType] = None): StructType = { - val reader = spark.read + val reader = spark.readStream format.foreach(reader.format) schema.foreach(reader.schema) val df = if (path.isDefined) { - reader.stream(path.get) + reader.load(path.get) } else { - reader.stream() + reader.load() } df.queryExecution.analyzed .collect { case s @ StreamingRelation(dataSource, _, _) => s.schema }.head http://git-wip-us.apache.org/repos/asf/spark/blob/214adb14/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStressSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStressSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStressSuite.scala index 1c0fb34..0e157cf 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStressSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStressSuite.scala @@ -98,7 +98,7 @@ class FileStressSuite extends StreamTest { } writer.start() - val input = spark.read.format("text").stream(inputDir) + val input = spark.readStream.format("text").load(inputDir) def startStream(): ContinuousQuery = { val output = input @@ -116,17 +116,17 @@ class FileStressSuite extends StreamTest { if (partitionWrites) { output - .write + .writeStream .partitionBy("id") .format("parquet") .option("checkpointLocation", 
checkpoint) - .startStream(outputDir) + .start(outputDir) } else { output - .write + .writeStream .format("parquet") .option("checkpointLocation", checkpoint) - .startStream(outputDir) + .start(outputDir) } } http://git-wip-us.apache.org/repos/asf/spark/blob/214adb14/sql/core/src/test/scala/org/apache/spark/sql/streaming/MemorySinkSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/MemorySinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/MemorySinkSuite.scala index 9aada0b..310d756 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/MemorySinkSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/MemorySinkSuite.scala @@ -140,11 +140,11 @@ class MemorySinkSuite extends StreamTest with BeforeAndAfter { test("registering as a table in Append output mode") { val input = MemoryStream[Int] - val query = input.toDF().write + val query = input.toDF().writeStream .format("memory") .outputMode("append") .queryName("memStream") - .startStream() + .start() input.addData(1, 2, 3) query.processAllAvailable() @@ -166,11 +166,11 @@ class MemorySinkSuite extends StreamTest with BeforeAndAfter { val query = input.toDF() .groupBy("value") .count() - .write + .writeStream .format("memory") .outputMode("complete") .queryName("memStream") - .startStream() + .start() input.addData(1, 2, 3) query.processAllAvailable() @@ -191,10 +191,10 @@ class MemorySinkSuite extends StreamTest with BeforeAndAfter { // Ignore the stress test as it takes several minutes to run (0 until 1000).foreach { _ => val input = MemoryStream[Int] - val query = input.toDF().write + val query = input.toDF().writeStream .format("memory") .queryName("memStream") - .startStream() + .start() input.addData(1, 2, 3) query.processAllAvailable() @@ -215,9 +215,9 @@ class MemorySinkSuite extends StreamTest with BeforeAndAfter { test("error when no name is specified") { val error = intercept[AnalysisException] { val input = MemoryStream[Int] - val query = input.toDF().write + val query = input.toDF().writeStream .format("memory") - .startStream() + .start() } assert(error.message contains "queryName must be specified") @@ -227,21 +227,21 @@ class MemorySinkSuite extends StreamTest with BeforeAndAfter { val location = Utils.createTempDir(namePrefix = "steaming.checkpoint").getCanonicalPath val input = MemoryStream[Int] - val query = input.toDF().write + val query = input.toDF().writeStream .format("memory") .queryName("memStream") .option("checkpointLocation", location) - .startStream() + .start() input.addData(1, 2, 3) query.processAllAvailable() query.stop() intercept[AnalysisException] { - input.toDF().write + input.toDF().writeStream .format("memory") .queryName("memStream") .option("checkpointLocation", location) - .startStream() + .start() } } http://git-wip-us.apache.org/repos/asf/spark/blob/214adb14/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala index 9414b1c..786404a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala @@ -89,9 +89,9 @@ class StreamSuite extends StreamTest { def assertDF(df: DataFrame) { withTempDir { outputDir 
=> withTempDir { checkpointDir => - val query = df.write.format("parquet") + val query = df.writeStream.format("parquet") .option("checkpointLocation", checkpointDir.getAbsolutePath) - .startStream(outputDir.getAbsolutePath) + .start(outputDir.getAbsolutePath) try { query.processAllAvailable() val outputDf = spark.read.parquet(outputDir.getAbsolutePath).as[Long] @@ -103,7 +103,7 @@ class StreamSuite extends StreamTest { } } - val df = spark.read.format(classOf[FakeDefaultSource].getName).stream() + val df = spark.readStream.format(classOf[FakeDefaultSource].getName).load() assertDF(df) assertDF(df) } http://git-wip-us.apache.org/repos/asf/spark/blob/214adb14/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala index 8681199..7f44227 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala @@ -40,6 +40,8 @@ class StreamingAggregationSuite extends StreamTest with BeforeAndAfterAll { import testImplicits._ + + test("simple count, update mode") { val inputData = MemoryStream[Int] http://git-wip-us.apache.org/repos/asf/spark/blob/214adb14/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataFrameReaderWriterSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataFrameReaderWriterSuite.scala deleted file mode 100644 index 6e0d66a..0000000 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataFrameReaderWriterSuite.scala +++ /dev/null @@ -1,621 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.sql.streaming.test - -import java.util.concurrent.TimeUnit - -import scala.concurrent.duration._ - -import org.mockito.Mockito._ -import org.scalatest.BeforeAndAfter - -import org.apache.spark.sql._ -import org.apache.spark.sql.execution.streaming._ -import org.apache.spark.sql.sources.{StreamSinkProvider, StreamSourceProvider} -import org.apache.spark.sql.streaming.{ContinuousQuery, OutputMode, ProcessingTime, StreamTest} -import org.apache.spark.sql.types.{IntegerType, StructField, StructType} -import org.apache.spark.util.Utils - -object LastOptions { - - var mockStreamSourceProvider = mock(classOf[StreamSourceProvider]) - var mockStreamSinkProvider = mock(classOf[StreamSinkProvider]) - var parameters: Map[String, String] = null - var schema: Option[StructType] = null - var partitionColumns: Seq[String] = Nil - - def clear(): Unit = { - parameters = null - schema = null - partitionColumns = null - reset(mockStreamSourceProvider) - reset(mockStreamSinkProvider) - } -} - -/** Dummy provider: returns no-op source/sink and records options in [[LastOptions]]. */ -class DefaultSource extends StreamSourceProvider with StreamSinkProvider { - - private val fakeSchema = StructType(StructField("a", IntegerType) :: Nil) - - override def sourceSchema( - spark: SQLContext, - schema: Option[StructType], - providerName: String, - parameters: Map[String, String]): (String, StructType) = { - LastOptions.parameters = parameters - LastOptions.schema = schema - LastOptions.mockStreamSourceProvider.sourceSchema(spark, schema, providerName, parameters) - ("dummySource", fakeSchema) - } - - override def createSource( - spark: SQLContext, - metadataPath: String, - schema: Option[StructType], - providerName: String, - parameters: Map[String, String]): Source = { - LastOptions.parameters = parameters - LastOptions.schema = schema - LastOptions.mockStreamSourceProvider.createSource( - spark, metadataPath, schema, providerName, parameters) - new Source { - override def schema: StructType = fakeSchema - - override def getOffset: Option[Offset] = Some(new LongOffset(0)) - - override def getBatch(start: Option[Offset], end: Offset): DataFrame = { - import spark.implicits._ - - Seq[Int]().toDS().toDF() - } - } - } - - override def createSink( - spark: SQLContext, - parameters: Map[String, String], - partitionColumns: Seq[String], - outputMode: OutputMode): Sink = { - LastOptions.parameters = parameters - LastOptions.partitionColumns = partitionColumns - LastOptions.mockStreamSinkProvider.createSink(spark, parameters, partitionColumns, outputMode) - new Sink { - override def addBatch(batchId: Long, data: DataFrame): Unit = {} - } - } -} - -class DataFrameReaderWriterSuite extends StreamTest with BeforeAndAfter { - - private def newMetadataDir = - Utils.createTempDir(namePrefix = "streaming.metadata").getCanonicalPath - - after { - spark.streams.active.foreach(_.stop()) - } - - test("resolve default source") { - spark.read - .format("org.apache.spark.sql.streaming.test") - .stream() - .write - .format("org.apache.spark.sql.streaming.test") - .option("checkpointLocation", newMetadataDir) - .startStream() - .stop() - } - - test("resolve full class") { - spark.read - .format("org.apache.spark.sql.streaming.test.DefaultSource") - .stream() - .write - .format("org.apache.spark.sql.streaming.test") - .option("checkpointLocation", newMetadataDir) - .startStream() - .stop() - } - - test("options") { - val map = new java.util.HashMap[String, String] - map.put("opt3", "3") - - val df = spark.read - 
.format("org.apache.spark.sql.streaming.test") - .option("opt1", "1") - .options(Map("opt2" -> "2")) - .options(map) - .stream() - - assert(LastOptions.parameters("opt1") == "1") - assert(LastOptions.parameters("opt2") == "2") - assert(LastOptions.parameters("opt3") == "3") - - LastOptions.clear() - - df.write - .format("org.apache.spark.sql.streaming.test") - .option("opt1", "1") - .options(Map("opt2" -> "2")) - .options(map) - .option("checkpointLocation", newMetadataDir) - .startStream() - .stop() - - assert(LastOptions.parameters("opt1") == "1") - assert(LastOptions.parameters("opt2") == "2") - assert(LastOptions.parameters("opt3") == "3") - } - - test("partitioning") { - val df = spark.read - .format("org.apache.spark.sql.streaming.test") - .stream() - - df.write - .format("org.apache.spark.sql.streaming.test") - .option("checkpointLocation", newMetadataDir) - .startStream() - .stop() - assert(LastOptions.partitionColumns == Nil) - - df.write - .format("org.apache.spark.sql.streaming.test") - .option("checkpointLocation", newMetadataDir) - .partitionBy("a") - .startStream() - .stop() - assert(LastOptions.partitionColumns == Seq("a")) - - withSQLConf("spark.sql.caseSensitive" -> "false") { - df.write - .format("org.apache.spark.sql.streaming.test") - .option("checkpointLocation", newMetadataDir) - .partitionBy("A") - .startStream() - .stop() - assert(LastOptions.partitionColumns == Seq("a")) - } - - intercept[AnalysisException] { - df.write - .format("org.apache.spark.sql.streaming.test") - .option("checkpointLocation", newMetadataDir) - .partitionBy("b") - .startStream() - .stop() - } - } - - test("stream paths") { - val df = spark.read - .format("org.apache.spark.sql.streaming.test") - .option("checkpointLocation", newMetadataDir) - .stream("/test") - - assert(LastOptions.parameters("path") == "/test") - - LastOptions.clear() - - df.write - .format("org.apache.spark.sql.streaming.test") - .option("checkpointLocation", newMetadataDir) - .startStream("/test") - .stop() - - assert(LastOptions.parameters("path") == "/test") - } - - test("test different data types for options") { - val df = spark.read - .format("org.apache.spark.sql.streaming.test") - .option("intOpt", 56) - .option("boolOpt", false) - .option("doubleOpt", 6.7) - .stream("/test") - - assert(LastOptions.parameters("intOpt") == "56") - assert(LastOptions.parameters("boolOpt") == "false") - assert(LastOptions.parameters("doubleOpt") == "6.7") - - LastOptions.clear() - df.write - .format("org.apache.spark.sql.streaming.test") - .option("intOpt", 56) - .option("boolOpt", false) - .option("doubleOpt", 6.7) - .option("checkpointLocation", newMetadataDir) - .startStream("/test") - .stop() - - assert(LastOptions.parameters("intOpt") == "56") - assert(LastOptions.parameters("boolOpt") == "false") - assert(LastOptions.parameters("doubleOpt") == "6.7") - } - - test("unique query names") { - - /** Start a query with a specific name */ - def startQueryWithName(name: String = ""): ContinuousQuery = { - spark.read - .format("org.apache.spark.sql.streaming.test") - .stream("/test") - .write - .format("org.apache.spark.sql.streaming.test") - .option("checkpointLocation", newMetadataDir) - .queryName(name) - .startStream() - } - - /** Start a query without specifying a name */ - def startQueryWithoutName(): ContinuousQuery = { - spark.read - .format("org.apache.spark.sql.streaming.test") - .stream("/test") - .write - .format("org.apache.spark.sql.streaming.test") - .option("checkpointLocation", newMetadataDir) - .startStream() - } - - /** 
Get the names of active streams */ - def activeStreamNames: Set[String] = { - val streams = spark.streams.active - val names = streams.map(_.name).toSet - assert(streams.length === names.size, s"names of active queries are not unique: $names") - names - } - - val q1 = startQueryWithName("name") - - // Should not be able to start another query with the same name - intercept[IllegalArgumentException] { - startQueryWithName("name") - } - assert(activeStreamNames === Set("name")) - - // Should be able to start queries with other names - val q3 = startQueryWithName("another-name") - assert(activeStreamNames === Set("name", "another-name")) - - // Should be able to start queries with auto-generated names - val q4 = startQueryWithoutName() - assert(activeStreamNames.contains(q4.name)) - - // Should not be able to start a query with same auto-generated name - intercept[IllegalArgumentException] { - startQueryWithName(q4.name) - } - - // Should be able to start query with that name after stopping the previous query - q1.stop() - val q5 = startQueryWithName("name") - assert(activeStreamNames.contains("name")) - spark.streams.active.foreach(_.stop()) - } - - test("trigger") { - val df = spark.read - .format("org.apache.spark.sql.streaming.test") - .stream("/test") - - var q = df.write - .format("org.apache.spark.sql.streaming.test") - .option("checkpointLocation", newMetadataDir) - .trigger(ProcessingTime(10.seconds)) - .startStream() - q.stop() - - assert(q.asInstanceOf[StreamExecution].trigger == ProcessingTime(10000)) - - q = df.write - .format("org.apache.spark.sql.streaming.test") - .option("checkpointLocation", newMetadataDir) - .trigger(ProcessingTime.create(100, TimeUnit.SECONDS)) - .startStream() - q.stop() - - assert(q.asInstanceOf[StreamExecution].trigger == ProcessingTime(100000)) - } - - test("source metadataPath") { - LastOptions.clear() - - val checkpointLocation = newMetadataDir - - val df1 = spark.read - .format("org.apache.spark.sql.streaming.test") - .stream() - - val df2 = spark.read - .format("org.apache.spark.sql.streaming.test") - .stream() - - val q = df1.union(df2).write - .format("org.apache.spark.sql.streaming.test") - .option("checkpointLocation", checkpointLocation) - .trigger(ProcessingTime(10.seconds)) - .startStream() - q.stop() - - verify(LastOptions.mockStreamSourceProvider).createSource( - spark.sqlContext, - checkpointLocation + "/sources/0", - None, - "org.apache.spark.sql.streaming.test", - Map.empty) - - verify(LastOptions.mockStreamSourceProvider).createSource( - spark.sqlContext, - checkpointLocation + "/sources/1", - None, - "org.apache.spark.sql.streaming.test", - Map.empty) - } - - private def newTextInput = Utils.createTempDir(namePrefix = "text").getCanonicalPath - - test("check trigger() can only be called on continuous queries") { - val df = spark.read.text(newTextInput) - val w = df.write.option("checkpointLocation", newMetadataDir) - val e = intercept[AnalysisException](w.trigger(ProcessingTime("10 seconds"))) - assert(e.getMessage == "trigger() can only be called on continuous queries;") - } - - test("check queryName() can only be called on continuous queries") { - val df = spark.read.text(newTextInput) - val w = df.write.option("checkpointLocation", newMetadataDir) - val e = intercept[AnalysisException](w.queryName("queryName")) - assert(e.getMessage == "queryName() can only be called on continuous queries;") - } - - test("check startStream() can only be called on continuous queries") { - val df = spark.read.text(newTextInput) - val w = 
df.write.option("checkpointLocation", newMetadataDir) - val e = intercept[AnalysisException](w.startStream()) - assert(e.getMessage == "startStream() can only be called on continuous queries;") - } - - test("check startStream(path) can only be called on continuous queries") { - val df = spark.read.text(newTextInput) - val w = df.write.option("checkpointLocation", newMetadataDir) - val e = intercept[AnalysisException](w.startStream("non_exist_path")) - assert(e.getMessage == "startStream() can only be called on continuous queries;") - } - - test("check mode(SaveMode) can only be called on non-continuous queries") { - val df = spark.read - .format("org.apache.spark.sql.streaming.test") - .stream() - val w = df.write - val e = intercept[AnalysisException](w.mode(SaveMode.Append)) - assert(e.getMessage == "mode() can only be called on non-continuous queries;") - } - - test("check mode(string) can only be called on non-continuous queries") { - val df = spark.read - .format("org.apache.spark.sql.streaming.test") - .stream() - val w = df.write - val e = intercept[AnalysisException](w.mode("append")) - assert(e.getMessage == "mode() can only be called on non-continuous queries;") - } - - test("check outputMode(OutputMode) can only be called on continuous queries") { - val df = spark.read.text(newTextInput) - val w = df.write.option("checkpointLocation", newMetadataDir) - val e = intercept[AnalysisException](w.outputMode(OutputMode.Append)) - Seq("outputmode", "continuous queries").foreach { s => - assert(e.getMessage.toLowerCase.contains(s.toLowerCase)) - } - } - - test("check outputMode(string) can only be called on continuous queries") { - val df = spark.read.text(newTextInput) - val w = df.write.option("checkpointLocation", newMetadataDir) - val e = intercept[AnalysisException](w.outputMode("append")) - Seq("outputmode", "continuous queries").foreach { s => - assert(e.getMessage.toLowerCase.contains(s.toLowerCase)) - } - } - - test("check outputMode(string) throws exception on unsupported modes") { - def testError(outputMode: String): Unit = { - val df = spark.read - .format("org.apache.spark.sql.streaming.test") - .stream() - val w = df.write - val e = intercept[IllegalArgumentException](w.outputMode(outputMode)) - Seq("output mode", "unknown", outputMode).foreach { s => - assert(e.getMessage.toLowerCase.contains(s.toLowerCase)) - } - } - testError("Update") - testError("Xyz") - } - - test("check bucketBy() can only be called on non-continuous queries") { - val df = spark.read - .format("org.apache.spark.sql.streaming.test") - .stream() - val w = df.write - val e = intercept[AnalysisException](w.bucketBy(1, "text").startStream()) - assert(e.getMessage == "'startStream' does not support bucketing right now;") - } - - test("check sortBy() can only be called on non-continuous queries;") { - val df = spark.read - .format("org.apache.spark.sql.streaming.test") - .stream() - val w = df.write - val e = intercept[AnalysisException](w.sortBy("text").startStream()) - assert(e.getMessage == "'startStream' does not support bucketing right now;") - } - - test("check save(path) can only be called on non-continuous queries") { - val df = spark.read - .format("org.apache.spark.sql.streaming.test") - .stream() - val w = df.write - val e = intercept[AnalysisException](w.save("non_exist_path")) - assert(e.getMessage == "save() can only be called on non-continuous queries;") - } - - test("check save() can only be called on non-continuous queries") { - val df = spark.read - 
.format("org.apache.spark.sql.streaming.test") - .stream() - val w = df.write - val e = intercept[AnalysisException](w.save()) - assert(e.getMessage == "save() can only be called on non-continuous queries;") - } - - test("check insertInto() can only be called on non-continuous queries") { - val df = spark.read - .format("org.apache.spark.sql.streaming.test") - .stream() - val w = df.write - val e = intercept[AnalysisException](w.insertInto("non_exsit_table")) - assert(e.getMessage == "insertInto() can only be called on non-continuous queries;") - } - - test("check saveAsTable() can only be called on non-continuous queries") { - val df = spark.read - .format("org.apache.spark.sql.streaming.test") - .stream() - val w = df.write - val e = intercept[AnalysisException](w.saveAsTable("non_exsit_table")) - assert(e.getMessage == "saveAsTable() can only be called on non-continuous queries;") - } - - test("check jdbc() can only be called on non-continuous queries") { - val df = spark.read - .format("org.apache.spark.sql.streaming.test") - .stream() - val w = df.write - val e = intercept[AnalysisException](w.jdbc(null, null, null)) - assert(e.getMessage == "jdbc() can only be called on non-continuous queries;") - } - - test("check json() can only be called on non-continuous queries") { - val df = spark.read - .format("org.apache.spark.sql.streaming.test") - .stream() - val w = df.write - val e = intercept[AnalysisException](w.json("non_exist_path")) - assert(e.getMessage == "json() can only be called on non-continuous queries;") - } - - test("check parquet() can only be called on non-continuous queries") { - val df = spark.read - .format("org.apache.spark.sql.streaming.test") - .stream() - val w = df.write - val e = intercept[AnalysisException](w.parquet("non_exist_path")) - assert(e.getMessage == "parquet() can only be called on non-continuous queries;") - } - - test("check orc() can only be called on non-continuous queries") { - val df = spark.read - .format("org.apache.spark.sql.streaming.test") - .stream() - val w = df.write - val e = intercept[AnalysisException](w.orc("non_exist_path")) - assert(e.getMessage == "orc() can only be called on non-continuous queries;") - } - - test("check text() can only be called on non-continuous queries") { - val df = spark.read - .format("org.apache.spark.sql.streaming.test") - .stream() - val w = df.write - val e = intercept[AnalysisException](w.text("non_exist_path")) - assert(e.getMessage == "text() can only be called on non-continuous queries;") - } - - test("check csv() can only be called on non-continuous queries") { - val df = spark.read - .format("org.apache.spark.sql.streaming.test") - .stream() - val w = df.write - val e = intercept[AnalysisException](w.csv("non_exist_path")) - assert(e.getMessage == "csv() can only be called on non-continuous queries;") - } - - test("check foreach() does not support partitioning or bucketing") { - val df = spark.read - .format("org.apache.spark.sql.streaming.test") - .stream() - - var w = df.write.partitionBy("value") - var e = intercept[AnalysisException](w.foreach(null)) - Seq("foreach", "partitioning").foreach { s => - assert(e.getMessage.toLowerCase.contains(s.toLowerCase)) - } - - w = df.write.bucketBy(2, "value") - e = intercept[AnalysisException](w.foreach(null)) - Seq("foreach", "bucketing").foreach { s => - assert(e.getMessage.toLowerCase.contains(s.toLowerCase)) - } - } - - test("check jdbc() does not support partitioning or bucketing") { - val df = spark.read.text(newTextInput) - - var w = 
df.write.partitionBy("value") - var e = intercept[AnalysisException](w.jdbc(null, null, null)) - Seq("jdbc", "partitioning").foreach { s => - assert(e.getMessage.toLowerCase.contains(s.toLowerCase)) - } - - w = df.write.bucketBy(2, "value") - e = intercept[AnalysisException](w.jdbc(null, null, null)) - Seq("jdbc", "bucketing").foreach { s => - assert(e.getMessage.toLowerCase.contains(s.toLowerCase)) - } - } - - test("ConsoleSink can be correctly loaded") { - LastOptions.clear() - val df = spark.read - .format("org.apache.spark.sql.streaming.test") - .stream() - - val cq = df.write - .format("console") - .option("checkpointLocation", newMetadataDir) - .trigger(ProcessingTime(2.seconds)) - .startStream() - - cq.awaitTermination(2000L) - } - - test("prevent all column partitioning") { - withTempDir { dir => - val path = dir.getCanonicalPath - intercept[AnalysisException] { - spark.range(10).write.format("parquet").mode("overwrite").partitionBy("id").save(path) - } - intercept[AnalysisException] { - spark.range(10).write.format("orc").mode("overwrite").partitionBy("id").save(path) - } - } - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/214adb14/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala new file mode 100644 index 0000000..c6d374f --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala @@ -0,0 +1,458 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.streaming.test + +import java.util.concurrent.TimeUnit + +import scala.concurrent.duration._ + +import org.mockito.Mockito._ +import org.scalatest.BeforeAndAfter + +import org.apache.spark.sql._ +import org.apache.spark.sql.execution.streaming._ +import org.apache.spark.sql.sources.{StreamSinkProvider, StreamSourceProvider} +import org.apache.spark.sql.streaming.{ContinuousQuery, OutputMode, ProcessingTime, StreamTest} +import org.apache.spark.sql.types.{IntegerType, StructField, StructType} +import org.apache.spark.util.Utils + +object LastOptions { + + var mockStreamSourceProvider = mock(classOf[StreamSourceProvider]) + var mockStreamSinkProvider = mock(classOf[StreamSinkProvider]) + var parameters: Map[String, String] = null + var schema: Option[StructType] = null + var partitionColumns: Seq[String] = Nil + + def clear(): Unit = { + parameters = null + schema = null + partitionColumns = null + reset(mockStreamSourceProvider) + reset(mockStreamSinkProvider) + } +} + +/** Dummy provider: returns no-op source/sink and records options in [[LastOptions]]. */ +class DefaultSource extends StreamSourceProvider with StreamSinkProvider { + + private val fakeSchema = StructType(StructField("a", IntegerType) :: Nil) + + override def sourceSchema( + spark: SQLContext, + schema: Option[StructType], + providerName: String, + parameters: Map[String, String]): (String, StructType) = { + LastOptions.parameters = parameters + LastOptions.schema = schema + LastOptions.mockStreamSourceProvider.sourceSchema(spark, schema, providerName, parameters) + ("dummySource", fakeSchema) + } + + override def createSource( + spark: SQLContext, + metadataPath: String, + schema: Option[StructType], + providerName: String, + parameters: Map[String, String]): Source = { + LastOptions.parameters = parameters + LastOptions.schema = schema + LastOptions.mockStreamSourceProvider.createSource( + spark, metadataPath, schema, providerName, parameters) + new Source { + override def schema: StructType = fakeSchema + + override def getOffset: Option[Offset] = Some(new LongOffset(0)) + + override def getBatch(start: Option[Offset], end: Offset): DataFrame = { + import spark.implicits._ + + Seq[Int]().toDS().toDF() + } + } + } + + override def createSink( + spark: SQLContext, + parameters: Map[String, String], + partitionColumns: Seq[String], + outputMode: OutputMode): Sink = { + LastOptions.parameters = parameters + LastOptions.partitionColumns = partitionColumns + LastOptions.mockStreamSinkProvider.createSink(spark, parameters, partitionColumns, outputMode) + new Sink { + override def addBatch(batchId: Long, data: DataFrame): Unit = {} + } + } +} + +class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter { + + private def newMetadataDir = + Utils.createTempDir(namePrefix = "streaming.metadata").getCanonicalPath + + after { + spark.streams.active.foreach(_.stop()) + } + + test("write cannot be called on streaming datasets") { + val e = intercept[AnalysisException] { + spark.readStream + .format("org.apache.spark.sql.streaming.test") + .load() + .write + .save() + } + Seq("'write'", "not", "streaming Dataset/DataFrame").foreach { s => + assert(e.getMessage.toLowerCase.contains(s.toLowerCase)) + } + } + + test("resolve default source") { + spark.readStream + .format("org.apache.spark.sql.streaming.test") + .load() + .writeStream + .format("org.apache.spark.sql.streaming.test") + .option("checkpointLocation", newMetadataDir) + .start() + .stop() + } + + test("resolve full class") { + 
spark.readStream + .format("org.apache.spark.sql.streaming.test.DefaultSource") + .load() + .writeStream + .format("org.apache.spark.sql.streaming.test") + .option("checkpointLocation", newMetadataDir) + .start() + .stop() + } + + test("options") { + val map = new java.util.HashMap[String, String] + map.put("opt3", "3") + + val df = spark.readStream + .format("org.apache.spark.sql.streaming.test") + .option("opt1", "1") + .options(Map("opt2" -> "2")) + .options(map) + .load() + + assert(LastOptions.parameters("opt1") == "1") + assert(LastOptions.parameters("opt2") == "2") + assert(LastOptions.parameters("opt3") == "3") + + LastOptions.clear() + + df.writeStream + .format("org.apache.spark.sql.streaming.test") + .option("opt1", "1") + .options(Map("opt2" -> "2")) + .options(map) + .option("checkpointLocation", newMetadataDir) + .start() + .stop() + + assert(LastOptions.parameters("opt1") == "1") + assert(LastOptions.parameters("opt2") == "2") + assert(LastOptions.parameters("opt3") == "3") + } + + test("partitioning") { + val df = spark.readStream + .format("org.apache.spark.sql.streaming.test") + .load() + + df.writeStream + .format("org.apache.spark.sql.streaming.test") + .option("checkpointLocation", newMetadataDir) + .start() + .stop() + assert(LastOptions.partitionColumns == Nil) + + df.writeStream + .format("org.apache.spark.sql.streaming.test") + .option("checkpointLocation", newMetadataDir) + .partitionBy("a") + .start() + .stop() + assert(LastOptions.partitionColumns == Seq("a")) + + withSQLConf("spark.sql.caseSensitive" -> "false") { + df.writeStream + .format("org.apache.spark.sql.streaming.test") + .option("checkpointLocation", newMetadataDir) + .partitionBy("A") + .start() + .stop() + assert(LastOptions.partitionColumns == Seq("a")) + } + + intercept[AnalysisException] { + df.writeStream + .format("org.apache.spark.sql.streaming.test") + .option("checkpointLocation", newMetadataDir) + .partitionBy("b") + .start() + .stop() + } + } + + test("stream paths") { + val df = spark.readStream + .format("org.apache.spark.sql.streaming.test") + .option("checkpointLocation", newMetadataDir) + .load("/test") + + assert(LastOptions.parameters("path") == "/test") + + LastOptions.clear() + + df.writeStream + .format("org.apache.spark.sql.streaming.test") + .option("checkpointLocation", newMetadataDir) + .start("/test") + .stop() + + assert(LastOptions.parameters("path") == "/test") + } + + test("test different data types for options") { + val df = spark.readStream + .format("org.apache.spark.sql.streaming.test") + .option("intOpt", 56) + .option("boolOpt", false) + .option("doubleOpt", 6.7) + .load("/test") + + assert(LastOptions.parameters("intOpt") == "56") + assert(LastOptions.parameters("boolOpt") == "false") + assert(LastOptions.parameters("doubleOpt") == "6.7") + + LastOptions.clear() + df.writeStream + .format("org.apache.spark.sql.streaming.test") + .option("intOpt", 56) + .option("boolOpt", false) + .option("doubleOpt", 6.7) + .option("checkpointLocation", newMetadataDir) + .start("/test") + .stop() + + assert(LastOptions.parameters("intOpt") == "56") + assert(LastOptions.parameters("boolOpt") == "false") + assert(LastOptions.parameters("doubleOpt") == "6.7") + } + + test("unique query names") { + + /** Start a query with a specific name */ + def startQueryWithName(name: String = ""): ContinuousQuery = { + spark.readStream + .format("org.apache.spark.sql.streaming.test") + .load("/test") + .writeStream + .format("org.apache.spark.sql.streaming.test") + .option("checkpointLocation", 
newMetadataDir) + .queryName(name) + .start() + } + + /** Start a query without specifying a name */ + def startQueryWithoutName(): ContinuousQuery = { + spark.readStream + .format("org.apache.spark.sql.streaming.test") + .load("/test") + .writeStream + .format("org.apache.spark.sql.streaming.test") + .option("checkpointLocation", newMetadataDir) + .start() + } + + /** Get the names of active streams */ + def activeStreamNames: Set[String] = { + val streams = spark.streams.active + val names = streams.map(_.name).toSet + assert(streams.length === names.size, s"names of active queries are not unique: $names") + names + } + + val q1 = startQueryWithName("name") + + // Should not be able to start another query with the same name + intercept[IllegalArgumentException] { + startQueryWithName("name") + } + assert(activeStreamNames === Set("name")) + + // Should be able to start queries with other names + val q3 = startQueryWithName("another-name") + assert(activeStreamNames === Set("name", "another-name")) + + // Should be able to start queries with auto-generated names + val q4 = startQueryWithoutName() + assert(activeStreamNames.contains(q4.name)) + + // Should not be able to start a query with same auto-generated name + intercept[IllegalArgumentException] { + startQueryWithName(q4.name) + } + + // Should be able to start query with that name after stopping the previous query + q1.stop() + val q5 = startQueryWithName("name") + assert(activeStreamNames.contains("name")) + spark.streams.active.foreach(_.stop()) + } + + test("trigger") { + val df = spark.readStream + .format("org.apache.spark.sql.streaming.test") + .load("/test") + + var q = df.writeStream + .format("org.apache.spark.sql.streaming.test") + .option("checkpointLocation", newMetadataDir) + .trigger(ProcessingTime(10.seconds)) + .start() + q.stop() + + assert(q.asInstanceOf[StreamExecution].trigger == ProcessingTime(10000)) + + q = df.writeStream + .format("org.apache.spark.sql.streaming.test") + .option("checkpointLocation", newMetadataDir) + .trigger(ProcessingTime.create(100, TimeUnit.SECONDS)) + .start() + q.stop() + + assert(q.asInstanceOf[StreamExecution].trigger == ProcessingTime(100000)) + } + + test("source metadataPath") { + LastOptions.clear() + + val checkpointLocation = newMetadataDir + + val df1 = spark.readStream + .format("org.apache.spark.sql.streaming.test") + .load() + + val df2 = spark.readStream + .format("org.apache.spark.sql.streaming.test") + .load() + + val q = df1.union(df2).writeStream + .format("org.apache.spark.sql.streaming.test") + .option("checkpointLocation", checkpointLocation) + .trigger(ProcessingTime(10.seconds)) + .start() + q.stop() + + verify(LastOptions.mockStreamSourceProvider).createSource( + spark.sqlContext, + checkpointLocation + "/sources/0", + None, + "org.apache.spark.sql.streaming.test", + Map.empty) + + verify(LastOptions.mockStreamSourceProvider).createSource( + spark.sqlContext, + checkpointLocation + "/sources/1", + None, + "org.apache.spark.sql.streaming.test", + Map.empty) + } + + private def newTextInput = Utils.createTempDir(namePrefix = "text").getCanonicalPath + + test("check outputMode(string) throws exception on unsupported modes") { + def testError(outputMode: String): Unit = { + val df = spark.readStream + .format("org.apache.spark.sql.streaming.test") + .load() + val w = df.writeStream + val e = intercept[IllegalArgumentException](w.outputMode(outputMode)) + Seq("output mode", "unknown", outputMode).foreach { s => + 
assert(e.getMessage.toLowerCase.contains(s.toLowerCase)) + } + } + testError("Update") + testError("Xyz") + } + + test("check foreach() catches null writers") { + val df = spark.readStream + .format("org.apache.spark.sql.streaming.test") + .load() + + var w = df.writeStream + var e = intercept[IllegalArgumentException](w.foreach(null)) + Seq("foreach", "null").foreach { s => + assert(e.getMessage.toLowerCase.contains(s.toLowerCase)) + } + } + + + test("check foreach() does not support partitioning") { + val df = spark.readStream + .format("org.apache.spark.sql.streaming.test") + .load() + val foreachWriter = new ForeachWriter[Row] { + override def open(partitionId: Long, version: Long): Boolean = false + override def process(value: Row): Unit = {} + override def close(errorOrNull: Throwable): Unit = {} + } + var w = df.writeStream.partitionBy("value") + var e = intercept[AnalysisException](w.foreach(foreachWriter).start()) + Seq("foreach", "partitioning").foreach { s => + assert(e.getMessage.toLowerCase.contains(s.toLowerCase)) + } + } + + test("ConsoleSink can be correctly loaded") { + LastOptions.clear() + val df = spark.readStream + .format("org.apache.spark.sql.streaming.test") + .load() + + val cq = df.writeStream + .format("console") + .option("checkpointLocation", newMetadataDir) + .trigger(ProcessingTime(2.seconds)) + .start() + + cq.awaitTermination(2000L) + } + + test("prevent all column partitioning") { + withTempDir { dir => + val path = dir.getCanonicalPath + intercept[AnalysisException] { + spark.range(10).writeStream + .outputMode("append") + .partitionBy("id") + .format("parquet") + .start(path) + } + } + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/214adb14/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala new file mode 100644 index 0000000..98e57b3 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala @@ -0,0 +1,231 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.test + +import org.apache.spark.sql._ +import org.apache.spark.sql.sources._ +import org.apache.spark.sql.types.{StringType, StructField, StructType} +import org.apache.spark.util.Utils + + +object LastOptions { + + var parameters: Map[String, String] = null + var schema: Option[StructType] = null + var saveMode: SaveMode = null + + def clear(): Unit = { + parameters = null + schema = null + saveMode = null + } +} + + +/** Dummy provider. 
*/ +class DefaultSource + extends RelationProvider + with SchemaRelationProvider + with CreatableRelationProvider { + + case class FakeRelation(sqlContext: SQLContext) extends BaseRelation { + override def schema: StructType = StructType(Seq(StructField("a", StringType))) + } + + override def createRelation( + sqlContext: SQLContext, + parameters: Map[String, String], + schema: StructType + ): BaseRelation = { + LastOptions.parameters = parameters + LastOptions.schema = Some(schema) + FakeRelation(sqlContext) + } + + override def createRelation( + sqlContext: SQLContext, + parameters: Map[String, String] + ): BaseRelation = { + LastOptions.parameters = parameters + LastOptions.schema = None + FakeRelation(sqlContext) + } + + override def createRelation( + sqlContext: SQLContext, + mode: SaveMode, + parameters: Map[String, String], + data: DataFrame): BaseRelation = { + LastOptions.parameters = parameters + LastOptions.schema = None + LastOptions.saveMode = mode + FakeRelation(sqlContext) + } +} + + +class DataFrameReaderWriterSuite extends QueryTest with SharedSQLContext { + + private def newMetadataDir = + Utils.createTempDir(namePrefix = "streaming.metadata").getCanonicalPath + + test("writeStream cannot be called on non-streaming datasets") { + val e = intercept[AnalysisException] { + spark.read + .format("org.apache.spark.sql.test") + .load() + .writeStream + .start() + } + Seq("'writeStream'", "only", "streaming Dataset/DataFrame").foreach { s => + assert(e.getMessage.toLowerCase.contains(s.toLowerCase)) + } + } + + + test("resolve default source") { + spark.read + .format("org.apache.spark.sql.test") + .load() + .write + .format("org.apache.spark.sql.test") + .save() + } + + test("resolve full class") { + spark.read + .format("org.apache.spark.sql.test.DefaultSource") + .load() + .write + .format("org.apache.spark.sql.test") + .save() + } + + test("options") { + val map = new java.util.HashMap[String, String] + map.put("opt3", "3") + + val df = spark.read + .format("org.apache.spark.sql.test") + .option("opt1", "1") + .options(Map("opt2" -> "2")) + .options(map) + .load() + + assert(LastOptions.parameters("opt1") == "1") + assert(LastOptions.parameters("opt2") == "2") + assert(LastOptions.parameters("opt3") == "3") + + LastOptions.clear() + + df.write + .format("org.apache.spark.sql.test") + .option("opt1", "1") + .options(Map("opt2" -> "2")) + .options(map) + .save() + + assert(LastOptions.parameters("opt1") == "1") + assert(LastOptions.parameters("opt2") == "2") + assert(LastOptions.parameters("opt3") == "3") + } + + test("save mode") { + val df = spark.read + .format("org.apache.spark.sql.test") + .load() + + df.write + .format("org.apache.spark.sql.test") + .mode(SaveMode.ErrorIfExists) + .save() + assert(LastOptions.saveMode === SaveMode.ErrorIfExists) + } + + test("paths") { + val df = spark.read + .format("org.apache.spark.sql.test") + .option("checkpointLocation", newMetadataDir) + .load("/test") + + assert(LastOptions.parameters("path") == "/test") + + LastOptions.clear() + + df.write + .format("org.apache.spark.sql.test") + .option("checkpointLocation", newMetadataDir) + .save("/test") + + assert(LastOptions.parameters("path") == "/test") + } + + test("test different data types for options") { + val df = spark.read + .format("org.apache.spark.sql.test") + .option("intOpt", 56) + .option("boolOpt", false) + .option("doubleOpt", 6.7) + .load("/test") + + assert(LastOptions.parameters("intOpt") == "56") + assert(LastOptions.parameters("boolOpt") == "false") + 
assert(LastOptions.parameters("doubleOpt") == "6.7") + + LastOptions.clear() + df.write + .format("org.apache.spark.sql.test") + .option("intOpt", 56) + .option("boolOpt", false) + .option("doubleOpt", 6.7) + .option("checkpointLocation", newMetadataDir) + .save("/test") + + assert(LastOptions.parameters("intOpt") == "56") + assert(LastOptions.parameters("boolOpt") == "false") + assert(LastOptions.parameters("doubleOpt") == "6.7") + } + + test("check jdbc() does not support partitioning or bucketing") { + val df = spark.read.text(Utils.createTempDir(namePrefix = "text").getCanonicalPath) + + var w = df.write.partitionBy("value") + var e = intercept[AnalysisException](w.jdbc(null, null, null)) + Seq("jdbc", "partitioning").foreach { s => + assert(e.getMessage.toLowerCase.contains(s.toLowerCase)) + } + + w = df.write.bucketBy(2, "value") + e = intercept[AnalysisException](w.jdbc(null, null, null)) + Seq("jdbc", "bucketing").foreach { s => + assert(e.getMessage.toLowerCase.contains(s.toLowerCase)) + } + } + + test("prevent all column partitioning") { + withTempDir { dir => + val path = dir.getCanonicalPath + intercept[AnalysisException] { + spark.range(10).write.format("parquet").mode("overwrite").partitionBy("id").save(path) + } + intercept[AnalysisException] { + spark.range(10).write.format("orc").mode("overwrite").partitionBy("id").save(path) + } + } + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
