This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 3814d15bd19 [SPARK-43129] Scala core API for streaming Spark Connect 3814d15bd19 is described below commit 3814d15bd190bdae69c5c22b461e3f6145cca1f7 Author: Raghu Angadi <raghu.ang...@databricks.com> AuthorDate: Thu Apr 20 08:33:36 2023 +0900 [SPARK-43129] Scala core API for streaming Spark Connect ### What changes were proposed in this pull request? Implements core streaming API in Scala for running streaming queries over Spark Connect. This is functionally equivalent to Python side PR #40586 There are no server side changes here since it was done earlier in Python PR. We can run most streaming queries. Notably, queries using `foreachBatch()` are not yet supported. ### Why are the changes needed? This adds structured streaming support in Scala for Spark connect. ### Does this PR introduce _any_ user-facing change? Adds more streaming API to Scala Spark Connect client. ### How was this patch tested? - Unit test - Manual testing Closes #40783 from rangadi/scala-m1. Authored-by: Raghu Angadi <raghu.ang...@databricks.com> Signed-off-by: Hyukjin Kwon <gurwls...@apache.org> --- .../org/apache/spark/sql/streaming/Trigger.java | 180 ++++++++++++++ .../main/scala/org/apache/spark/sql/Dataset.scala | 42 +++- .../scala/org/apache/spark/sql/SparkSession.scala | 17 +- .../spark/sql/execution/streaming/Triggers.scala | 115 +++++++++ .../spark/sql/streaming/DataStreamReader.scala | 273 +++++++++++++++++++++ .../spark/sql/streaming/DataStreamWriter.scala | 266 ++++++++++++++++++++ .../spark/sql/streaming/StreamingQuery.scala | 245 ++++++++++++++++++ .../spark/sql/streaming/StreamingQueryStatus.scala | 72 ++++++ .../org/apache/spark/sql/streaming/progress.scala | 22 ++ .../CheckConnectJvmClientCompatibility.scala | 36 ++- .../spark/sql/streaming/StreamingQuerySuite.scala | 81 ++++++ dev/checkstyle-suppressions.xml | 2 + 12 files changed, 1345 insertions(+), 6 deletions(-) diff --git a/connector/connect/client/jvm/src/main/java/org/apache/spark/sql/streaming/Trigger.java b/connector/connect/client/jvm/src/main/java/org/apache/spark/sql/streaming/Trigger.java new file mode 100644 index 00000000000..27ffe67d990 --- /dev/null +++ b/connector/connect/client/jvm/src/main/java/org/apache/spark/sql/streaming/Trigger.java @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.streaming; + +import java.util.concurrent.TimeUnit; + +import scala.concurrent.duration.Duration; + +import org.apache.spark.annotation.Evolving; +import org.apache.spark.sql.execution.streaming.AvailableNowTrigger$; +import org.apache.spark.sql.execution.streaming.ContinuousTrigger; +import org.apache.spark.sql.execution.streaming.OneTimeTrigger$; +import org.apache.spark.sql.execution.streaming.ProcessingTimeTrigger; + +/** + * Policy used to indicate how often results should be produced by a [[StreamingQuery]]. + * + * @since 3.5.0 + */ +@Evolving +public class Trigger { + // This is a copy of the same class in sql/core/.../streaming/Trigger.java + + /** + * A trigger policy that runs a query periodically based on an interval in processing time. + * If `interval` is 0, the query will run as fast as possible. + * + * @since 3.5.0 + */ + public static Trigger ProcessingTime(long intervalMs) { + return ProcessingTimeTrigger.create(intervalMs, TimeUnit.MILLISECONDS); + } + + /** + * (Java-friendly) + * A trigger policy that runs a query periodically based on an interval in processing time. + * If `interval` is 0, the query will run as fast as possible. + * + * {{{ + * import java.util.concurrent.TimeUnit + * df.writeStream().trigger(Trigger.ProcessingTime(10, TimeUnit.SECONDS)) + * }}} + * + * @since 3.5.0 + */ + public static Trigger ProcessingTime(long interval, TimeUnit timeUnit) { + return ProcessingTimeTrigger.create(interval, timeUnit); + } + + /** + * (Scala-friendly) + * A trigger policy that runs a query periodically based on an interval in processing time. + * If `duration` is 0, the query will run as fast as possible. + * + * {{{ + * import scala.concurrent.duration._ + * df.writeStream.trigger(Trigger.ProcessingTime(10.seconds)) + * }}} + * @since 3.5.0 + */ + public static Trigger ProcessingTime(Duration interval) { + return ProcessingTimeTrigger.apply(interval); + } + + /** + * A trigger policy that runs a query periodically based on an interval in processing time. + * If `interval` is effectively 0, the query will run as fast as possible. + * + * {{{ + * df.writeStream.trigger(Trigger.ProcessingTime("10 seconds")) + * }}} + * @since 3.5.0 + */ + public static Trigger ProcessingTime(String interval) { + return ProcessingTimeTrigger.apply(interval); + } + + /** + * A trigger that processes all available data in a single batch then terminates the query. + * + * @since 3.5.0 + * @deprecated This is deprecated as of Spark 3.4.0. Use {@link #AvailableNow()} to leverage + * better guarantee of processing, fine-grained scale of batches, and better gradual + * processing of watermark advancement including no-data batch. + * See the NOTES in {@link #AvailableNow()} for details. + */ + @Deprecated + public static Trigger Once() { + return OneTimeTrigger$.MODULE$; + } + + /** + * A trigger that processes all available data at the start of the query in one or multiple + * batches, then terminates the query. + * + * Users are encouraged to set the source options to control the size of the batch as similar as + * controlling the size of the batch in {@link #ProcessingTime(long)} trigger. + * + * NOTES: + * - This trigger provides a strong guarantee of processing: regardless of how many batches were + * left over in previous run, it ensures all available data at the time of execution gets + * processed before termination. All uncommitted batches will be processed first. 
+ * - Watermark gets advanced per each batch, and no-data batch gets executed before termination + * if the last batch advances the watermark. This helps to maintain smaller and predictable + * state size and smaller latency on the output of stateful operators. + * + * @since 3.5.0 + */ + public static Trigger AvailableNow() { + return AvailableNowTrigger$.MODULE$; + } + + /** + * A trigger that continuously processes streaming data, asynchronously checkpointing at + * the specified interval. + * + * @since 3.5.0 + */ + public static Trigger Continuous(long intervalMs) { + return ContinuousTrigger.apply(intervalMs); + } + + /** + * A trigger that continuously processes streaming data, asynchronously checkpointing at + * the specified interval. + * + * {{{ + * import java.util.concurrent.TimeUnit + * df.writeStream.trigger(Trigger.Continuous(10, TimeUnit.SECONDS)) + * }}} + * + * @since 3.5.0 + */ + public static Trigger Continuous(long interval, TimeUnit timeUnit) { + return ContinuousTrigger.create(interval, timeUnit); + } + + /** + * (Scala-friendly) + * A trigger that continuously processes streaming data, asynchronously checkpointing at + * the specified interval. + * + * {{{ + * import scala.concurrent.duration._ + * df.writeStream.trigger(Trigger.Continuous(10.seconds)) + * }}} + * @since 3.5.0 + */ + public static Trigger Continuous(Duration interval) { + return ContinuousTrigger.apply(interval); + } + + /** + * A trigger that continuously processes streaming data, asynchronously checkpointing at + * the specified interval. + * + * {{{ + * df.writeStream.trigger(Trigger.Continuous("10 seconds")) + * }}} + * @since 3.5.0 + */ + public static Trigger Continuous(String interval) { + return ContinuousTrigger.apply(interval); + } +} diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala index 023379c00a5..97914f660cf 100644 --- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -32,6 +32,7 @@ import org.apache.spark.sql.connect.client.{SparkResult, UdfUtils} import org.apache.spark.sql.connect.common.{DataTypeProtoConverter, StorageLevelProtoConverter} import org.apache.spark.sql.expressions.ScalarUserDefinedFunction import org.apache.spark.sql.functions.{struct, to_json} +import org.apache.spark.sql.streaming.DataStreamWriter import org.apache.spark.sql.types.{Metadata, StructType} import org.apache.spark.storage.StorageLevel import org.apache.spark.util.Utils @@ -2935,6 +2936,16 @@ class Dataset[T] private[sql] ( new DataFrameWriterV2[T](table, this) } + /** + * Interface for saving the content of the streaming Dataset out into external storage. + * + * @group basic + * @since 3.5.0 + */ + def writeStream: DataStreamWriter[T] = { + new DataStreamWriter[T](this) + } + /** * Persist this Dataset with the default storage level (`MEMORY_AND_DISK`). * @@ -3017,8 +3028,37 @@ class Dataset[T] private[sql] ( .getStorageLevel) } + /** + * Defines an event time watermark for this [[Dataset]]. A watermark tracks a point in time + * before which we assume no more late data is going to arrive. 
+ * + * Spark will use this watermark for several purposes: <ul> <li>To know when a given time window + * aggregation can be finalized and thus can be emitted when using output modes that do not + * allow updates.</li> <li>To minimize the amount of state that we need to keep for on-going + * aggregations, `mapGroupsWithState` and `dropDuplicates` operators.</li> </ul> The current + * watermark is computed by looking at the `MAX(eventTime)` seen across all of the partitions in + * the query minus a user specified `delayThreshold`. Due to the cost of coordinating this value + * across partitions, the actual watermark used is only guaranteed to be at least + * `delayThreshold` behind the actual event time. In some cases we may still process records + * that arrive more than `delayThreshold` late. + * + * @param eventTime + * the name of the column that contains the event time of the row. + * @param delayThreshold + * the minimum delay to wait to data to arrive late, relative to the latest record that has + * been processed in the form of an interval (e.g. "1 minute" or "5 hours"). NOTE: This should + * not be negative. + * + * @group streaming + * @since 3.5.0 + */ def withWatermark(eventTime: String, delayThreshold: String): Dataset[T] = { - throw new UnsupportedOperationException("withWatermark is not implemented.") + sparkSession.newDataset(encoder) { builder => + builder.getWithWatermarkBuilder + .setInput(plan.getRoot) + .setEventTime(eventTime) + .setDelayThreshold(delayThreshold) + } } def observe(name: String, expr: Column, exprs: Column*): Dataset[T] = { diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala index d68988cd435..00910d5904a 100644 --- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala +++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala @@ -28,6 +28,7 @@ import org.apache.arrow.memory.RootAllocator import org.apache.spark.annotation.{DeveloperApi, Experimental} import org.apache.spark.connect.proto +import org.apache.spark.connect.proto.ExecutePlanResponse import org.apache.spark.internal.Logging import org.apache.spark.sql.catalog.Catalog import org.apache.spark.sql.catalyst.{JavaTypeInference, ScalaReflection} @@ -37,6 +38,7 @@ import org.apache.spark.sql.connect.client.{ClassFinder, SparkConnectClient, Spa import org.apache.spark.sql.connect.client.util.{Cleaner, ConvertToArrow} import org.apache.spark.sql.connect.common.LiteralValueProtoConverter.toLiteralProto import org.apache.spark.sql.internal.CatalogImpl +import org.apache.spark.sql.streaming.DataStreamReader import org.apache.spark.sql.types.StructType /** @@ -287,6 +289,17 @@ class SparkSession private[sql] ( */ def read: DataFrameReader = new DataFrameReader(this) + /** + * Returns a `DataStreamReader` that can be used to read streaming data in as a `DataFrame`. + * {{{ + * sparkSession.readStream.parquet("/path/to/directory/of/parquet/files") + * sparkSession.readStream.schema(schema).json("/path/to/directory/of/json/files") + * }}} + * + * @since 3.5.0 + */ + def readStream: DataStreamReader = new DataStreamReader(this) + /** * Interface through which the user may create, drop, alter or query underlying databases, * tables, functions etc. 
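For illustration only, a minimal sketch of how the `readStream` and `withWatermark` additions above are intended to be used from the Scala Spark Connect client; it assumes an active session named `spark`, and the schema, input path, and delay threshold are made-up example values rather than anything defined in this patch:

import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType}

// Explicit schema so the JSON file source can skip schema inference (hypothetical fields).
val eventSchema = StructType(Seq(
  StructField("eventTime", TimestampType),
  StructField("word", StringType)))

val events = spark.readStream
  .schema(eventSchema)
  .json("/path/to/json/dir")              // hypothetical input directory

// Accept events up to 2 minutes late before window state is finalized.
val withLateData = events.withWatermark("eventTime", "2 minutes")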
@@ -453,9 +466,9 @@ class SparkSession private[sql] ( client.execute(plan).asScala.foreach(_ => ()) } - private[sql] def execute(command: proto.Command): Unit = { + private[sql] def execute(command: proto.Command): Seq[ExecutePlanResponse] = { val plan = proto.Plan.newBuilder().setCommand(command).build() - client.execute(plan).asScala.foreach(_ => ()) + client.execute(plan).asScala.toSeq } @DeveloperApi diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/execution/streaming/Triggers.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/execution/streaming/Triggers.scala new file mode 100644 index 00000000000..be1b0e8ac0c --- /dev/null +++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/execution/streaming/Triggers.scala @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming + +import java.util.concurrent.TimeUnit + +import scala.concurrent.duration.Duration + +import org.apache.spark.sql.catalyst.util.DateTimeConstants.MICROS_PER_DAY +import org.apache.spark.sql.catalyst.util.DateTimeUtils.microsToMillis +import org.apache.spark.sql.catalyst.util.IntervalUtils +import org.apache.spark.sql.streaming.Trigger +import org.apache.spark.unsafe.types.UTF8String + +private object Triggers { + // This is a copy of the same class in sql/core/...execution/streaming/Triggers.scala + + def validate(intervalMs: Long): Unit = { + require(intervalMs >= 0, "the interval of trigger should not be negative") + } + + def convert(interval: String): Long = { + val cal = IntervalUtils.stringToInterval(UTF8String.fromString(interval)) + if (cal.months != 0) { + throw new IllegalArgumentException(s"Doesn't support month or year interval: $interval") + } + val microsInDays = Math.multiplyExact(cal.days, MICROS_PER_DAY) + microsToMillis(Math.addExact(cal.microseconds, microsInDays)) + } + + def convert(interval: Duration): Long = interval.toMillis + + def convert(interval: Long, unit: TimeUnit): Long = unit.toMillis(interval) +} + +/** + * A [[Trigger]] that processes all available data in one batch then terminates the query. + */ +case object OneTimeTrigger extends Trigger + +/** + * A [[Trigger]] that processes all available data in multiple batches then terminates the query. + */ +case object AvailableNowTrigger extends Trigger + +/** + * A [[Trigger]] that runs a query periodically based on the processing time. If `interval` is 0, + * the query will run as fast as possible. 
+ */ +case class ProcessingTimeTrigger(intervalMs: Long) extends Trigger { + Triggers.validate(intervalMs) +} + +object ProcessingTimeTrigger { + import Triggers._ + + def apply(interval: String): ProcessingTimeTrigger = { + ProcessingTimeTrigger(convert(interval)) + } + + def apply(interval: Duration): ProcessingTimeTrigger = { + ProcessingTimeTrigger(convert(interval)) + } + + def create(interval: String): ProcessingTimeTrigger = { + apply(interval) + } + + def create(interval: Long, unit: TimeUnit): ProcessingTimeTrigger = { + ProcessingTimeTrigger(convert(interval, unit)) + } +} + +/** + * A [[Trigger]] that continuously processes streaming data, asynchronously checkpointing at the + * specified interval. + */ +case class ContinuousTrigger(intervalMs: Long) extends Trigger { + Triggers.validate(intervalMs) +} + +object ContinuousTrigger { + import Triggers._ + + def apply(interval: String): ContinuousTrigger = { + ContinuousTrigger(convert(interval)) + } + + def apply(interval: Duration): ContinuousTrigger = { + ContinuousTrigger(convert(interval)) + } + + def create(interval: String): ContinuousTrigger = { + apply(interval) + } + + def create(interval: Long, unit: TimeUnit): ContinuousTrigger = { + ContinuousTrigger(convert(interval, unit)) + } +} diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala new file mode 100644 index 00000000000..f6f41257417 --- /dev/null +++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala @@ -0,0 +1,273 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.streaming + +import scala.collection.JavaConverters._ + +import org.apache.spark.annotation.Evolving +import org.apache.spark.connect.proto.Read.DataSource +import org.apache.spark.internal.Logging +import org.apache.spark.sql.DataFrame +import org.apache.spark.sql.Dataset +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.StringEncoder +import org.apache.spark.sql.types.StructType + +/** + * Interface used to load a streaming `Dataset` from external storage systems (e.g. file systems, + * key-value stores, etc). Use `SparkSession.readStream` to access this. + * + * @since 3.5.0 + */ +@Evolving +final class DataStreamReader private[sql] (sparkSession: SparkSession) extends Logging { + + /** + * Specifies the input data source format. + * + * @since 3.5.0 + */ + def format(source: String): DataStreamReader = { + sourceBuilder.setFormat(source) + this + } + + /** + * Specifies the input schema. Some data sources (e.g. 
JSON) can infer the input schema + * automatically from data. By specifying the schema here, the underlying data source can skip + * the schema inference step, and thus speed up data loading. + * + * @since 3.5.0 + */ + def schema(schema: StructType): DataStreamReader = { + if (schema != null) { + sourceBuilder.setSchema(schema.json) // Use json. DDL does not retail all the attributes. + } + this + } + + /** + * Specifies the schema by using the input DDL-formatted string. Some data sources (e.g. JSON) + * can infer the input schema automatically from data. By specifying the schema here, the + * underlying data source can skip the schema inference step, and thus speed up data loading. + * + * @since 3.5.0 + */ + def schema(schemaString: String): DataStreamReader = { + sourceBuilder.setSchema(schemaString) + this + } + + /** + * Adds an input option for the underlying data source. + * + * @since 3.5.0 + */ + def option(key: String, value: String): DataStreamReader = { + sourceBuilder.putOptions(key, value) + this + } + + /** + * Adds an input option for the underlying data source. + * + * @since 3.5.0 + */ + def option(key: String, value: Boolean): DataStreamReader = option(key, value.toString) + + /** + * Adds an input option for the underlying data source. + * + * @since 3.5.0 + */ + def option(key: String, value: Long): DataStreamReader = option(key, value.toString) + + /** + * Adds an input option for the underlying data source. + * + * @since 3.5.0 + */ + def option(key: String, value: Double): DataStreamReader = option(key, value.toString) + + /** + * (Scala-specific) Adds input options for the underlying data source. + * + * @since 3.5.0 + */ + def options(options: scala.collection.Map[String, String]): DataStreamReader = { + this.options(options.asJava) + this + } + + /** + * (Java-specific) Adds input options for the underlying data source. + * + * @since 3.5.0 + */ + def options(options: java.util.Map[String, String]): DataStreamReader = { + sourceBuilder.putAllOptions(options) + this + } + + /** + * Loads input data stream in as a `DataFrame`, for data streams that don't require a path (e.g. + * external key-value stores). + * + * @since 3.5.0 + */ + def load(): DataFrame = { + sparkSession.newDataFrame { relationBuilder => + relationBuilder.getReadBuilder + .setIsStreaming(true) + .setDataSource(sourceBuilder.build()) + } + } + + /** + * Loads input in as a `DataFrame`, for data streams that read from some path. + * + * @since 3.5.0 + */ + def load(path: String): DataFrame = { + sourceBuilder.clearPaths() + sourceBuilder.addPaths(path) + load() + } + + /** + * Loads a JSON file stream and returns the results as a `DataFrame`. + * + * <a href="http://jsonlines.org/">JSON Lines</a> (newline-delimited JSON) is supported by + * default. For JSON (one record per file), set the `multiLine` option to true. + * + * This function goes through the input once to determine the input schema. If you know the + * schema in advance, use the version that specifies the schema to avoid the extra scan. + * + * You can set the following option(s): <ul> <li>`maxFilesPerTrigger` (default: no max limit): + * sets the maximum number of new files to be considered in every trigger.</li> </ul> + * + * You can find the JSON-specific options for reading JSON file stream in <a + * href="https://spark.apache.org/docs/latest/sql-data-sources-json.html#data-source-option"> + * Data Source Option</a> in the version you use. 
+ * + * @since 3.5.0 + */ + def json(path: String): DataFrame = { + format("json").load(path) + } + + /** + * Loads a CSV file stream and returns the result as a `DataFrame`. + * + * This function will go through the input once to determine the input schema if `inferSchema` + * is enabled. To avoid going through the entire data once, disable `inferSchema` option or + * specify the schema explicitly using `schema`. + * + * You can set the following option(s): <ul> <li>`maxFilesPerTrigger` (default: no max limit): + * sets the maximum number of new files to be considered in every trigger.</li> </ul> + * + * You can find the CSV-specific options for reading CSV file stream in <a + * href="https://spark.apache.org/docs/latest/sql-data-sources-csv.html#data-source-option"> + * Data Source Option</a> in the version you use. + * + * @since 3.5.0 + */ + def csv(path: String): DataFrame = format("csv").load(path) + + /** + * Loads a ORC file stream, returning the result as a `DataFrame`. + * + * You can set the following option(s): <ul> <li>`maxFilesPerTrigger` (default: no max limit): + * sets the maximum number of new files to be considered in every trigger.</li> </ul> + * + * ORC-specific option(s) for reading ORC file stream can be found in <a href= + * "https://spark.apache.org/docs/latest/sql-data-sources-orc.html#data-source-option"> Data + * Source Option</a> in the version you use. + * + * @since 3.5.0 + */ + def orc(path: String): DataFrame = format("orc").load(path) + + /** + * Loads a Parquet file stream, returning the result as a `DataFrame`. + * + * You can set the following option(s): <ul> <li>`maxFilesPerTrigger` (default: no max limit): + * sets the maximum number of new files to be considered in every trigger.</li> </ul> + * + * Parquet-specific option(s) for reading Parquet file stream can be found in <a href= + * "https://spark.apache.org/docs/latest/sql-data-sources-parquet.html#data-source-option"> Data + * Source Option</a> in the version you use. + * + * @since 3.5.0 + */ + def parquet(path: String): DataFrame = format("parquet").load(path) + + /** + * Loads text files and returns a `DataFrame` whose schema starts with a string column named + * "value", and followed by partitioned columns if there are any. The text files must be encoded + * as UTF-8. + * + * By default, each line in the text files is a new row in the resulting DataFrame. For example: + * {{{ + * // Scala: + * spark.readStream.text("/path/to/directory/") + * + * // Java: + * spark.readStream().text("/path/to/directory/") + * }}} + * + * You can set the following option(s): <ul> <li>`maxFilesPerTrigger` (default: no max limit): + * sets the maximum number of new files to be considered in every trigger.</li> </ul> + * + * You can find the text-specific options for reading text files in <a + * href="https://spark.apache.org/docs/latest/sql-data-sources-text.html#data-source-option"> + * Data Source Option</a> in the version you use. + * + * @since 3.5.0 + */ + def text(path: String): DataFrame = format("text").load(path) + + /** + * Loads text file(s) and returns a `Dataset` of String. The underlying schema of the Dataset + * contains a single string column named "value". The text files must be encoded as UTF-8. + * + * If the directory structure of the text files contains partitioning information, those are + * ignored in the resulting Dataset. To include partitioning information as columns, use `text`. + * + * By default, each line in the text file is a new element in the resulting Dataset. 
For + * example: + * {{{ + * // Scala: + * spark.readStream.textFile("/path/to/spark/README.md") + * + * // Java: + * spark.readStream().textFile("/path/to/spark/README.md") + * }}} + * + * You can set the text-specific options as specified in `DataStreamReader.text`. + * + * @param path + * input path + * @since 3.5.0 + */ + def textFile(path: String): Dataset[String] = { + text(path).select("value").as[String](StringEncoder) + } + + private val sourceBuilder = DataSource.newBuilder() +} diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala new file mode 100644 index 00000000000..32b4aa623bb --- /dev/null +++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala @@ -0,0 +1,266 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.streaming + +import java.util.Locale +import java.util.concurrent.TimeoutException + +import scala.collection.JavaConverters._ + +import org.apache.spark.annotation.Evolving +import org.apache.spark.connect.proto.Command +import org.apache.spark.connect.proto.WriteStreamOperationStart +import org.apache.spark.internal.Logging +import org.apache.spark.sql.Dataset +import org.apache.spark.sql.execution.streaming.AvailableNowTrigger +import org.apache.spark.sql.execution.streaming.ContinuousTrigger +import org.apache.spark.sql.execution.streaming.OneTimeTrigger +import org.apache.spark.sql.execution.streaming.ProcessingTimeTrigger + +/** + * Interface used to write a streaming `Dataset` to external storage systems (e.g. file systems, + * key-value stores, etc). Use `Dataset.writeStream` to access this. + * + * @since 3.5.0 + */ +@Evolving +final class DataStreamWriter[T] private[sql] (ds: Dataset[T]) extends Logging { + + /** + * Specifies how data of a streaming DataFrame/Dataset is written to a streaming sink. <ul> <li> + * `OutputMode.Append()`: only the new rows in the streaming DataFrame/Dataset will be written + * to the sink.</li> <li> `OutputMode.Complete()`: all the rows in the streaming + * DataFrame/Dataset will be written to the sink every time there are some updates.</li> <li> + * `OutputMode.Update()`: only the rows that were updated in the streaming DataFrame/Dataset + * will be written to the sink every time there are some updates. 
If the query doesn't contain + * aggregations, it will be equivalent to `OutputMode.Append()` mode.</li> </ul> + * + * @since 3.5.0 + */ + def outputMode(outputMode: OutputMode): DataStreamWriter[T] = { + sinkBuilder.setOutputMode(outputMode.toString.toLowerCase(Locale.ROOT)) + this + } + + /** + * Specifies how data of a streaming DataFrame/Dataset is written to a streaming sink. <ul> <li> + * `append`: only the new rows in the streaming DataFrame/Dataset will be written to the + * sink.</li> <li> `complete`: all the rows in the streaming DataFrame/Dataset will be written + * to the sink every time there are some updates.</li> <li> `update`: only the rows that were + * updated in the streaming DataFrame/Dataset will be written to the sink every time there are + * some updates. If the query doesn't contain aggregations, it will be equivalent to `append` + * mode.</li> </ul> + * + * @since 3.5.0 + */ + def outputMode(outputMode: String): DataStreamWriter[T] = { + sinkBuilder.setOutputMode(outputMode) + this + } + + /** + * Set the trigger for the stream query. The default value is `ProcessingTime(0)` and it will + * run the query as fast as possible. + * + * Scala Example: + * {{{ + * df.writeStream.trigger(ProcessingTime("10 seconds")) + * + * import scala.concurrent.duration._ + * df.writeStream.trigger(ProcessingTime(10.seconds)) + * }}} + * + * Java Example: + * {{{ + * df.writeStream().trigger(ProcessingTime.create("10 seconds")) + * + * import java.util.concurrent.TimeUnit + * df.writeStream().trigger(ProcessingTime.create(10, TimeUnit.SECONDS)) + * }}} + * + * @since 3.5.0 + */ + def trigger(trigger: Trigger): DataStreamWriter[T] = { + trigger match { + case ProcessingTimeTrigger(intervalMs) => + sinkBuilder.setProcessingTimeInterval(s"$intervalMs milliseconds") + case AvailableNowTrigger => + sinkBuilder.setAvailableNow(true) + case OneTimeTrigger => + sinkBuilder.setOnce(true) + case ContinuousTrigger(intervalMs) => + sinkBuilder.setContinuousCheckpointInterval(s"$intervalMs milliseconds") + } + this + } + + /** + * Specifies the name of the [[StreamingQuery]] that can be started with `start()`. This name + * must be unique among all the currently active queries in the associated SQLContext. + * + * @since 3.5.0 + */ + def queryName(queryName: String): DataStreamWriter[T] = { + sinkBuilder.setQueryName(queryName) + this + } + + /** + * Specifies the underlying output data source. + * + * @since 3.5.0 + */ + def format(source: String): DataStreamWriter[T] = { + sinkBuilder.setFormat(source) + this + } + + /** + * Partitions the output by the given columns on the file system. If specified, the output is + * laid out on the file system similar to Hive's partitioning scheme. As an example, when we + * partition a dataset by year and then month, the directory layout would look like: + * + * <ul> <li> year=2016/month=01/</li> <li> year=2016/month=02/</li> </ul> + * + * Partitioning is one of the most widely used techniques to optimize physical data layout. It + * provides a coarse-grained index for skipping unnecessary data reads when queries have + * predicates on the partitioned columns. In order for partitioning to work well, the number of + * distinct values in each column should typically be less than tens of thousands. 
+ * + * @since 3.5.0 + */ + @scala.annotation.varargs + def partitionBy(colNames: String*): DataStreamWriter[T] = { + sinkBuilder.clearPartitioningColumnNames() + sinkBuilder.addAllPartitioningColumnNames(colNames.asJava) + this + } + + /** + * Adds an output option for the underlying data source. + * + * @since 3.5.0 + */ + def option(key: String, value: String): DataStreamWriter[T] = { + sinkBuilder.putOptions(key, value) + this + } + + /** + * Adds an output option for the underlying data source. + * + * @since 3.5.0 + */ + def option(key: String, value: Boolean): DataStreamWriter[T] = option(key, value.toString) + + /** + * Adds an output option for the underlying data source. + * + * @since 3.5.0 + */ + def option(key: String, value: Long): DataStreamWriter[T] = option(key, value.toString) + + /** + * Adds an output option for the underlying data source. + * + * @since 3.5.0 + */ + def option(key: String, value: Double): DataStreamWriter[T] = option(key, value.toString) + + /** + * (Scala-specific) Adds output options for the underlying data source. + * + * @since 3.5.0 + */ + def options(options: scala.collection.Map[String, String]): DataStreamWriter[T] = { + this.options(options.asJava) + this + } + + /** + * Adds output options for the underlying data source. + * + * @since 3.5.0 + */ + def options(options: java.util.Map[String, String]): DataStreamWriter[T] = { + sinkBuilder.putAllOptions(options) + this + } + + /** + * Starts the execution of the streaming query, which will continually output results to the + * given path as new data arrives. The returned [[StreamingQuery]] object can be used to + * interact with the stream. + * + * @since 3.5.0 + */ + def start(path: String): StreamingQuery = { + sinkBuilder.setPath(path) + start() + } + + /** + * Starts the execution of the streaming query, which will continually output results to the + * given path as new data arrives. The returned [[StreamingQuery]] object can be used to + * interact with the stream. Throws a `TimeoutException` if the following conditions are met: + * - Another run of the same streaming query, that is a streaming query sharing the same + * checkpoint location, is already active on the same Spark Driver + * - The SQL configuration `spark.sql.streaming.stopActiveRunOnRestart` is enabled + * - The active run cannot be stopped within the timeout controlled by the SQL configuration + * `spark.sql.streaming.stopTimeout` + * + * @since 3.5.0 + */ + @throws[TimeoutException] + def start(): StreamingQuery = { + val startCmd = Command + .newBuilder() + .setWriteStreamOperationStart(sinkBuilder.build()) + .build() + + val resp = ds.sparkSession.execute(startCmd).head + RemoteStreamingQuery.fromStartCommandResponse(ds.sparkSession, resp) + } + + /** + * Starts the execution of the streaming query, which will continually output results to the + * given table as new data arrives. The returned [[StreamingQuery]] object can be used to + * interact with the stream. + * + * For v1 table, partitioning columns provided by `partitionBy` will be respected no matter the + * table exists or not. A new table will be created if the table not exists. + * + * For v2 table, `partitionBy` will be ignored if the table already exists. `partitionBy` will + * be respected only if the v2 table does not exist. Besides, the v2 table created by this API + * lacks some functionalities (e.g., customized properties, options, and serde info). 
If you + * need them, please create the v2 table manually before the execution to avoid creating a table + * with incomplete information. + * + * @since 3.5.0 + */ + @Evolving + @throws[TimeoutException] + def toTable(tableName: String): StreamingQuery = { + sinkBuilder.setTableName(tableName) + start() + } + + private val sinkBuilder = WriteStreamOperationStart + .newBuilder() + .setInput(ds.plan.getRoot) +} diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala new file mode 100644 index 00000000000..8bb35382162 --- /dev/null +++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala @@ -0,0 +1,245 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.streaming + +import java.util.UUID +import java.util.concurrent.TimeoutException + +import scala.collection.JavaConverters._ + +import org.apache.spark.annotation.Evolving +import org.apache.spark.connect.proto.Command +import org.apache.spark.connect.proto.ExecutePlanResponse +import org.apache.spark.connect.proto.StreamingQueryCommand +import org.apache.spark.connect.proto.StreamingQueryCommandResult +import org.apache.spark.sql.SparkSession + +/** + * A handle to a query that is executing continuously in the background as new data arrives. All + * these methods are thread-safe. + * @since 3.5.0 + */ +@Evolving +trait StreamingQuery { + // This is a copy of StreamingQuery in sql/core/.../streaming/StreamingQuery.scala + + /** + * Returns the user-specified name of the query, or null if not specified. This name can be + * specified in the `org.apache.spark.sql.streaming.DataStreamWriter` as + * `dataframe.writeStream.queryName("query").start()`. This name, if set, must be unique across + * all active queries. + * + * @since 3.5.0 + */ + def name: String + + /** + * Returns the unique id of this query that persists across restarts from checkpoint data. That + * is, this id is generated when a query is started for the first time, and will be the same + * every time it is restarted from checkpoint data. Also see [[runId]]. + * + * @since 3.5.0 + */ + def id: UUID + + /** + * Returns the unique id of this run of the query. That is, every start/restart of a query will + * generate a unique runId. Therefore, every time a query is restarted from checkpoint, it will + * have the same [[id]] but different [[runId]]s. + */ + def runId: UUID + + /** + * Returns the `SparkSession` associated with `this`. + * + * @since 3.5.0 + */ + def sparkSession: SparkSession + + /** + * Returns `true` if this query is actively running. 
+ * + * @since 3.5.0 + */ + def isActive: Boolean + + /** + * Returns the current status of the query. + * + * @since 3.5.0 + */ + def status: StreamingQueryStatus + + /** + * Returns an array of the most recent [[StreamingQueryProgress]] updates for this query. The + * number of progress updates retained for each stream is configured by Spark session + * configuration `spark.sql.streaming.numRecentProgressUpdates`. + * + * @since 3.5.0 + */ + def recentProgress: Array[StreamingQueryProgress] + + /** + * Returns the most recent [[StreamingQueryProgress]] update of this streaming query. + * + * @since 3.5.0 + */ + def lastProgress: StreamingQueryProgress + + /** + * Blocks until all available data in the source has been processed and committed to the sink. + * This method is intended for testing. Note that in the case of continually arriving data, this + * method may block forever. Additionally, this method is only guaranteed to block until data + * that has been synchronously appended data to a + * `org.apache.spark.sql.execution.streaming.Source` prior to invocation. (i.e. `getOffset` must + * immediately reflect the addition). + * @since 3.5.0 + */ + def processAllAvailable(): Unit + + /** + * Stops the execution of this query if it is running. This waits until the termination of the + * query execution threads or until a timeout is hit. + * + * By default stop will block indefinitely. You can configure a timeout by the configuration + * `spark.sql.streaming.stopTimeout`. A timeout of 0 (or negative) milliseconds will block + * indefinitely. If a `TimeoutException` is thrown, users can retry stopping the stream. If the + * issue persists, it is advisable to kill the Spark application. + * + * @since 3.5.0 + */ + @throws[TimeoutException] + def stop(): Unit + + /** + * Prints the physical plan to the console for debugging purposes. + * @since 3.5.0 + */ + def explain(): Unit + + /** + * Prints the physical plan to the console for debugging purposes. 
+ * + * @param extended + * whether to do extended explain or not + * @since 3.5.0 + */ + def explain(extended: Boolean): Unit +} + +class RemoteStreamingQuery( + override val id: UUID, + override val runId: UUID, + override val name: String, + override val sparkSession: SparkSession) + extends StreamingQuery { + + override def isActive: Boolean = { + executeQueryCmd(_.setStatus(true)).getStatus.getIsActive + } + + override def status: StreamingQueryStatus = { + val statusResp = executeQueryCmd(_.setStatus(true)).getStatus + new StreamingQueryStatus( + message = statusResp.getStatusMessage, + isDataAvailable = statusResp.getIsDataAvailable, + isTriggerActive = statusResp.getIsTriggerActive) + } + + override def recentProgress: Array[StreamingQueryProgress] = { + executeQueryCmd(_.setRecentProgress(true)).getRecentProgress.getRecentProgressJsonList.asScala + .map(json => new StreamingQueryProgress(json)) + .toArray + } + + override def lastProgress: StreamingQueryProgress = { + executeQueryCmd( + _.setLastProgress(true)).getRecentProgress.getRecentProgressJsonList.asScala.headOption + .map(json => new StreamingQueryProgress(json)) + .orNull + } + + override def processAllAvailable(): Unit = { + executeQueryCmd(_.setProcessAllAvailable(true)) + } + + override def stop(): Unit = { + executeQueryCmd(_.setStop(true)) + } + + override def explain(): Unit = { + explain(extended = false) + } + + override def explain(extended: Boolean): Unit = { + val explainCmd = StreamingQueryCommand.ExplainCommand + .newBuilder() + .setExtended(extended) + .build() + + val explain = executeQueryCmd(_.setExplain(explainCmd)).getExplain.getResult + + // scalastyle:off println + println(explain) + // scalastyle:on println + } + + private def executeQueryCmd( + setCmdFn: StreamingQueryCommand.Builder => Unit // Sets the command field, like stop(). + ): StreamingQueryCommandResult = { + + val cmdBuilder = Command.newBuilder() + val queryCmdBuilder = cmdBuilder.getStreamingQueryCommandBuilder + + // Set queryId. + queryCmdBuilder.getQueryIdBuilder + .setId(id.toString) + .setRunId(runId.toString) + + // Set command. + setCmdFn(queryCmdBuilder) + + val resp = sparkSession.execute(cmdBuilder.build()).head + + if (!resp.hasStreamingQueryCommandResult) { + throw new RuntimeException("Unexpected missing response for streaming query command") + } + + resp.getStreamingQueryCommandResult + } +} + +object RemoteStreamingQuery { + + def fromStartCommandResponse( + sparkSession: SparkSession, + response: ExecutePlanResponse): RemoteStreamingQuery = { + + if (!response.hasWriteStreamOperationStartResult) { + throw new RuntimeException("Unexpected: No result in response for start stream command") + } + + val result = response.getWriteStreamOperationStartResult + + new RemoteStreamingQuery( + id = UUID.fromString(result.getQueryId.getId), + runId = UUID.fromString(result.getQueryId.getRunId), + name = if (result.getName.isEmpty) null else result.getName, + sparkSession = sparkSession) + } +} diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryStatus.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryStatus.scala new file mode 100644 index 00000000000..cdda25876b2 --- /dev/null +++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryStatus.scala @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.streaming + +import org.json4s._ +import org.json4s.JsonAST.JValue +import org.json4s.JsonDSL._ +import org.json4s.jackson.JsonMethods._ + +import org.apache.spark.annotation.Evolving + +/** + * Reports information about the instantaneous status of a streaming query. + * + * @param message + * A human readable description of what the stream is currently doing. + * @param isDataAvailable + * True when there is new data to be processed. Doesn't apply to ContinuousExecution where it is + * always false. + * @param isTriggerActive + * True when the trigger is actively firing, false when waiting for the next trigger time. + * Doesn't apply to ContinuousExecution where it is always false. + * + * @since 3.5.0 + */ +@Evolving +class StreamingQueryStatus protected[sql] ( + val message: String, + val isDataAvailable: Boolean, + val isTriggerActive: Boolean) + extends Serializable { + // This is a copy of the same class in sql/core/.../streaming/StreamingQueryStatus.scala + + /** The compact JSON representation of this status. */ + def json: String = compact(render(jsonValue)) + + /** The pretty (i.e. indented) JSON representation of this status. */ + def prettyJson: String = pretty(render(jsonValue)) + + override def toString: String = prettyJson + + private[sql] def copy( + message: String = this.message, + isDataAvailable: Boolean = this.isDataAvailable, + isTriggerActive: Boolean = this.isTriggerActive): StreamingQueryStatus = { + new StreamingQueryStatus( + message = message, + isDataAvailable = isDataAvailable, + isTriggerActive = isTriggerActive) + } + + private[sql] def jsonValue: JValue = { + ("message" -> JString(message)) ~ + ("isDataAvailable" -> JBool(isDataAvailable)) ~ + ("isTriggerActive" -> JBool(isTriggerActive)) + } +} diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/progress.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/progress.scala new file mode 100644 index 00000000000..974bcd64b29 --- /dev/null +++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/progress.scala @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.streaming + +class StreamingQueryProgress private[sql] (val json: String) { + // TODO(SPARK-43128): (Implement full object by parsing from json). +} diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala index 4edea00cd51..7520588580a 100644 --- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala +++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala @@ -126,7 +126,12 @@ object CheckConnectJvmClientCompatibility { IncludeByName("org.apache.spark.sql.RuntimeConfig.*"), IncludeByName("org.apache.spark.sql.TypedColumn.*"), IncludeByName("org.apache.spark.sql.SQLImplicits.*"), - IncludeByName("org.apache.spark.sql.DatasetHolder.*")) + IncludeByName("org.apache.spark.sql.DatasetHolder.*"), + IncludeByName("org.apache.spark.sql.streaming.DataStreamReader.*"), + IncludeByName("org.apache.spark.sql.streaming.DataStreamWriter.*"), + IncludeByName("org.apache.spark.sql.streaming.StreamingQuery.*"), + IncludeByName("org.apache.spark.sql.streaming.StreamingQueryStatus.*"), + IncludeByName("org.apache.spark.sql.streaming.StreamingQueryProgress.*")) val excludeRules = Seq( // Filter unsupported rules: // Note when muting errors for a method, checks on all overloading methods are also muted. 
@@ -164,7 +169,6 @@ object CheckConnectJvmClientCompatibility { ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.rdd"), ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.toJavaRDD"), ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.javaRDD"), - ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.writeStream"), ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.this"), // functions @@ -197,7 +201,6 @@ object CheckConnectJvmClientCompatibility { "org.apache.spark.sql.SparkSession.baseRelationToDataFrame"), ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.createDataset"), ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.executeCommand"), - ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.readStream"), ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.this"), // RuntimeConfig @@ -206,6 +209,33 @@ object CheckConnectJvmClientCompatibility { // TypedColumn ProblemFilters.exclude[Problem]("org.apache.spark.sql.TypedColumn.this"), + // DataStreamReader + ProblemFilters.exclude[Problem]( + "org.apache.spark.sql.streaming.DataStreamReader.table" // TODO( SPARK-43144) + ), + + // DataStreamWriter + ProblemFilters.exclude[Problem]( + "org.apache.spark.sql.streaming.DataStreamWriter.foreach" // TODO(SPARK-43133) + ), + ProblemFilters.exclude[Problem]( + "org.apache.spark.sql.streaming.DataStreamWriter.foreachBatch" // TODO(SPARK-42944) + ), + ProblemFilters.exclude[Problem]( + "org.apache.spark.sql.streaming.DataStreamWriter.SOURCE*" // These are constant vals. + ), + + // StreamingQuery + ProblemFilters.exclude[Problem]( + "org.apache.spark.sql.streaming.StreamingQuery.awaitTermination" // TODO(SPARK-43143) + ), + ProblemFilters.exclude[Problem]( + "org.apache.spark.sql.streaming.StreamingQuery.exception" // TODO(SPARK-43134) + ), + ProblemFilters.exclude[Problem]( + "org.apache.spark.sql.streaming.StreamingQueryProgress.*" // TODO(SPARK-43128) + ), + // SQLImplicits ProblemFilters.exclude[Problem]("org.apache.spark.sql.SQLImplicits.this"), ProblemFilters.exclude[Problem]("org.apache.spark.sql.SQLImplicits.rddToDatasetHolder"), diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala new file mode 100644 index 00000000000..3659c0a5157 --- /dev/null +++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.streaming + +import org.scalatest.concurrent.Eventually.eventually +import org.scalatest.concurrent.Futures.timeout +import org.scalatest.time.SpanSugar._ + +import org.apache.spark.sql.SQLHelper +import org.apache.spark.sql.connect.client.util.RemoteSparkSession +import org.apache.spark.sql.functions.col +import org.apache.spark.sql.functions.window + +class StreamingQuerySuite extends RemoteSparkSession with SQLHelper { + + test("Streaming API with windowed aggregate query") { + // This verifies standard streaming API by starting a streaming query with windowed count. + withSQLConf( + "spark.sql.shuffle.partitions" -> "1" // Avoid too many reducers. + ) { + val readDF = spark.readStream + .format("rate") + .option("rowsPerSecond", "10") + .option("numPartitions", "1") + .load() + + // Verify schema (results in sending an RPC) + assert(readDF.schema.toDDL == "timestamp TIMESTAMP,value BIGINT") + + val countsDF = readDF + .withWatermark("timestamp", "10 seconds") + .groupBy(window(col("timestamp"), "5 seconds")) + .count() + .selectExpr("window.start as timestamp", "count as num_events") + + assert(countsDF.schema.toDDL == "timestamp TIMESTAMP,num_events BIGINT NOT NULL") + + // Start the query + val queryName = "sparkConnectStreamingQuery" + + val query = countsDF.writeStream + .format("memory") + .queryName(queryName) + .trigger(Trigger.ProcessingTime("1 second")) + .start() + + try { + // Verify some of the API. + assert(query.isActive) + + eventually(timeout(10.seconds)) { + assert(query.status.isDataAvailable) + assert(query.recentProgress.nonEmpty) // Query made progress. + } + + query.explain() // Prints the plan to console. + // Consider verifying explain output by capturing stdout similar to + // test("Dataset explain") in ClientE2ETestSuite. + + } finally { + // Don't wait for any processed data. Otherwise the test could take multiple seconds. + query.stop() + } + } + } +} diff --git a/dev/checkstyle-suppressions.xml b/dev/checkstyle-suppressions.xml index e2f5645a9c2..8929fb6224d 100644 --- a/dev/checkstyle-suppressions.xml +++ b/dev/checkstyle-suppressions.xml @@ -58,4 +58,6 @@ files="sql/catalyst/src/main/java/org/apache/spark/sql/streaming/GroupStateTimeout.java"/> <suppress checks="MethodName" files="sql/core/src/main/java/org/apache/spark/sql/streaming/Trigger.java"/> + <suppress checks="MethodName" + files="connector/connect/client/jvm/src/main/java/org/apache/spark/sql/streaming/Trigger.java"/> </suppressions>
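For reference, a condensed usage sketch of the streaming surface this commit adds to the Scala client, following the same flow as the new StreamingQuerySuite above; it assumes an active Spark Connect session named `spark`, and the query name is an illustrative placeholder:

import org.apache.spark.sql.functions.{col, window}
import org.apache.spark.sql.streaming.Trigger

val counts = spark.readStream
  .format("rate")                                   // built-in test source
  .option("rowsPerSecond", "10")
  .load()
  .withWatermark("timestamp", "10 seconds")
  .groupBy(window(col("timestamp"), "5 seconds"))
  .count()

val query = counts.writeStream
  .format("memory")                                 // in-memory sink, queryable by name
  .queryName("example_counts")                      // placeholder name
  .trigger(Trigger.ProcessingTime("1 second"))
  .start()

println(query.status)                               // instantaneous StreamingQueryStatus
query.recentProgress.foreach(p => println(p.json))  // progress is raw JSON for now (SPARK-43128)
query.stop()

The `foreachBatch`, `awaitTermination`, and `exception` methods are not part of this change; they are tracked by the TODO tickets (SPARK-42944, SPARK-43143, SPARK-43134) listed in the compatibility exclusions above.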