This is an automated email from the ASF dual-hosted git repository.

hvanhovell pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
     new 95a59223ce7 [SPARK-44535][CONNECT][SQL] Move required Streaming API to sql/api
95a59223ce7 is described below

commit 95a59223ce79174c9a605a4305082a1211ccb7e8
Author: Herman van Hovell <her...@databricks.com>
AuthorDate: Tue Jul 25 23:57:14 2023 -0400

    [SPARK-44535][CONNECT][SQL] Move required Streaming API to sql/api
    
    ### What changes were proposed in this pull request?
    This PR moves a bunch of streaming classes to the sql/api project.
    
    ### Why are the changes needed?
    This is needed to disconnect the Spark Connect Scala Client from catalyst.
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    Existing Tests.
    
    Closes #42140 from hvanhovell/SPARK-44535.
    
    Authored-by: Herman van Hovell <her...@databricks.com>
    Signed-off-by: Herman van Hovell <her...@databricks.com>
    (cherry picked from commit 03d0ecc68365565e921ac6bc0f253ee67d2042d7)
    Signed-off-by: Herman van Hovell <her...@databricks.com>
---
 .../spark/sql/streaming/DataStreamWriter.scala     |  3 +--
 .../sql/streaming/StreamingQueryListener.scala     |  3 +--
 .../sql/streaming/StreamingQueryManager.scala      |  6 ++---
 .../CheckConnectJvmClientCompatibility.scala       | 18 +++++++++++++++
 dev/checkstyle-suppressions.xml                    |  4 ++--
 project/MimaExcludes.scala                         |  5 +++-
 .../spark/sql/streaming/GroupStateTimeout.java     |  0
 .../org/apache/spark/sql/streaming/OutputMode.java |  0
 .../expressions/{GenericRow.scala => rows.scala}   | 10 ++++++++
 .../logical/groupStateTimeouts.scala}              | 27 +++++-----------------
 .../catalyst/streaming/InternalOutputModes.scala   |  0
 .../spark/sql/catalyst/expressions/rows.scala      |  9 --------
 .../spark/sql/catalyst/plans/logical/object.scala  |  5 ----
 13 files changed, 45 insertions(+), 45 deletions(-)

diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
index ad76ab4a1bc..b395a2d073d 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
@@ -38,7 +38,6 @@ import org.apache.spark.sql.execution.streaming.OneTimeTrigger
 import org.apache.spark.sql.execution.streaming.ProcessingTimeTrigger
 import org.apache.spark.sql.types.NullType
 import org.apache.spark.util.SparkSerDeUtils
-import org.apache.spark.util.Utils
 
 /**
  * Interface used to write a streaming `Dataset` to external storage systems (e.g. file systems,
@@ -240,7 +239,7 @@ final class DataStreamWriter[T] private[sql] (ds: Dataset[T]) extends Logging {
    */
   @Evolving
   def foreachBatch(function: (Dataset[T], Long) => Unit): DataStreamWriter[T] = {
-    val serializedFn = Utils.serialize(function)
+    val serializedFn = SparkSerDeUtils.serialize(function)
     sinkBuilder.getForeachBatchBuilder.getScalaFunctionBuilder
       .setPayload(ByteString.copyFrom(serializedFn))
       .setOutputType(DataTypeProtoConverter.toConnectProtoType(NullType)) // Unused.
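
For context: the change compiles because SparkSerDeUtils.serialize is a drop-in
for the removed Utils.serialize call; both are thin wrappers over standard Java
serialization. A minimal sketch of such a helper (the object name SerDeSketch
is illustrative, not Spark's actual implementation):

    import java.io.{ByteArrayOutputStream, ObjectOutputStream}

    object SerDeSketch {
      // Serialize any Serializable value (e.g. the foreachBatch closure) to bytes.
      def serialize[T](value: T): Array[Byte] = {
        val buffer = new ByteArrayOutputStream()
        val out = new ObjectOutputStream(buffer)
        try {
          out.writeObject(value)
          out.flush()
          buffer.toByteArray
        } finally {
          out.close()
        }
      }
    }
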
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala
index 8cef421becd..e2f3be02ad3 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala
@@ -25,7 +25,6 @@ import org.json4s.JsonDSL.{jobject2assoc, pair2Assoc}
 import org.json4s.jackson.JsonMethods.{compact, render}
 
 import org.apache.spark.annotation.Evolving
-import org.apache.spark.scheduler.SparkListenerEvent
 
 /**
  * Interface for listening to events related to [[StreamingQuery StreamingQueries]].
@@ -116,7 +115,7 @@ object StreamingQueryListener extends Serializable {
    * @since 3.5.0
    */
   @Evolving
-  trait Event extends SparkListenerEvent
+  trait Event
 
   /**
    * Event representing the start of a query
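
With Event no longer extending SparkListenerEvent, client code can implement a
listener without depending on spark-core's scheduler package. A minimal sketch
using the listener callbacks (LoggingListener is a made-up name for
illustration):

    import org.apache.spark.sql.streaming.StreamingQueryListener
    import org.apache.spark.sql.streaming.StreamingQueryListener._

    // Logs query lifecycle events; needs only the sql streaming API.
    class LoggingListener extends StreamingQueryListener {
      override def onQueryStarted(event: QueryStartedEvent): Unit =
        println(s"started: ${event.id}")
      override def onQueryProgress(event: QueryProgressEvent): Unit =
        println(s"batch ${event.progress.batchId} done")
      override def onQueryTerminated(event: QueryTerminatedEvent): Unit =
        println(s"terminated: ${event.id}")
    }
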
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
index 8f9e768d23f..91744460440 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
@@ -31,7 +31,7 @@ import org.apache.spark.connect.proto.StreamingQueryManagerCommandResult
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.connect.common.{InvalidPlanInput, StreamingListenerPacket}
-import org.apache.spark.util.Utils
+import org.apache.spark.util.SparkSerDeUtils
 
 /**
  * A class to manage all the [[StreamingQuery]] active in a `SparkSession`.
@@ -155,7 +155,7 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo
     cacheListenerById(id, listener)
     executeManagerCmd(
       _.getAddListenerBuilder
-        .setListenerPayload(ByteString.copyFrom(Utils
+        .setListenerPayload(ByteString.copyFrom(SparkSerDeUtils
           .serialize(StreamingListenerPacket(id, listener)))))
   }
 
@@ -168,7 +168,7 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo
     val id = getIdByListener(listener)
     executeManagerCmd(
       _.getRemoveListenerBuilder
-        .setListenerPayload(ByteString.copyFrom(Utils
+        .setListenerPayload(ByteString.copyFrom(SparkSerDeUtils
           .serialize(StreamingListenerPacket(id, listener)))))
     removeCachedListener(id)
   }
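
Both addListener and removeListener ship the listener to the server as the same
serialized StreamingListenerPacket; client-side usage is unchanged. A short
sketch, assuming a Connect SparkSession named spark and the LoggingListener
sketched above:

    val listener = new LoggingListener()
    spark.streams.addListener(listener)    // sends the serialized packet
    // ... start and run streaming queries ...
    spark.streams.removeListener(listener) // looks up the cached id, then cleans up
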
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala
index deb2ff631fd..08028f26eb4 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala
@@ -280,6 +280,24 @@ object CheckConnectJvmClientCompatibility {
         "org.apache.spark.sql.streaming.PythonStreamingQueryListener"),
       ProblemFilters.exclude[MissingClassProblem](
         "org.apache.spark.sql.streaming.PythonStreamingQueryListenerWrapper"),
+      ProblemFilters.exclude[MissingTypesProblem](
+        "org.apache.spark.sql.streaming.StreamingQueryListener$Event"),
+      ProblemFilters.exclude[MissingTypesProblem](
+        "org.apache.spark.sql.streaming.StreamingQueryListener$QueryIdleEvent"),
+      ProblemFilters.exclude[DirectMissingMethodProblem](
+        "org.apache.spark.sql.streaming.StreamingQueryListener#QueryIdleEvent.logEvent"),
+      ProblemFilters.exclude[MissingTypesProblem](
+        "org.apache.spark.sql.streaming.StreamingQueryListener$QueryProgressEvent"),
+      ProblemFilters.exclude[DirectMissingMethodProblem](
+        "org.apache.spark.sql.streaming.StreamingQueryListener#QueryProgressEvent.logEvent"),
+      ProblemFilters.exclude[MissingTypesProblem](
+        "org.apache.spark.sql.streaming.StreamingQueryListener$QueryStartedEvent"),
+      ProblemFilters.exclude[DirectMissingMethodProblem](
+        "org.apache.spark.sql.streaming.StreamingQueryListener#QueryStartedEvent.logEvent"),
+      ProblemFilters.exclude[MissingTypesProblem](
+        "org.apache.spark.sql.streaming.StreamingQueryListener$QueryTerminatedEvent"),
+      ProblemFilters.exclude[DirectMissingMethodProblem](
+        "org.apache.spark.sql.streaming.StreamingQueryListener#QueryTerminatedEvent.logEvent"),
 
      // SQLImplicits
      ProblemFilters.exclude[Problem]("org.apache.spark.sql.SQLImplicits.rddToDatasetHolder"),
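
Each new entry above is a standard MiMa filter that suppresses one known,
intentional binary-compatibility difference. A generic sketch of the pattern
(the Seq is illustrative; the real list lives in the checker above):

    import com.typesafe.tools.mima.core._

    val filters: Seq[ProblemFilter] = Seq(
      // Event lost its SparkListenerEvent parent, so its type hierarchy changed:
      ProblemFilters.exclude[MissingTypesProblem](
        "org.apache.spark.sql.streaming.StreamingQueryListener$Event"),
      // logEvent was inherited from SparkListenerEvent and is gone on the client:
      ProblemFilters.exclude[DirectMissingMethodProblem](
        "org.apache.spark.sql.streaming.StreamingQueryListener#QueryStartedEvent.logEvent"))
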
diff --git a/dev/checkstyle-suppressions.xml b/dev/checkstyle-suppressions.xml
index 8929fb6224d..17677bee7fe 100644
--- a/dev/checkstyle-suppressions.xml
+++ b/dev/checkstyle-suppressions.xml
@@ -53,9 +53,9 @@
     <suppress checks="MethodName"
               files="src/main/java/org/apache/hive/service/auth/PasswdAuthenticationProvider.java"/>
     <suppress checks="MethodName"
-              files="sql/catalyst/src/main/java/org/apache/spark/sql/streaming/OutputMode.java"/>
+              files="sql/api/src/main/java/org/apache/spark/sql/streaming/OutputMode.java"/>
     <suppress checks="MethodName"
-              files="sql/catalyst/src/main/java/org/apache/spark/sql/streaming/GroupStateTimeout.java"/>
+              files="sql/api/src/main/java/org/apache/spark/sql/streaming/GroupStateTimeout.java"/>
     <suppress checks="MethodName"
               files="sql/core/src/main/java/org/apache/spark/sql/streaming/Trigger.java"/>
     <suppress checks="MethodName"
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 3b4f0796ec1..6d527610231 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -60,7 +60,10 @@ object MimaExcludes {
     // [SPARK-43952][CORE][CONNECT][SQL] Add SparkContext APIs for query cancellation by tag
     ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.status.api.v1.JobData.this"),
     // [SPARK-44205][SQL] Extract Catalyst Code from DecimalType
-    ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.types.DecimalType.unapply")
+    ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.types.DecimalType.unapply"),
+    // [SPARK-44535][CONNECT][SQL] Move required Streaming API to sql/api
+    ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.streaming.GroupStateTimeout"),
+    ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.streaming.OutputMode")
   )
 
   // Default exclude rules
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/streaming/GroupStateTimeout.java b/sql/api/src/main/java/org/apache/spark/sql/streaming/GroupStateTimeout.java
similarity index 100%
rename from sql/catalyst/src/main/java/org/apache/spark/sql/streaming/GroupStateTimeout.java
rename to sql/api/src/main/java/org/apache/spark/sql/streaming/GroupStateTimeout.java
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/streaming/OutputMode.java b/sql/api/src/main/java/org/apache/spark/sql/streaming/OutputMode.java
similarity index 100%
rename from sql/catalyst/src/main/java/org/apache/spark/sql/streaming/OutputMode.java
rename to sql/api/src/main/java/org/apache/spark/sql/streaming/OutputMode.java
diff --git a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/expressions/GenericRow.scala b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/expressions/rows.scala
similarity index 82%
copy from sql/api/src/main/scala/org/apache/spark/sql/catalyst/expressions/GenericRow.scala
copy to sql/api/src/main/scala/org/apache/spark/sql/catalyst/expressions/rows.scala
index f4b45f5928e..fbd4c1d9837 100644
--- a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/expressions/GenericRow.scala
+++ b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/expressions/rows.scala
@@ -17,6 +17,7 @@
 package org.apache.spark.sql.catalyst.expressions
 
 import org.apache.spark.sql.Row
+import org.apache.spark.sql.types.StructType
 
 /**
  * A row implementation that uses an array of objects as the underlying storage.  Note that, while
@@ -37,3 +38,12 @@ class GenericRow(protected[sql] val values: Array[Any]) extends Row {
 
   override def copy(): GenericRow = this
 }
+
+class GenericRowWithSchema(values: Array[Any], override val schema: StructType)
+  extends GenericRow(values) {
+
+  /** No-arg constructor for serialization. */
+  protected def this() = this(null, null)
+
+  override def fieldIndex(name: String): Int = schema.fieldIndex(name)
+}
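
The copied-in GenericRowWithSchema adds the by-name field lookup that plain
GenericRow lacks. A small usage sketch:

    import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
    import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

    val schema = StructType(Seq(
      StructField("name", StringType),
      StructField("age", IntegerType)))
    val row = new GenericRowWithSchema(Array("Ada", 36), schema)
    row.getAs[String]("name") // resolved via schema.fieldIndex("name")
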
diff --git a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/expressions/GenericRow.scala b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/groupStateTimeouts.scala
similarity index 54%
rename from sql/api/src/main/scala/org/apache/spark/sql/catalyst/expressions/GenericRow.scala
rename to sql/api/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/groupStateTimeouts.scala
index f4b45f5928e..785732e9833 100644
--- a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/expressions/GenericRow.scala
+++ b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/groupStateTimeouts.scala
@@ -14,26 +14,11 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.spark.sql.catalyst.expressions
+package org.apache.spark.sql.catalyst.plans.logical
 
-import org.apache.spark.sql.Row
+import org.apache.spark.sql.streaming.GroupStateTimeout
 
-/**
- * A row implementation that uses an array of objects as the underlying storage.  Note that, while
- * the array is not copied, and thus could technically be mutated after creation, this is not
- * allowed.
- */
-class GenericRow(protected[sql] val values: Array[Any]) extends Row {
-  /** No-arg constructor for serialization. */
-  protected def this() = this(null)
-
-  def this(size: Int) = this(new Array[Any](size))
-
-  override def length: Int = values.length
-
-  override def get(i: Int): Any = values(i)
-
-  override def toSeq: Seq[Any] = values.clone()
-
-  override def copy(): GenericRow = this
-}
+/** Types of timeouts used in FlatMapGroupsWithState */
+case object NoTimeout extends GroupStateTimeout
+case object ProcessingTimeTimeout extends GroupStateTimeout
+case object EventTimeTimeout extends GroupStateTimeout
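
These case objects back the public GroupStateTimeout factory methods, so user
code selects them the same way as before the move. A sketch of a stateful
function that relies on a processing-time timeout (countPerKey and the
commented driver call are illustrative):

    import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout}

    // Counts values per key and refreshes a processing-time timeout each batch.
    def countPerKey(key: String, values: Iterator[String],
        state: GroupState[Long]): Long = {
      val count = state.getOption.getOrElse(0L) + values.size
      state.update(count)
      state.setTimeoutDuration("10 minutes")
      count
    }

    // events.groupByKey(identity)
    //   .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout)(countPerKey)
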
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/InternalOutputModes.scala b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/streaming/InternalOutputModes.scala
similarity index 100%
rename from sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/InternalOutputModes.scala
rename to sql/api/src/main/scala/org/apache/spark/sql/catalyst/streaming/InternalOutputModes.scala
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/rows.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/rows.scala
index 10c33a43270..09d78f79edd 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/rows.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/rows.scala
@@ -157,15 +157,6 @@ trait BaseGenericInternalRow extends InternalRow {
   }
 }
 
-class GenericRowWithSchema(values: Array[Any], override val schema: StructType)
-  extends GenericRow(values) {
-
-  /** No-arg constructor for serialization. */
-  protected def this() = this(null, null)
-
-  override def fieldIndex(name: String): Int = schema.fieldIndex(name)
-}
-
 /**
  * An internal row implementation that uses an array of objects as the underlying storage.
  * Note that, while the array is not copied, and thus could technically be mutated after creation,
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala
index d79d55bc964..35b0bd4363b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala
@@ -451,11 +451,6 @@ case class MapGroups(
 /** Internal class representing State */
 trait LogicalGroupState[S]
 
-/** Types of timeouts used in FlatMapGroupsWithState */
-case object NoTimeout extends GroupStateTimeout
-case object ProcessingTimeTimeout extends GroupStateTimeout
-case object EventTimeTimeout extends GroupStateTimeout
-
 /** Factory for constructing new `MapGroupsWithState` nodes. */
 object FlatMapGroupsWithState {
   def apply[K: Encoder, V: Encoder, S: Encoder, U: Encoder](

