This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 3814d15bd19 [SPARK-43129] Scala core API for streaming Spark Connect
3814d15bd19 is described below

commit 3814d15bd190bdae69c5c22b461e3f6145cca1f7
Author: Raghu Angadi <raghu.ang...@databricks.com>
AuthorDate: Thu Apr 20 08:33:36 2023 +0900

    [SPARK-43129] Scala core API for streaming Spark Connect
    
    ### What changes were proposed in this pull request?
    Implements the core streaming API in Scala for running streaming queries over Spark Connect.
    This is functionally equivalent to the Python-side PR #40586.
    
    There are no server-side changes here since they were done earlier in the Python PR.
    
    We can run most streaming queries.
    Notably, queries using `foreachBatch()` are not yet supported.
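    A minimal usage sketch of the new client API (it mirrors the windowed-aggregate test in
    `StreamingQuerySuite` added below; the `rate` source and `memory` sink are simply the
    formats used there, not requirements of the API):
    
    ```scala
    // `spark` is a Spark Connect SparkSession; the query itself runs on the server.
    val readDF = spark.readStream
      .format("rate")
      .option("rowsPerSecond", "10")
      .load()
    
    val query = readDF.writeStream
      .format("memory")
      .queryName("sparkConnectStreamingQuery")
      .trigger(Trigger.ProcessingTime("1 second"))
      .start()
    
    assert(query.isActive) // StreamingQuery handle backed by Spark Connect RPCs
    query.stop()
    ```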
    
    ### Why are the changes needed?
    This adds Structured Streaming support in Scala for Spark Connect.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Adds more streaming APIs to the Scala Spark Connect client.
    
    ### How was this patch tested?
    
      - Unit test
      - Manual testing
    
    Closes #40783 from rangadi/scala-m1.
    
    Authored-by: Raghu Angadi <raghu.ang...@databricks.com>
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
---
 .../org/apache/spark/sql/streaming/Trigger.java    | 180 ++++++++++++++
 .../main/scala/org/apache/spark/sql/Dataset.scala  |  42 +++-
 .../scala/org/apache/spark/sql/SparkSession.scala  |  17 +-
 .../spark/sql/execution/streaming/Triggers.scala   | 115 +++++++++
 .../spark/sql/streaming/DataStreamReader.scala     | 273 +++++++++++++++++++++
 .../spark/sql/streaming/DataStreamWriter.scala     | 266 ++++++++++++++++++++
 .../spark/sql/streaming/StreamingQuery.scala       | 245 ++++++++++++++++++
 .../spark/sql/streaming/StreamingQueryStatus.scala |  72 ++++++
 .../org/apache/spark/sql/streaming/progress.scala  |  22 ++
 .../CheckConnectJvmClientCompatibility.scala       |  36 ++-
 .../spark/sql/streaming/StreamingQuerySuite.scala  |  81 ++++++
 dev/checkstyle-suppressions.xml                    |   2 +
 12 files changed, 1345 insertions(+), 6 deletions(-)

diff --git a/connector/connect/client/jvm/src/main/java/org/apache/spark/sql/streaming/Trigger.java b/connector/connect/client/jvm/src/main/java/org/apache/spark/sql/streaming/Trigger.java
new file mode 100644
index 00000000000..27ffe67d990
--- /dev/null
+++ b/connector/connect/client/jvm/src/main/java/org/apache/spark/sql/streaming/Trigger.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming;
+
+import java.util.concurrent.TimeUnit;
+
+import scala.concurrent.duration.Duration;
+
+import org.apache.spark.annotation.Evolving;
+import org.apache.spark.sql.execution.streaming.AvailableNowTrigger$;
+import org.apache.spark.sql.execution.streaming.ContinuousTrigger;
+import org.apache.spark.sql.execution.streaming.OneTimeTrigger$;
+import org.apache.spark.sql.execution.streaming.ProcessingTimeTrigger;
+
+/**
+ * Policy used to indicate how often results should be produced by a [[StreamingQuery]].
+ *
+ * @since 3.5.0
+ */
+@Evolving
+public class Trigger {
+  // This is a copy of the same class in sql/core/.../streaming/Trigger.java
+
+  /**
+   * A trigger policy that runs a query periodically based on an interval in processing time.
+   * If `interval` is 0, the query will run as fast as possible.
+   *
+   * @since 3.5.0
+   */
+  public static Trigger ProcessingTime(long intervalMs) {
+      return ProcessingTimeTrigger.create(intervalMs, TimeUnit.MILLISECONDS);
+  }
+
+  /**
+   * (Java-friendly)
+   * A trigger policy that runs a query periodically based on an interval in processing time.
+   * If `interval` is 0, the query will run as fast as possible.
+   *
+   * {{{
+   *    import java.util.concurrent.TimeUnit
+   *    df.writeStream().trigger(Trigger.ProcessingTime(10, TimeUnit.SECONDS))
+   * }}}
+   *
+   * @since 3.5.0
+   */
+  public static Trigger ProcessingTime(long interval, TimeUnit timeUnit) {
+      return ProcessingTimeTrigger.create(interval, timeUnit);
+  }
+
+  /**
+   * (Scala-friendly)
+   * A trigger policy that runs a query periodically based on an interval in processing time.
+   * If `duration` is 0, the query will run as fast as possible.
+   *
+   * {{{
+   *    import scala.concurrent.duration._
+   *    df.writeStream.trigger(Trigger.ProcessingTime(10.seconds))
+   * }}}
+   * @since 3.5.0
+   */
+  public static Trigger ProcessingTime(Duration interval) {
+      return ProcessingTimeTrigger.apply(interval);
+  }
+
+  /**
+   * A trigger policy that runs a query periodically based on an interval in processing time.
+   * If `interval` is effectively 0, the query will run as fast as possible.
+   *
+   * {{{
+   *    df.writeStream.trigger(Trigger.ProcessingTime("10 seconds"))
+   * }}}
+   * @since 3.5.0
+   */
+  public static Trigger ProcessingTime(String interval) {
+      return ProcessingTimeTrigger.apply(interval);
+  }
+
+  /**
+   * A trigger that processes all available data in a single batch then terminates the query.
+   *
+   * @since 3.5.0
+   * @deprecated This is deprecated as of Spark 3.4.0. Use {@link #AvailableNow()} to leverage
+   *             better guarantee of processing, fine-grained scale of batches, and better gradual
+   *             processing of watermark advancement including no-data batch.
+   *             See the NOTES in {@link #AvailableNow()} for details.
+   */
+  @Deprecated
+  public static Trigger Once() {
+    return OneTimeTrigger$.MODULE$;
+  }
+
+  /**
+   * A trigger that processes all available data at the start of the query in one or multiple
+   * batches, then terminates the query.
+   *
+   * Users are encouraged to set the source options to control the size of the batch, similar to
+   * controlling the size of the batch in the {@link #ProcessingTime(long)} trigger.
+   *
+   * NOTES:
+   * - This trigger provides a strong guarantee of processing: regardless of how many batches were
+   *   left over in the previous run, it ensures all available data at the time of execution gets
+   *   processed before termination. All uncommitted batches will be processed first.
+   * - The watermark gets advanced per batch, and a no-data batch gets executed before termination
+   *   if the last batch advances the watermark. This helps to maintain a smaller and predictable
+   *   state size and smaller latency on the output of stateful operators.
+   *
+   * @since 3.5.0
+   */
+  public static Trigger AvailableNow() {
+    return AvailableNowTrigger$.MODULE$;
+  }
+
+  /**
+   * A trigger that continuously processes streaming data, asynchronously checkpointing at
+   * the specified interval.
+   *
+   * @since 3.5.0
+   */
+  public static Trigger Continuous(long intervalMs) {
+    return ContinuousTrigger.apply(intervalMs);
+  }
+
+  /**
+   * A trigger that continuously processes streaming data, asynchronously checkpointing at
+   * the specified interval.
+   *
+   * {{{
+   *    import java.util.concurrent.TimeUnit
+   *    df.writeStream.trigger(Trigger.Continuous(10, TimeUnit.SECONDS))
+   * }}}
+   *
+   * @since 3.5.0
+   */
+  public static Trigger Continuous(long interval, TimeUnit timeUnit) {
+    return ContinuousTrigger.create(interval, timeUnit);
+  }
+
+  /**
+   * (Scala-friendly)
+   * A trigger that continuously processes streaming data, asynchronously checkpointing at
+   * the specified interval.
+   *
+   * {{{
+   *    import scala.concurrent.duration._
+   *    df.writeStream.trigger(Trigger.Continuous(10.seconds))
+   * }}}
+   * @since 3.5.0
+   */
+  public static Trigger Continuous(Duration interval) {
+    return ContinuousTrigger.apply(interval);
+  }
+
+  /**
+   * A trigger that continuously processes streaming data, asynchronously checkpointing at
+   * the specified interval.
+   *
+   * {{{
+   *    df.writeStream.trigger(Trigger.Continuous("10 seconds"))
+   * }}}
+   * @since 3.5.0
+   */
+  public static Trigger Continuous(String interval) {
+    return ContinuousTrigger.apply(interval);
+  }
+}
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala
index 023379c00a5..97914f660cf 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -32,6 +32,7 @@ import org.apache.spark.sql.connect.client.{SparkResult, UdfUtils}
 import org.apache.spark.sql.connect.common.{DataTypeProtoConverter, StorageLevelProtoConverter}
 import org.apache.spark.sql.expressions.ScalarUserDefinedFunction
 import org.apache.spark.sql.functions.{struct, to_json}
+import org.apache.spark.sql.streaming.DataStreamWriter
 import org.apache.spark.sql.types.{Metadata, StructType}
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.util.Utils
@@ -2935,6 +2936,16 @@ class Dataset[T] private[sql] (
     new DataFrameWriterV2[T](table, this)
   }
 
+  /**
+   * Interface for saving the content of the streaming Dataset out into external storage.
+   *
+   * @group basic
+   * @since 3.5.0
+   */
+  def writeStream: DataStreamWriter[T] = {
+    new DataStreamWriter[T](this)
+  }
+
   /**
    * Persist this Dataset with the default storage level (`MEMORY_AND_DISK`).
    *
@@ -3017,8 +3028,37 @@ class Dataset[T] private[sql] (
         .getStorageLevel)
   }
 
+  /**
+   * Defines an event time watermark for this [[Dataset]]. A watermark tracks a point in time
+   * before which we assume no more late data is going to arrive.
+   *
+   * Spark will use this watermark for several purposes: <ul> <li>To know when a given time window
+   * aggregation can be finalized and thus can be emitted when using output modes that do not
+   * allow updates.</li> <li>To minimize the amount of state that we need to keep for on-going
+   * aggregations, `mapGroupsWithState` and `dropDuplicates` operators.</li> </ul> The current
+   * watermark is computed by looking at the `MAX(eventTime)` seen across all of the partitions in
+   * the query minus a user specified `delayThreshold`. Due to the cost of coordinating this value
+   * across partitions, the actual watermark used is only guaranteed to be at least
+   * `delayThreshold` behind the actual event time. In some cases we may still process records
+   * that arrive more than `delayThreshold` late.
+   *
+   * @param eventTime
+   *   the name of the column that contains the event time of the row.
+   * @param delayThreshold
+   *   the minimum delay to wait for data to arrive late, relative to the latest record that has
+   *   been processed, in the form of an interval (e.g. "1 minute" or "5 hours"). NOTE: This should
+   *   not be negative.
+   *
+   * @group streaming
+   * @since 3.5.0
+   */
   def withWatermark(eventTime: String, delayThreshold: String): Dataset[T] = {
-    throw new UnsupportedOperationException("withWatermark is not implemented.")
+    sparkSession.newDataset(encoder) { builder =>
+      builder.getWithWatermarkBuilder
+        .setInput(plan.getRoot)
+        .setEventTime(eventTime)
+        .setDelayThreshold(delayThreshold)
+    }
   }
 
   def observe(name: String, expr: Column, exprs: Column*): Dataset[T] = {
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
index d68988cd435..00910d5904a 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -28,6 +28,7 @@ import org.apache.arrow.memory.RootAllocator
 
 import org.apache.spark.annotation.{DeveloperApi, Experimental}
 import org.apache.spark.connect.proto
+import org.apache.spark.connect.proto.ExecutePlanResponse
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalog.Catalog
 import org.apache.spark.sql.catalyst.{JavaTypeInference, ScalaReflection}
@@ -37,6 +38,7 @@ import org.apache.spark.sql.connect.client.{ClassFinder, SparkConnectClient, Spa
 import org.apache.spark.sql.connect.client.util.{Cleaner, ConvertToArrow}
 import org.apache.spark.sql.connect.common.LiteralValueProtoConverter.toLiteralProto
 import org.apache.spark.sql.internal.CatalogImpl
+import org.apache.spark.sql.streaming.DataStreamReader
 import org.apache.spark.sql.types.StructType
 
 /**
@@ -287,6 +289,17 @@ class SparkSession private[sql] (
    */
   def read: DataFrameReader = new DataFrameReader(this)
 
+  /**
+   * Returns a `DataStreamReader` that can be used to read streaming data in as a `DataFrame`.
+   * {{{
+   *   sparkSession.readStream.parquet("/path/to/directory/of/parquet/files")
+   *   sparkSession.readStream.schema(schema).json("/path/to/directory/of/json/files")
+   * }}}
+   *
+   * @since 3.5.0
+   */
+  def readStream: DataStreamReader = new DataStreamReader(this)
+
   /**
    * Interface through which the user may create, drop, alter or query underlying databases,
    * tables, functions etc.
@@ -453,9 +466,9 @@ class SparkSession private[sql] (
     client.execute(plan).asScala.foreach(_ => ())
   }
 
-  private[sql] def execute(command: proto.Command): Unit = {
+  private[sql] def execute(command: proto.Command): Seq[ExecutePlanResponse] = {
     val plan = proto.Plan.newBuilder().setCommand(command).build()
-    client.execute(plan).asScala.foreach(_ => ())
+    client.execute(plan).asScala.toSeq
   }
 
   @DeveloperApi
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/execution/streaming/Triggers.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/execution/streaming/Triggers.scala
new file mode 100644
index 00000000000..be1b0e8ac0c
--- /dev/null
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/execution/streaming/Triggers.scala
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import java.util.concurrent.TimeUnit
+
+import scala.concurrent.duration.Duration
+
+import org.apache.spark.sql.catalyst.util.DateTimeConstants.MICROS_PER_DAY
+import org.apache.spark.sql.catalyst.util.DateTimeUtils.microsToMillis
+import org.apache.spark.sql.catalyst.util.IntervalUtils
+import org.apache.spark.sql.streaming.Trigger
+import org.apache.spark.unsafe.types.UTF8String
+
+private object Triggers {
+  // This is a copy of the same class in sql/core/...execution/streaming/Triggers.scala
+
+  def validate(intervalMs: Long): Unit = {
+    require(intervalMs >= 0, "the interval of trigger should not be negative")
+  }
+
+  def convert(interval: String): Long = {
+    val cal = IntervalUtils.stringToInterval(UTF8String.fromString(interval))
+    if (cal.months != 0) {
+      throw new IllegalArgumentException(s"Doesn't support month or year interval: $interval")
+    }
+    val microsInDays = Math.multiplyExact(cal.days, MICROS_PER_DAY)
+    microsToMillis(Math.addExact(cal.microseconds, microsInDays))
+  }
+
+  def convert(interval: Duration): Long = interval.toMillis
+
+  def convert(interval: Long, unit: TimeUnit): Long = unit.toMillis(interval)
+}
+
+/**
+ * A [[Trigger]] that processes all available data in one batch then terminates the query.
+ */
+case object OneTimeTrigger extends Trigger
+
+/**
+ * A [[Trigger]] that processes all available data in multiple batches then terminates the query.
+ */
+case object AvailableNowTrigger extends Trigger
+
+/**
+ * A [[Trigger]] that runs a query periodically based on the processing time. If `interval` is 0,
+ * the query will run as fast as possible.
+ */
+case class ProcessingTimeTrigger(intervalMs: Long) extends Trigger {
+  Triggers.validate(intervalMs)
+}
+
+object ProcessingTimeTrigger {
+  import Triggers._
+
+  def apply(interval: String): ProcessingTimeTrigger = {
+    ProcessingTimeTrigger(convert(interval))
+  }
+
+  def apply(interval: Duration): ProcessingTimeTrigger = {
+    ProcessingTimeTrigger(convert(interval))
+  }
+
+  def create(interval: String): ProcessingTimeTrigger = {
+    apply(interval)
+  }
+
+  def create(interval: Long, unit: TimeUnit): ProcessingTimeTrigger = {
+    ProcessingTimeTrigger(convert(interval, unit))
+  }
+}
+
+/**
+ * A [[Trigger]] that continuously processes streaming data, asynchronously checkpointing at the
+ * specified interval.
+ */
+case class ContinuousTrigger(intervalMs: Long) extends Trigger {
+  Triggers.validate(intervalMs)
+}
+
+object ContinuousTrigger {
+  import Triggers._
+
+  def apply(interval: String): ContinuousTrigger = {
+    ContinuousTrigger(convert(interval))
+  }
+
+  def apply(interval: Duration): ContinuousTrigger = {
+    ContinuousTrigger(convert(interval))
+  }
+
+  def create(interval: String): ContinuousTrigger = {
+    apply(interval)
+  }
+
+  def create(interval: Long, unit: TimeUnit): ContinuousTrigger = {
+    ContinuousTrigger(convert(interval, unit))
+  }
+}
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
new file mode 100644
index 00000000000..f6f41257417
--- /dev/null
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.annotation.Evolving
+import org.apache.spark.connect.proto.Read.DataSource
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.Dataset
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.StringEncoder
+import org.apache.spark.sql.types.StructType
+
+/**
+ * Interface used to load a streaming `Dataset` from external storage systems (e.g. file systems,
+ * key-value stores, etc). Use `SparkSession.readStream` to access this.
+ *
+ * @since 3.5.0
+ */
+@Evolving
+final class DataStreamReader private[sql] (sparkSession: SparkSession) extends Logging {
+
+  /**
+   * Specifies the input data source format.
+   *
+   * @since 3.5.0
+   */
+  def format(source: String): DataStreamReader = {
+    sourceBuilder.setFormat(source)
+    this
+  }
+
+  /**
+   * Specifies the input schema. Some data sources (e.g. JSON) can infer the input schema
+   * automatically from data. By specifying the schema here, the underlying data source can skip
+   * the schema inference step, and thus speed up data loading.
+   *
+   * @since 3.5.0
+   */
+  def schema(schema: StructType): DataStreamReader = {
+    if (schema != null) {
+      sourceBuilder.setSchema(schema.json) // Use json. DDL does not retain all the attributes.
+    }
+    this
+  }
+
+  /**
+   * Specifies the schema by using the input DDL-formatted string. Some data sources (e.g. JSON)
+   * can infer the input schema automatically from data. By specifying the schema here, the
+   * underlying data source can skip the schema inference step, and thus speed up data loading.
+   *
+   * @since 3.5.0
+   */
+  def schema(schemaString: String): DataStreamReader = {
+    sourceBuilder.setSchema(schemaString)
+    this
+  }
+
+  /**
+   * Adds an input option for the underlying data source.
+   *
+   * @since 3.5.0
+   */
+  def option(key: String, value: String): DataStreamReader = {
+    sourceBuilder.putOptions(key, value)
+    this
+  }
+
+  /**
+   * Adds an input option for the underlying data source.
+   *
+   * @since 3.5.0
+   */
+  def option(key: String, value: Boolean): DataStreamReader = option(key, value.toString)
+
+  /**
+   * Adds an input option for the underlying data source.
+   *
+   * @since 3.5.0
+   */
+  def option(key: String, value: Long): DataStreamReader = option(key, value.toString)
+
+  /**
+   * Adds an input option for the underlying data source.
+   *
+   * @since 3.5.0
+   */
+  def option(key: String, value: Double): DataStreamReader = option(key, value.toString)
+
+  /**
+   * (Scala-specific) Adds input options for the underlying data source.
+   *
+   * @since 3.5.0
+   */
+  def options(options: scala.collection.Map[String, String]): DataStreamReader = {
+    this.options(options.asJava)
+    this
+  }
+
+  /**
+   * (Java-specific) Adds input options for the underlying data source.
+   *
+   * @since 3.5.0
+   */
+  def options(options: java.util.Map[String, String]): DataStreamReader = {
+    sourceBuilder.putAllOptions(options)
+    this
+  }
+
+  /**
+   * Loads input data stream in as a `DataFrame`, for data streams that don't require a path (e.g.
+   * external key-value stores).
+   *
+   * @since 3.5.0
+   */
+  def load(): DataFrame = {
+    sparkSession.newDataFrame { relationBuilder =>
+      relationBuilder.getReadBuilder
+        .setIsStreaming(true)
+        .setDataSource(sourceBuilder.build())
+    }
+  }
+
+  /**
+   * Loads input in as a `DataFrame`, for data streams that read from some path.
+   *
+   * @since 3.5.0
+   */
+  def load(path: String): DataFrame = {
+    sourceBuilder.clearPaths()
+    sourceBuilder.addPaths(path)
+    load()
+  }
+
+  /**
+   * Loads a JSON file stream and returns the results as a `DataFrame`.
+   *
+   * <a href="http://jsonlines.org/">JSON Lines</a> (newline-delimited JSON) is supported by
+   * default. For JSON (one record per file), set the `multiLine` option to true.
+   *
+   * This function goes through the input once to determine the input schema. If you know the
+   * schema in advance, use the version that specifies the schema to avoid the extra scan.
+   *
+   * You can set the following option(s): <ul> <li>`maxFilesPerTrigger` (default: no max limit):
+   * sets the maximum number of new files to be considered in every trigger.</li> </ul>
+   *
+   * You can find the JSON-specific options for reading JSON file stream in <a
+   * href="https://spark.apache.org/docs/latest/sql-data-sources-json.html#data-source-option">
+   * Data Source Option</a> in the version you use.
+   *
+   * @since 3.5.0
+   */
+  def json(path: String): DataFrame = {
+    format("json").load(path)
+  }
+
+  /**
+   * Loads a CSV file stream and returns the result as a `DataFrame`.
+   *
+   * This function will go through the input once to determine the input schema if `inferSchema`
+   * is enabled. To avoid going through the entire data once, disable `inferSchema` option or
+   * specify the schema explicitly using `schema`.
+   *
+   * You can set the following option(s): <ul> <li>`maxFilesPerTrigger` (default: no max limit):
+   * sets the maximum number of new files to be considered in every trigger.</li> </ul>
+   *
+   * You can find the CSV-specific options for reading CSV file stream in <a
+   * href="https://spark.apache.org/docs/latest/sql-data-sources-csv.html#data-source-option">
+   * Data Source Option</a> in the version you use.
+   *
+   * @since 3.5.0
+   */
+  def csv(path: String): DataFrame = format("csv").load(path)
+
+  /**
+   * Loads an ORC file stream, returning the result as a `DataFrame`.
+   *
+   * You can set the following option(s): <ul> <li>`maxFilesPerTrigger` (default: no max limit):
+   * sets the maximum number of new files to be considered in every trigger.</li> </ul>
+   *
+   * ORC-specific option(s) for reading ORC file stream can be found in <a href=
+   * "https://spark.apache.org/docs/latest/sql-data-sources-orc.html#data-source-option"> Data
+   * Source Option</a> in the version you use.
+   *
+   * @since 3.5.0
+   */
+  def orc(path: String): DataFrame = format("orc").load(path)
+
+  /**
+   * Loads a Parquet file stream, returning the result as a `DataFrame`.
+   *
+   * You can set the following option(s): <ul> <li>`maxFilesPerTrigger` (default: no max limit):
+   * sets the maximum number of new files to be considered in every trigger.</li> </ul>
+   *
+   * Parquet-specific option(s) for reading Parquet file stream can be found in <a href=
+   * "https://spark.apache.org/docs/latest/sql-data-sources-parquet.html#data-source-option"> Data
+   * Source Option</a> in the version you use.
+   *
+   * @since 3.5.0
+   */
+  def parquet(path: String): DataFrame = format("parquet").load(path)
+
+  /**
+   * Loads text files and returns a `DataFrame` whose schema starts with a string column named
+   * "value", and followed by partitioned columns if there are any. The text files must be encoded
+   * as UTF-8.
+   *
+   * By default, each line in the text files is a new row in the resulting DataFrame. For example:
+   * {{{
+   *   // Scala:
+   *   spark.readStream.text("/path/to/directory/")
+   *
+   *   // Java:
+   *   spark.readStream().text("/path/to/directory/")
+   * }}}
+   *
+   * You can set the following option(s): <ul> <li>`maxFilesPerTrigger` (default: no max limit):
+   * sets the maximum number of new files to be considered in every trigger.</li> </ul>
+   *
+   * You can find the text-specific options for reading text files in <a
+   * href="https://spark.apache.org/docs/latest/sql-data-sources-text.html#data-source-option">
+   * Data Source Option</a> in the version you use.
+   *
+   * @since 3.5.0
+   */
+  def text(path: String): DataFrame = format("text").load(path)
+
+  /**
+   * Loads text file(s) and returns a `Dataset` of String. The underlying schema of the Dataset
+   * contains a single string column named "value". The text files must be encoded as UTF-8.
+   *
+   * If the directory structure of the text files contains partitioning information, those are
+   * ignored in the resulting Dataset. To include partitioning information as columns, use `text`.
+   *
+   * By default, each line in the text file is a new element in the resulting Dataset. For
+   * example:
+   * {{{
+   *   // Scala:
+   *   spark.readStream.textFile("/path/to/spark/README.md")
+   *
+   *   // Java:
+   *   spark.readStream().textFile("/path/to/spark/README.md")
+   * }}}
+   *
+   * You can set the text-specific options as specified in `DataStreamReader.text`.
+   *
+   * @param path
+   *   input path
+   * @since 3.5.0
+   */
+  def textFile(path: String): Dataset[String] = {
+    text(path).select("value").as[String](StringEncoder)
+  }
+
+  private val sourceBuilder = DataSource.newBuilder()
+}
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
new file mode 100644
index 00000000000..32b4aa623bb
--- /dev/null
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import java.util.Locale
+import java.util.concurrent.TimeoutException
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.annotation.Evolving
+import org.apache.spark.connect.proto.Command
+import org.apache.spark.connect.proto.WriteStreamOperationStart
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.Dataset
+import org.apache.spark.sql.execution.streaming.AvailableNowTrigger
+import org.apache.spark.sql.execution.streaming.ContinuousTrigger
+import org.apache.spark.sql.execution.streaming.OneTimeTrigger
+import org.apache.spark.sql.execution.streaming.ProcessingTimeTrigger
+
+/**
+ * Interface used to write a streaming `Dataset` to external storage systems (e.g. file systems,
+ * key-value stores, etc). Use `Dataset.writeStream` to access this.
+ *
+ * @since 3.5.0
+ */
+@Evolving
+final class DataStreamWriter[T] private[sql] (ds: Dataset[T]) extends Logging {
+
+  /**
+   * Specifies how data of a streaming DataFrame/Dataset is written to a streaming sink. <ul> <li>
+   * `OutputMode.Append()`: only the new rows in the streaming DataFrame/Dataset will be written
+   * to the sink.</li> <li> `OutputMode.Complete()`: all the rows in the streaming
+   * DataFrame/Dataset will be written to the sink every time there are some updates.</li> <li>
+   * `OutputMode.Update()`: only the rows that were updated in the streaming DataFrame/Dataset
+   * will be written to the sink every time there are some updates. If the query doesn't contain
+   * aggregations, it will be equivalent to `OutputMode.Append()` mode.</li> </ul>
+   *
+   * @since 3.5.0
+   */
+  def outputMode(outputMode: OutputMode): DataStreamWriter[T] = {
+    sinkBuilder.setOutputMode(outputMode.toString.toLowerCase(Locale.ROOT))
+    this
+  }
+
+  /**
+   * Specifies how data of a streaming DataFrame/Dataset is written to a streaming sink. <ul> <li>
+   * `append`: only the new rows in the streaming DataFrame/Dataset will be written to the
+   * sink.</li> <li> `complete`: all the rows in the streaming DataFrame/Dataset will be written
+   * to the sink every time there are some updates.</li> <li> `update`: only the rows that were
+   * updated in the streaming DataFrame/Dataset will be written to the sink every time there are
+   * some updates. If the query doesn't contain aggregations, it will be equivalent to `append`
+   * mode.</li> </ul>
+   *
+   * @since 3.5.0
+   */
+  def outputMode(outputMode: String): DataStreamWriter[T] = {
+    sinkBuilder.setOutputMode(outputMode)
+    this
+  }
+
+  /**
+   * Set the trigger for the stream query. The default value is `ProcessingTime(0)` and it will
+   * run the query as fast as possible.
+   *
+   * Scala Example:
+   * {{{
+   *   df.writeStream.trigger(ProcessingTime("10 seconds"))
+   *
+   *   import scala.concurrent.duration._
+   *   df.writeStream.trigger(ProcessingTime(10.seconds))
+   * }}}
+   *
+   * Java Example:
+   * {{{
+   *   df.writeStream().trigger(ProcessingTime.create("10 seconds"))
+   *
+   *   import java.util.concurrent.TimeUnit
+   *   df.writeStream().trigger(ProcessingTime.create(10, TimeUnit.SECONDS))
+   * }}}
+   *
+   * @since 3.5.0
+   */
+  def trigger(trigger: Trigger): DataStreamWriter[T] = {
+    trigger match {
+      case ProcessingTimeTrigger(intervalMs) =>
+        sinkBuilder.setProcessingTimeInterval(s"$intervalMs milliseconds")
+      case AvailableNowTrigger =>
+        sinkBuilder.setAvailableNow(true)
+      case OneTimeTrigger =>
+        sinkBuilder.setOnce(true)
+      case ContinuousTrigger(intervalMs) =>
+        sinkBuilder.setContinuousCheckpointInterval(s"$intervalMs milliseconds")
+    }
+    this
+  }
+
+  /**
+   * Specifies the name of the [[StreamingQuery]] that can be started with `start()`. This name
+   * must be unique among all the currently active queries in the associated SQLContext.
+   *
+   * @since 3.5.0
+   */
+  def queryName(queryName: String): DataStreamWriter[T] = {
+    sinkBuilder.setQueryName(queryName)
+    this
+  }
+
+  /**
+   * Specifies the underlying output data source.
+   *
+   * @since 3.5.0
+   */
+  def format(source: String): DataStreamWriter[T] = {
+    sinkBuilder.setFormat(source)
+    this
+  }
+
+  /**
+   * Partitions the output by the given columns on the file system. If specified, the output is
+   * laid out on the file system similar to Hive's partitioning scheme. As an example, when we
+   * partition a dataset by year and then month, the directory layout would look like:
+   *
+   * <ul> <li> year=2016/month=01/</li> <li> year=2016/month=02/</li> </ul>
+   *
+   * Partitioning is one of the most widely used techniques to optimize physical data layout. It
+   * provides a coarse-grained index for skipping unnecessary data reads when queries have
+   * predicates on the partitioned columns. In order for partitioning to work well, the number of
+   * distinct values in each column should typically be less than tens of thousands.
+   *
+   * @since 3.5.0
+   */
+  @scala.annotation.varargs
+  def partitionBy(colNames: String*): DataStreamWriter[T] = {
+    sinkBuilder.clearPartitioningColumnNames()
+    sinkBuilder.addAllPartitioningColumnNames(colNames.asJava)
+    this
+  }
+
+  /**
+   * Adds an output option for the underlying data source.
+   *
+   * @since 3.5.0
+   */
+  def option(key: String, value: String): DataStreamWriter[T] = {
+    sinkBuilder.putOptions(key, value)
+    this
+  }
+
+  /**
+   * Adds an output option for the underlying data source.
+   *
+   * @since 3.5.0
+   */
+  def option(key: String, value: Boolean): DataStreamWriter[T] = option(key, value.toString)
+
+  /**
+   * Adds an output option for the underlying data source.
+   *
+   * @since 3.5.0
+   */
+  def option(key: String, value: Long): DataStreamWriter[T] = option(key, value.toString)
+
+  /**
+   * Adds an output option for the underlying data source.
+   *
+   * @since 3.5.0
+   */
+  def option(key: String, value: Double): DataStreamWriter[T] = option(key, value.toString)
+
+  /**
+   * (Scala-specific) Adds output options for the underlying data source.
+   *
+   * @since 3.5.0
+   */
+  def options(options: scala.collection.Map[String, String]): DataStreamWriter[T] = {
+    this.options(options.asJava)
+    this
+  }
+
+  /**
+   * Adds output options for the underlying data source.
+   *
+   * @since 3.5.0
+   */
+  def options(options: java.util.Map[String, String]): DataStreamWriter[T] = {
+    sinkBuilder.putAllOptions(options)
+    this
+  }
+
+  /**
+   * Starts the execution of the streaming query, which will continually output results to the
+   * given path as new data arrives. The returned [[StreamingQuery]] object can be used to
+   * interact with the stream.
+   *
+   * @since 3.5.0
+   */
+  def start(path: String): StreamingQuery = {
+    sinkBuilder.setPath(path)
+    start()
+  }
+
+  /**
+   * Starts the execution of the streaming query, which will continually output results to the
+   * given path as new data arrives. The returned [[StreamingQuery]] object can be used to
+   * interact with the stream. Throws a `TimeoutException` if the following conditions are met:
+   *   - Another run of the same streaming query, that is a streaming query sharing the same
+   *     checkpoint location, is already active on the same Spark Driver
+   *   - The SQL configuration `spark.sql.streaming.stopActiveRunOnRestart` is enabled
+   *   - The active run cannot be stopped within the timeout controlled by the SQL configuration
+   *     `spark.sql.streaming.stopTimeout`
+   *
+   * @since 3.5.0
+   */
+  @throws[TimeoutException]
+  def start(): StreamingQuery = {
+    val startCmd = Command
+      .newBuilder()
+      .setWriteStreamOperationStart(sinkBuilder.build())
+      .build()
+
+    val resp = ds.sparkSession.execute(startCmd).head
+    RemoteStreamingQuery.fromStartCommandResponse(ds.sparkSession, resp)
+  }
+
+  /**
+   * Starts the execution of the streaming query, which will continually output results to the
+   * given table as new data arrives. The returned [[StreamingQuery]] object can be used to
+   * interact with the stream.
+   *
+   * For v1 tables, partitioning columns provided by `partitionBy` will be respected whether the
+   * table exists or not. A new table will be created if the table does not exist.
+   *
+   * For v2 tables, `partitionBy` will be ignored if the table already exists. `partitionBy` will
+   * be respected only if the v2 table does not exist. Besides, the v2 table created by this API
+   * lacks some functionalities (e.g., customized properties, options, and serde info). If you
+   * need them, please create the v2 table manually before the execution to avoid creating a table
+   * with incomplete information.
+   *
+   * @since 3.5.0
+   */
+  @Evolving
+  @throws[TimeoutException]
+  def toTable(tableName: String): StreamingQuery = {
+    sinkBuilder.setTableName(tableName)
+    start()
+  }
+
+  private val sinkBuilder = WriteStreamOperationStart
+    .newBuilder()
+    .setInput(ds.plan.getRoot)
+}
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala
new file mode 100644
index 00000000000..8bb35382162
--- /dev/null
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import java.util.UUID
+import java.util.concurrent.TimeoutException
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.annotation.Evolving
+import org.apache.spark.connect.proto.Command
+import org.apache.spark.connect.proto.ExecutePlanResponse
+import org.apache.spark.connect.proto.StreamingQueryCommand
+import org.apache.spark.connect.proto.StreamingQueryCommandResult
+import org.apache.spark.sql.SparkSession
+
+/**
+ * A handle to a query that is executing continuously in the background as new data arrives. All
+ * these methods are thread-safe.
+ * @since 3.5.0
+ */
+@Evolving
+trait StreamingQuery {
+  // This is a copy of StreamingQuery in sql/core/.../streaming/StreamingQuery.scala
+
+  /**
+   * Returns the user-specified name of the query, or null if not specified. This name can be
+   * specified in the `org.apache.spark.sql.streaming.DataStreamWriter` as
+   * `dataframe.writeStream.queryName("query").start()`. This name, if set, must be unique across
+   * all active queries.
+   *
+   * @since 3.5.0
+   */
+  def name: String
+
+  /**
+   * Returns the unique id of this query that persists across restarts from checkpoint data. That
+   * is, this id is generated when a query is started for the first time, and will be the same
+   * every time it is restarted from checkpoint data. Also see [[runId]].
+   *
+   * @since 3.5.0
+   */
+  def id: UUID
+
+  /**
+   * Returns the unique id of this run of the query. That is, every start/restart of a query will
+   * generate a unique runId. Therefore, every time a query is restarted from checkpoint, it will
+   * have the same [[id]] but different [[runId]]s.
+   */
+  def runId: UUID
+
+  /**
+   * Returns the `SparkSession` associated with `this`.
+   *
+   * @since 3.5.0
+   */
+  def sparkSession: SparkSession
+
+  /**
+   * Returns `true` if this query is actively running.
+   *
+   * @since 3.5.0
+   */
+  def isActive: Boolean
+
+  /**
+   * Returns the current status of the query.
+   *
+   * @since 3.5.0
+   */
+  def status: StreamingQueryStatus
+
+  /**
+   * Returns an array of the most recent [[StreamingQueryProgress]] updates for this query. The
+   * number of progress updates retained for each stream is configured by Spark session
+   * configuration `spark.sql.streaming.numRecentProgressUpdates`.
+   *
+   * @since 3.5.0
+   */
+  def recentProgress: Array[StreamingQueryProgress]
+
+  /**
+   * Returns the most recent [[StreamingQueryProgress]] update of this streaming query.
+   *
+   * @since 3.5.0
+   */
+  def lastProgress: StreamingQueryProgress
+
+  /**
+   * Blocks until all available data in the source has been processed and committed to the sink.
+   * This method is intended for testing. Note that in the case of continually arriving data, this
+   * method may block forever. Additionally, this method is only guaranteed to block until data
+   * that has been synchronously appended to a
+   * `org.apache.spark.sql.execution.streaming.Source` prior to invocation. (i.e. `getOffset` must
+   * immediately reflect the addition).
+   * @since 3.5.0
+   */
+  def processAllAvailable(): Unit
+
+  /**
+   * Stops the execution of this query if it is running. This waits until the termination of the
+   * query execution threads or until a timeout is hit.
+   *
+   * By default stop will block indefinitely. You can configure a timeout by the configuration
+   * `spark.sql.streaming.stopTimeout`. A timeout of 0 (or negative) milliseconds will block
+   * indefinitely. If a `TimeoutException` is thrown, users can retry stopping the stream. If the
+   * issue persists, it is advisable to kill the Spark application.
+   *
+   * @since 3.5.0
+   */
+  @throws[TimeoutException]
+  def stop(): Unit
+
+  /**
+   * Prints the physical plan to the console for debugging purposes.
+   * @since 3.5.0
+   */
+  def explain(): Unit
+
+  /**
+   * Prints the physical plan to the console for debugging purposes.
+   *
+   * @param extended
+   *   whether to do extended explain or not
+   * @since 3.5.0
+   */
+  def explain(extended: Boolean): Unit
+}
+
+class RemoteStreamingQuery(
+    override val id: UUID,
+    override val runId: UUID,
+    override val name: String,
+    override val sparkSession: SparkSession)
+    extends StreamingQuery {
+
+  override def isActive: Boolean = {
+    executeQueryCmd(_.setStatus(true)).getStatus.getIsActive
+  }
+
+  override def status: StreamingQueryStatus = {
+    val statusResp = executeQueryCmd(_.setStatus(true)).getStatus
+    new StreamingQueryStatus(
+      message = statusResp.getStatusMessage,
+      isDataAvailable = statusResp.getIsDataAvailable,
+      isTriggerActive = statusResp.getIsTriggerActive)
+  }
+
+  override def recentProgress: Array[StreamingQueryProgress] = {
+    executeQueryCmd(_.setRecentProgress(true)).getRecentProgress.getRecentProgressJsonList.asScala
+      .map(json => new StreamingQueryProgress(json))
+      .toArray
+  }
+
+  override def lastProgress: StreamingQueryProgress = {
+    executeQueryCmd(
+      _.setLastProgress(true)).getRecentProgress.getRecentProgressJsonList.asScala.headOption
+      .map(json => new StreamingQueryProgress(json))
+      .orNull
+  }
+
+  override def processAllAvailable(): Unit = {
+    executeQueryCmd(_.setProcessAllAvailable(true))
+  }
+
+  override def stop(): Unit = {
+    executeQueryCmd(_.setStop(true))
+  }
+
+  override def explain(): Unit = {
+    explain(extended = false)
+  }
+
+  override def explain(extended: Boolean): Unit = {
+    val explainCmd = StreamingQueryCommand.ExplainCommand
+      .newBuilder()
+      .setExtended(extended)
+      .build()
+
+    val explain = executeQueryCmd(_.setExplain(explainCmd)).getExplain.getResult
+
+    // scalastyle:off println
+    println(explain)
+    // scalastyle:on println
+  }
+
+  private def executeQueryCmd(
+      setCmdFn: StreamingQueryCommand.Builder => Unit // Sets the command field, like stop().
+  ): StreamingQueryCommandResult = {
+
+    val cmdBuilder = Command.newBuilder()
+    val queryCmdBuilder = cmdBuilder.getStreamingQueryCommandBuilder
+
+    // Set queryId.
+    queryCmdBuilder.getQueryIdBuilder
+      .setId(id.toString)
+      .setRunId(runId.toString)
+
+    // Set command.
+    setCmdFn(queryCmdBuilder)
+
+    val resp = sparkSession.execute(cmdBuilder.build()).head
+
+    if (!resp.hasStreamingQueryCommandResult) {
+      throw new RuntimeException("Unexpected missing response for streaming query command")
+    }
+
+    resp.getStreamingQueryCommandResult
+  }
+}
+
+object RemoteStreamingQuery {
+
+  def fromStartCommandResponse(
+      sparkSession: SparkSession,
+      response: ExecutePlanResponse): RemoteStreamingQuery = {
+
+    if (!response.hasWriteStreamOperationStartResult) {
+      throw new RuntimeException("Unexpected: No result in response for start stream command")
+    }
+
+    val result = response.getWriteStreamOperationStartResult
+
+    new RemoteStreamingQuery(
+      id = UUID.fromString(result.getQueryId.getId),
+      runId = UUID.fromString(result.getQueryId.getRunId),
+      name = if (result.getName.isEmpty) null else result.getName,
+      sparkSession = sparkSession)
+  }
+}
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryStatus.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryStatus.scala
new file mode 100644
index 00000000000..cdda25876b2
--- /dev/null
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryStatus.scala
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import org.json4s._
+import org.json4s.JsonAST.JValue
+import org.json4s.JsonDSL._
+import org.json4s.jackson.JsonMethods._
+
+import org.apache.spark.annotation.Evolving
+
+/**
+ * Reports information about the instantaneous status of a streaming query.
+ *
+ * @param message
+ *   A human readable description of what the stream is currently doing.
+ * @param isDataAvailable
+ *   True when there is new data to be processed. Doesn't apply to ContinuousExecution where it is
+ *   always false.
+ * @param isTriggerActive
+ *   True when the trigger is actively firing, false when waiting for the next trigger time.
+ *   Doesn't apply to ContinuousExecution where it is always false.
+ *
+ * @since 3.5.0
+ */
+@Evolving
+class StreamingQueryStatus protected[sql] (
+    val message: String,
+    val isDataAvailable: Boolean,
+    val isTriggerActive: Boolean)
+    extends Serializable {
+  // This is a copy of the same class in sql/core/.../streaming/StreamingQueryStatus.scala
+
+  /** The compact JSON representation of this status. */
+  def json: String = compact(render(jsonValue))
+
+  /** The pretty (i.e. indented) JSON representation of this status. */
+  def prettyJson: String = pretty(render(jsonValue))
+
+  override def toString: String = prettyJson
+
+  private[sql] def copy(
+      message: String = this.message,
+      isDataAvailable: Boolean = this.isDataAvailable,
+      isTriggerActive: Boolean = this.isTriggerActive): StreamingQueryStatus = {
+    new StreamingQueryStatus(
+      message = message,
+      isDataAvailable = isDataAvailable,
+      isTriggerActive = isTriggerActive)
+  }
+
+  private[sql] def jsonValue: JValue = {
+    ("message" -> JString(message)) ~
+      ("isDataAvailable" -> JBool(isDataAvailable)) ~
+      ("isTriggerActive" -> JBool(isTriggerActive))
+  }
+}
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/progress.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/progress.scala
new file mode 100644
index 00000000000..974bcd64b29
--- /dev/null
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/progress.scala
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+class StreamingQueryProgress private[sql] (val json: String) {
+  // TODO(SPARK-43128): (Implement full object by parsing from json).
+}
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala
index 4edea00cd51..7520588580a 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala
@@ -126,7 +126,12 @@ object CheckConnectJvmClientCompatibility {
       IncludeByName("org.apache.spark.sql.RuntimeConfig.*"),
       IncludeByName("org.apache.spark.sql.TypedColumn.*"),
       IncludeByName("org.apache.spark.sql.SQLImplicits.*"),
-      IncludeByName("org.apache.spark.sql.DatasetHolder.*"))
+      IncludeByName("org.apache.spark.sql.DatasetHolder.*"),
+      IncludeByName("org.apache.spark.sql.streaming.DataStreamReader.*"),
+      IncludeByName("org.apache.spark.sql.streaming.DataStreamWriter.*"),
+      IncludeByName("org.apache.spark.sql.streaming.StreamingQuery.*"),
+      IncludeByName("org.apache.spark.sql.streaming.StreamingQueryStatus.*"),
+      IncludeByName("org.apache.spark.sql.streaming.StreamingQueryProgress.*"))
     val excludeRules = Seq(
       // Filter unsupported rules:
      // Note when muting errors for a method, checks on all overloading methods are also muted.
@@ -164,7 +169,6 @@ object CheckConnectJvmClientCompatibility {
      ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.rdd"),
      ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.toJavaRDD"),
      ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.javaRDD"),
-      ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.writeStream"),
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.this"),
 
       // functions
@@ -197,7 +201,6 @@ object CheckConnectJvmClientCompatibility {
        "org.apache.spark.sql.SparkSession.baseRelationToDataFrame"),
      ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.createDataset"),
      ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.executeCommand"),
-      ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.readStream"),
      ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.this"),
 
       // RuntimeConfig
@@ -206,6 +209,33 @@ object CheckConnectJvmClientCompatibility {
       // TypedColumn
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.TypedColumn.this"),
 
+      // DataStreamReader
+      ProblemFilters.exclude[Problem](
+        "org.apache.spark.sql.streaming.DataStreamReader.table" // TODO(SPARK-43144)
+      ),
+
+      // DataStreamWriter
+      ProblemFilters.exclude[Problem](
+        "org.apache.spark.sql.streaming.DataStreamWriter.foreach" // TODO(SPARK-43133)
+      ),
+      ProblemFilters.exclude[Problem](
+        "org.apache.spark.sql.streaming.DataStreamWriter.foreachBatch" // TODO(SPARK-42944)
+      ),
+      ProblemFilters.exclude[Problem](
+        "org.apache.spark.sql.streaming.DataStreamWriter.SOURCE*" // These are constant vals.
+      ),
+
+      // StreamingQuery
+      ProblemFilters.exclude[Problem](
+        "org.apache.spark.sql.streaming.StreamingQuery.awaitTermination" // TODO(SPARK-43143)
+      ),
+      ProblemFilters.exclude[Problem](
+        "org.apache.spark.sql.streaming.StreamingQuery.exception" // TODO(SPARK-43134)
+      ),
+      ProblemFilters.exclude[Problem](
+        "org.apache.spark.sql.streaming.StreamingQueryProgress.*" // TODO(SPARK-43128)
+      ),
+
      // SQLImplicits
      ProblemFilters.exclude[Problem]("org.apache.spark.sql.SQLImplicits.this"),
      ProblemFilters.exclude[Problem]("org.apache.spark.sql.SQLImplicits.rddToDatasetHolder"),
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
new file mode 100644
index 00000000000..3659c0a5157
--- /dev/null
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import org.scalatest.concurrent.Eventually.eventually
+import org.scalatest.concurrent.Futures.timeout
+import org.scalatest.time.SpanSugar._
+
+import org.apache.spark.sql.SQLHelper
+import org.apache.spark.sql.connect.client.util.RemoteSparkSession
+import org.apache.spark.sql.functions.col
+import org.apache.spark.sql.functions.window
+
+class StreamingQuerySuite extends RemoteSparkSession with SQLHelper {
+
+  test("Streaming API with windowed aggregate query") {
+    // This verifies the standard streaming API by starting a streaming query with windowed count.
+    withSQLConf(
+      "spark.sql.shuffle.partitions" -> "1" // Avoid too many reducers.
+    ) {
+      val readDF = spark.readStream
+        .format("rate")
+        .option("rowsPerSecond", "10")
+        .option("numPartitions", "1")
+        .load()
+
+      // Verify schema (results in sending an RPC)
+      assert(readDF.schema.toDDL == "timestamp TIMESTAMP,value BIGINT")
+
+      val countsDF = readDF
+        .withWatermark("timestamp", "10 seconds")
+        .groupBy(window(col("timestamp"), "5 seconds"))
+        .count()
+        .selectExpr("window.start as timestamp", "count as num_events")
+
+      assert(countsDF.schema.toDDL == "timestamp TIMESTAMP,num_events BIGINT NOT NULL")
+
+      // Start the query
+      val queryName = "sparkConnectStreamingQuery"
+
+      val query = countsDF.writeStream
+        .format("memory")
+        .queryName(queryName)
+        .trigger(Trigger.ProcessingTime("1 second"))
+        .start()
+
+      try {
+        // Verify some of the API.
+        assert(query.isActive)
+
+        eventually(timeout(10.seconds)) {
+          assert(query.status.isDataAvailable)
+          assert(query.recentProgress.nonEmpty) // Query made progress.
+        }
+
+        query.explain() // Prints the plan to console.
+        // Consider verifying explain output by capturing stdout similar to
+        // test("Dataset explain") in ClientE2ETestSuite.
+
+      } finally {
+        // Don't wait for any processed data. Otherwise the test could take multiple seconds.
+        query.stop()
+      }
+    }
+  }
+}
diff --git a/dev/checkstyle-suppressions.xml b/dev/checkstyle-suppressions.xml
index e2f5645a9c2..8929fb6224d 100644
--- a/dev/checkstyle-suppressions.xml
+++ b/dev/checkstyle-suppressions.xml
@@ -58,4 +58,6 @@
               files="sql/catalyst/src/main/java/org/apache/spark/sql/streaming/GroupStateTimeout.java"/>
     <suppress checks="MethodName"
               files="sql/core/src/main/java/org/apache/spark/sql/streaming/Trigger.java"/>
+    <suppress checks="MethodName"
+              files="connector/connect/client/jvm/src/main/java/org/apache/spark/sql/streaming/Trigger.java"/>
 </suppressions>


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
