This is an automated email from the ASF dual-hosted git repository. hvanhovell pushed a commit to branch branch-3.5 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.5 by this push: new 95a59223ce7 [SPARK-44535][CONNECT][SQL] Move required Streaming API to sql/api 95a59223ce7 is described below commit 95a59223ce79174c9a605a4305082a1211ccb7e8 Author: Herman van Hovell <her...@databricks.com> AuthorDate: Tue Jul 25 23:57:14 2023 -0400 [SPARK-44535][CONNECT][SQL] Move required Streaming API to sql/api ### What changes were proposed in this pull request? This PR moves a bunch of streaming classes to the SQL/API project. ### Why are the changes needed? This is needed to disconnect the Spark Connect Scala Client from catalyst. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Existing Tests. Closes #42140 from hvanhovell/SPARK-44535. Authored-by: Herman van Hovell <her...@databricks.com> Signed-off-by: Herman van Hovell <her...@databricks.com> (cherry picked from commit 03d0ecc68365565e921ac6bc0f253ee67d2042d7) Signed-off-by: Herman van Hovell <her...@databricks.com> --- .../spark/sql/streaming/DataStreamWriter.scala | 3 +-- .../sql/streaming/StreamingQueryListener.scala | 3 +-- .../sql/streaming/StreamingQueryManager.scala | 6 ++--- .../CheckConnectJvmClientCompatibility.scala | 18 +++++++++++++++ dev/checkstyle-suppressions.xml | 4 ++-- project/MimaExcludes.scala | 5 +++- .../spark/sql/streaming/GroupStateTimeout.java | 0 .../org/apache/spark/sql/streaming/OutputMode.java | 0 .../expressions/{GenericRow.scala => rows.scala} | 10 ++++++++ .../logical/groupStateTimeouts.scala} | 27 +++++----------------- .../catalyst/streaming/InternalOutputModes.scala | 0 .../spark/sql/catalyst/expressions/rows.scala | 9 -------- .../spark/sql/catalyst/plans/logical/object.scala | 5 ---- 13 files changed, 45 insertions(+), 45 deletions(-) diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala index 
ad76ab4a1bc..b395a2d073d 100644 --- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala +++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala @@ -38,7 +38,6 @@ import org.apache.spark.sql.execution.streaming.OneTimeTrigger import org.apache.spark.sql.execution.streaming.ProcessingTimeTrigger import org.apache.spark.sql.types.NullType import org.apache.spark.util.SparkSerDeUtils -import org.apache.spark.util.Utils /** * Interface used to write a streaming `Dataset` to external storage systems (e.g. file systems, @@ -240,7 +239,7 @@ final class DataStreamWriter[T] private[sql] (ds: Dataset[T]) extends Logging { */ @Evolving def foreachBatch(function: (Dataset[T], Long) => Unit): DataStreamWriter[T] = { - val serializedFn = Utils.serialize(function) + val serializedFn = SparkSerDeUtils.serialize(function) sinkBuilder.getForeachBatchBuilder.getScalaFunctionBuilder .setPayload(ByteString.copyFrom(serializedFn)) .setOutputType(DataTypeProtoConverter.toConnectProtoType(NullType)) // Unused. diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala index 8cef421becd..e2f3be02ad3 100644 --- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala +++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala @@ -25,7 +25,6 @@ import org.json4s.JsonDSL.{jobject2assoc, pair2Assoc} import org.json4s.jackson.JsonMethods.{compact, render} import org.apache.spark.annotation.Evolving -import org.apache.spark.scheduler.SparkListenerEvent /** * Interface for listening to events related to [[StreamingQuery StreamingQueries]]. 
@@ -116,7 +115,7 @@ object StreamingQueryListener extends Serializable { * @since 3.5.0 */ @Evolving - trait Event extends SparkListenerEvent + trait Event /** * Event representing the start of a query diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala index 8f9e768d23f..91744460440 100644 --- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala +++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala @@ -31,7 +31,7 @@ import org.apache.spark.connect.proto.StreamingQueryManagerCommandResult import org.apache.spark.internal.Logging import org.apache.spark.sql.SparkSession import org.apache.spark.sql.connect.common.{InvalidPlanInput, StreamingListenerPacket} -import org.apache.spark.util.Utils +import org.apache.spark.util.SparkSerDeUtils /** * A class to manage all the [[StreamingQuery]] active in a `SparkSession`. 
@@ -155,7 +155,7 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo cacheListenerById(id, listener) executeManagerCmd( _.getAddListenerBuilder - .setListenerPayload(ByteString.copyFrom(Utils + .setListenerPayload(ByteString.copyFrom(SparkSerDeUtils .serialize(StreamingListenerPacket(id, listener))))) } @@ -168,7 +168,7 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo val id = getIdByListener(listener) executeManagerCmd( _.getRemoveListenerBuilder - .setListenerPayload(ByteString.copyFrom(Utils + .setListenerPayload(ByteString.copyFrom(SparkSerDeUtils .serialize(StreamingListenerPacket(id, listener))))) removeCachedListener(id) } diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala index deb2ff631fd..08028f26eb4 100644 --- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala +++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala @@ -280,6 +280,24 @@ object CheckConnectJvmClientCompatibility { "org.apache.spark.sql.streaming.PythonStreamingQueryListener"), ProblemFilters.exclude[MissingClassProblem]( "org.apache.spark.sql.streaming.PythonStreamingQueryListenerWrapper"), + ProblemFilters.exclude[MissingTypesProblem]( + "org.apache.spark.sql.streaming.StreamingQueryListener$Event"), + ProblemFilters.exclude[MissingTypesProblem]( + "org.apache.spark.sql.streaming.StreamingQueryListener$QueryIdleEvent"), + ProblemFilters.exclude[DirectMissingMethodProblem]( + "org.apache.spark.sql.streaming.StreamingQueryListener#QueryIdleEvent.logEvent"), + ProblemFilters.exclude[MissingTypesProblem]( + "org.apache.spark.sql.streaming.StreamingQueryListener$QueryProgressEvent"), + 
ProblemFilters.exclude[DirectMissingMethodProblem]( + "org.apache.spark.sql.streaming.StreamingQueryListener#QueryProgressEvent.logEvent"), + ProblemFilters.exclude[MissingTypesProblem]( + "org.apache.spark.sql.streaming.StreamingQueryListener$QueryStartedEvent"), + ProblemFilters.exclude[DirectMissingMethodProblem]( + "org.apache.spark.sql.streaming.StreamingQueryListener#QueryStartedEvent.logEvent"), + ProblemFilters.exclude[MissingTypesProblem]( + "org.apache.spark.sql.streaming.StreamingQueryListener$QueryTerminatedEvent"), + ProblemFilters.exclude[DirectMissingMethodProblem]( + "org.apache.spark.sql.streaming.StreamingQueryListener#QueryTerminatedEvent.logEvent"), // SQLImplicits ProblemFilters.exclude[Problem]("org.apache.spark.sql.SQLImplicits.rddToDatasetHolder"), diff --git a/dev/checkstyle-suppressions.xml b/dev/checkstyle-suppressions.xml index 8929fb6224d..17677bee7fe 100644 --- a/dev/checkstyle-suppressions.xml +++ b/dev/checkstyle-suppressions.xml @@ -53,9 +53,9 @@ <suppress checks="MethodName" files="src/main/java/org/apache/hive/service/auth/PasswdAuthenticationProvider.java"/> <suppress checks="MethodName" - files="sql/catalyst/src/main/java/org/apache/spark/sql/streaming/OutputMode.java"/> + files="sql/api/src/main/java/org/apache/spark/sql/streaming/OutputMode.java"/> <suppress checks="MethodName" - files="sql/catalyst/src/main/java/org/apache/spark/sql/streaming/GroupStateTimeout.java"/> + files="sql/api/src/main/java/org/apache/spark/sql/streaming/GroupStateTimeout.java"/> <suppress checks="MethodName" files="sql/core/src/main/java/org/apache/spark/sql/streaming/Trigger.java"/> <suppress checks="MethodName" diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index 3b4f0796ec1..6d527610231 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -60,7 +60,10 @@ object MimaExcludes { // [SPARK-43952][CORE][CONNECT][SQL] Add SparkContext APIs for query cancellation by tag 
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.status.api.v1.JobData.this"), // [SPARK-44205][SQL] Extract Catalyst Code from DecimalType - ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.types.DecimalType.unapply") + ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.types.DecimalType.unapply"), + // [SPARK-44535][CONNECT][SQL] Move required Streaming API to sql/api + ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.streaming.GroupStateTimeout"), + ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.streaming.OutputMode") ) // Defulat exclude rules diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/streaming/GroupStateTimeout.java b/sql/api/src/main/java/org/apache/spark/sql/streaming/GroupStateTimeout.java similarity index 100% rename from sql/catalyst/src/main/java/org/apache/spark/sql/streaming/GroupStateTimeout.java rename to sql/api/src/main/java/org/apache/spark/sql/streaming/GroupStateTimeout.java diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/streaming/OutputMode.java b/sql/api/src/main/java/org/apache/spark/sql/streaming/OutputMode.java similarity index 100% rename from sql/catalyst/src/main/java/org/apache/spark/sql/streaming/OutputMode.java rename to sql/api/src/main/java/org/apache/spark/sql/streaming/OutputMode.java diff --git a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/expressions/GenericRow.scala b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/expressions/rows.scala similarity index 82% copy from sql/api/src/main/scala/org/apache/spark/sql/catalyst/expressions/GenericRow.scala copy to sql/api/src/main/scala/org/apache/spark/sql/catalyst/expressions/rows.scala index f4b45f5928e..fbd4c1d9837 100644 --- a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/expressions/GenericRow.scala +++ b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/expressions/rows.scala @@ -17,6 +17,7 @@ package 
org.apache.spark.sql.catalyst.expressions import org.apache.spark.sql.Row +import org.apache.spark.sql.types.StructType /** * A row implementation that uses an array of objects as the underlying storage. Note that, while @@ -37,3 +38,12 @@ class GenericRow(protected[sql] val values: Array[Any]) extends Row { override def copy(): GenericRow = this } + +class GenericRowWithSchema(values: Array[Any], override val schema: StructType) + extends GenericRow(values) { + + /** No-arg constructor for serialization. */ + protected def this() = this(null, null) + + override def fieldIndex(name: String): Int = schema.fieldIndex(name) +} diff --git a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/expressions/GenericRow.scala b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/groupStateTimeouts.scala similarity index 54% rename from sql/api/src/main/scala/org/apache/spark/sql/catalyst/expressions/GenericRow.scala rename to sql/api/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/groupStateTimeouts.scala index f4b45f5928e..785732e9833 100644 --- a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/expressions/GenericRow.scala +++ b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/groupStateTimeouts.scala @@ -14,26 +14,11 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.spark.sql.catalyst.expressions +package org.apache.spark.sql.catalyst.plans.logical -import org.apache.spark.sql.Row +import org.apache.spark.sql.streaming.GroupStateTimeout -/** - * A row implementation that uses an array of objects as the underlying storage. Note that, while - * the array is not copied, and thus could technically be mutated after creation, this is not - * allowed. - */ -class GenericRow(protected[sql] val values: Array[Any]) extends Row { - /** No-arg constructor for serialization. 
*/ - protected def this() = this(null) - - def this(size: Int) = this(new Array[Any](size)) - - override def length: Int = values.length - - override def get(i: Int): Any = values(i) - - override def toSeq: Seq[Any] = values.clone() - - override def copy(): GenericRow = this -} +/** Types of timeouts used in FlatMapGroupsWithState */ +case object NoTimeout extends GroupStateTimeout +case object ProcessingTimeTimeout extends GroupStateTimeout +case object EventTimeTimeout extends GroupStateTimeout diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/InternalOutputModes.scala b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/streaming/InternalOutputModes.scala similarity index 100% rename from sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/InternalOutputModes.scala rename to sql/api/src/main/scala/org/apache/spark/sql/catalyst/streaming/InternalOutputModes.scala diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/rows.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/rows.scala index 10c33a43270..09d78f79edd 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/rows.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/rows.scala @@ -157,15 +157,6 @@ trait BaseGenericInternalRow extends InternalRow { } } -class GenericRowWithSchema(values: Array[Any], override val schema: StructType) - extends GenericRow(values) { - - /** No-arg constructor for serialization. */ - protected def this() = this(null, null) - - override def fieldIndex(name: String): Int = schema.fieldIndex(name) -} - /** * An internal row implementation that uses an array of objects as the underlying storage. 
* Note that, while the array is not copied, and thus could technically be mutated after creation, diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala index d79d55bc964..35b0bd4363b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala @@ -451,11 +451,6 @@ case class MapGroups( /** Internal class representing State */ trait LogicalGroupState[S] -/** Types of timeouts used in FlatMapGroupsWithState */ -case object NoTimeout extends GroupStateTimeout -case object ProcessingTimeTimeout extends GroupStateTimeout -case object EventTimeTimeout extends GroupStateTimeout - /** Factory for constructing new `MapGroupsWithState` nodes. */ object FlatMapGroupsWithState { def apply[K: Encoder, V: Encoder, S: Encoder, U: Encoder]( --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org