This is an automated email from the ASF dual-hosted git repository.
hvanhovell pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new 72561b6af51f [SPARK-52819][SQL] Making KryoSerializationCodec
serializable to fix java.io.NotSerializableException errors in various use cases
72561b6af51f is described below
commit 72561b6af51f27c44dc5c089d39970c0b8cd652b
Author: Kristin Cowalcijk <[email protected]>
AuthorDate: Wed Dec 17 11:12:49 2025 -0400
[SPARK-52819][SQL] Making KryoSerializationCodec serializable to fix
java.io.NotSerializableException errors in various use cases
### What changes were proposed in this pull request?
This PR makes `KryoSerializationCodec` implement `java.io.Serializable` to
avoid `java.io.NotSerializableException` exceptions when using Kryo encoder in
`Dataset.flatMapGroupsWithState` or `Aggregator.bufferEncoder`.
### Why are the changes needed?
See the description in
[SPARK-52819](https://issues.apache.org/jira/browse/SPARK-52819) as well as the
[minimal repro](https://github.com/Kontinuation/spark-4-kryo-encoder-bug). The
problem only happens when using Spark 4.0.0 but not when using Spark 3.5.5.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
New tests were added to ensure that
1. Both the Kryo encoder and the Java serialization encoder are serializable
2. The Kryo-encoded aggregator buffer use case works
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #51615 from Kontinuation/fix-kryo-codec-serializable.
Authored-by: Kristin Cowalcijk <[email protected]>
Signed-off-by: Herman van Hövell <[email protected]>
(cherry picked from commit e3569bafd8da992e0d9687075a2255b2cd6eb779)
Signed-off-by: Herman van Hövell <[email protected]>
---
.../apache/spark/sql/catalyst/encoders/codecs.scala | 2 +-
.../sql/catalyst/encoders/ExpressionEncoderSuite.scala | 1 +
.../src/test/scala/org/apache/spark/sql/UDFSuite.scala | 18 ++++++++++++++++++
3 files changed, 20 insertions(+), 1 deletion(-)
diff --git
a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/encoders/codecs.scala
b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/encoders/codecs.scala
index 0f2197255233..b90d9f8013d6 100644
--- a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/encoders/codecs.scala
+++ b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/encoders/codecs.scala
@@ -55,7 +55,7 @@ object JavaSerializationCodec extends (() => Codec[Any,
Array[Byte]]) {
* server (driver & executors) very tricky. As a workaround a user can define
their own Codec
* which internalizes the Kryo configuration.
*/
-object KryoSerializationCodec extends (() => Codec[Any, Array[Byte]]) {
+object KryoSerializationCodec extends (() => Codec[Any, Array[Byte]]) with
Serializable {
private lazy val kryoCodecConstructor: MethodHandle = {
val cls = SparkClassUtils.classForName(
"org.apache.spark.sql.catalyst.encoders.KryoSerializationCodecImpl")
diff --git
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoderSuite.scala
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoderSuite.scala
index 3d738fe985dd..0d26b390643b 100644
---
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoderSuite.scala
+++
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoderSuite.scala
@@ -612,6 +612,7 @@ class ExpressionEncoderSuite extends
CodegenInterpretedPlanTest with AnalysisTes
provider,
nullable = true))
.resolveAndBind()
+ assert(encoder.isInstanceOf[Serializable])
assert(encoder.schema == new StructType().add("value", BinaryType))
val toRow = encoder.createSerializer()
val fromRow = encoder.createDeserializer()
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
index d736e9494bd3..2e00b4a4e74d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
@@ -53,6 +53,17 @@ private case class FunctionResult(f1: String, f2: String)
private case class LocalDateInstantType(date: LocalDate, instant: Instant)
private case class TimestampInstantType(t: Timestamp, instant: Instant)
+private case class KryoEncodedBuf(value: Long)
+private case class KryoBufAggregator() extends Aggregator[Long,
KryoEncodedBuf, Long] {
+ override def zero: KryoEncodedBuf = KryoEncodedBuf(0)
+ override def reduce(b: KryoEncodedBuf, a: Long): KryoEncodedBuf =
KryoEncodedBuf(b.value + a)
+ override def merge(b1: KryoEncodedBuf, b2: KryoEncodedBuf): KryoEncodedBuf =
+ KryoEncodedBuf(b1.value + b2.value)
+ override def finish(reduction: KryoEncodedBuf): Long = reduction.value
+ override def bufferEncoder: Encoder[KryoEncodedBuf] =
Encoders.kryo[KryoEncodedBuf]
+ override def outputEncoder: Encoder[Long] = Encoders.scalaLong
+}
+
class UDFSuite extends QueryTest with SharedSparkSession {
import testImplicits._
@@ -1220,4 +1231,11 @@ class UDFSuite extends QueryTest with SharedSparkSession
{
.select(f($"c").as("f"), f($"f"))
checkAnswer(df, Seq(Row(2, 3), Row(null, null)))
}
+
+ test("SPARK-52819: Support using Kryo to encode BUF in Aggregator") {
+ val kryoBufUDAF = udaf(KryoBufAggregator())
+ val input = Seq(1L, 2L, 3L).toDF("value")
+ val result = input.select(kryoBufUDAF($"value").as("sum"))
+ checkAnswer(result, Row(6L) :: Nil)
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]