This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new a09b5c945235 [SPARK-53226][SPARK-52088][CORE] Make ClosureCleaner work with Java 22+
a09b5c945235 is described below
commit a09b5c94523544e6b419808b86b3d966b2e97c74
Author: Kousuke Saruta <[email protected]>
AuthorDate: Tue Mar 3 08:11:52 2026 +0800
[SPARK-53226][SPARK-52088][CORE] Make ClosureCleaner work with Java22+
### What changes were proposed in this pull request?
This PR proposes to change `ClosureCleaner` so that it works with Java 22+.
The current `ClosureCleaner` doesn't work with Java 22. For example, the
following code fails:
```
val x = 100
sc.parallelize(1 to 10).map(v => v + x).collect
java.lang.InternalError: java.lang.IllegalAccessException: final field has no write access: $Lambda/0x00001c0001bae838.arg$1/putField, from class java.lang.Object (module java.base)
  at java.base/jdk.internal.reflect.MethodHandleAccessorFactory.newFieldAccessor(MethodHandleAccessorFactory.java:207)
  at java.base/jdk.internal.reflect.ReflectionFactory.newFieldAccessor(ReflectionFactory.java:144)
  at java.base/java.lang.reflect.Field.acquireOverrideFieldAccessor(Field.java:1200)
  at java.base/java.lang.reflect.Field.getOverrideFieldAccessor(Field.java:1169)
  at java.base/java.lang.reflect.Field.set(Field.java:836)
  at org.apache.spark.util.ClosureCleaner$.setFieldAndIgnoreModifiers(ClosureCleaner.scala:563)
  at org.apache.spark.util.ClosureCleaner$.cleanupScalaReplClosure(ClosureCleaner.scala:431)
  at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:256)
  at org.apache.spark.util.SparkClosureCleaner$.clean(SparkClosureCleaner.scala:39)
  at org.apache.spark.SparkContext.clean(SparkContext.scala:2844)
  at org.apache.spark.rdd.RDD.$anonfun$map$1(RDD.scala:425)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:417)
  at org.apache.spark.rdd.RDD.map(RDD.scala:424)
  ... 38 elided
Caused by: java.lang.IllegalAccessException: final field has no write access: $Lambda/0x00001c0001bae838.arg$1/putField, from class java.lang.Object (module java.base)
  at java.base/java.lang.invoke.MemberName.makeAccessException(MemberName.java:889)
  at java.base/java.lang.invoke.MethodHandles$Lookup.unreflectField(MethodHandles.java:3609)
  at java.base/java.lang.invoke.MethodHandles$Lookup.unreflectSetter(MethodHandles.java:3600)
  at java.base/java.lang.invoke.MethodHandleImpl$1.unreflectField(MethodHandleImpl.java:1619)
  at java.base/jdk.internal.reflect.MethodHandleAccessorFactory.newFieldAccessor(MethodHandleAccessorFactory.java:185)
  ... 52 more
```
The reason is that, as of Java 22, final fields can no longer be modified via
reflection ([JEP 416](https://openjdk.org/jeps/416)).
The current `ClosureCleaner` tries to overwrite the final field `arg$1` with a
cloned and cleaned object, so this step fails.
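For context on what `arg$1` holds: the captured state of such a lambda can be read through its serialization proxy, the same `SerializedLambda` that `ClosureCleaner` inspects. A minimal sketch, not Spark code (`makeAdder` is a hypothetical helper):

```scala
import java.lang.invoke.SerializedLambda

object SerializedLambdaDemo {
  // A Scala function literal compiles to an invokedynamic lambda;
  // the captured `x` backs the synthetic field arg$1.
  def makeAdder(x: Int): Int => Int = v => v + x
}

val f = SerializedLambdaDemo.makeAdder(100)

// Scala function literals are serializable, so the generated class has a
// private writeReplace method returning a SerializedLambda proxy.
val writeReplace = f.getClass.getDeclaredMethod("writeReplace")
writeReplace.setAccessible(true)
val proxy = writeReplace.invoke(f).asInstanceOf[SerializedLambda]

// The captured argument list contains the boxed 100 that arg$1 holds.
println(proxy.getCapturedArgCount)
println(proxy.getCapturedArg(0))
```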
At first I considered two solutions:
1. Using the Unsafe API
2. Using the `--enable-final-field-mutation` option expected to be introduced
by [JEP 500](https://openjdk.org/jeps/500)

But neither of them can resolve the issue, because final fields of hidden
classes cannot be modified, and lambdas created internally by the JVM via the
`invokedynamic` instruction are hidden classes (let's call such a lambda an
"indy lambda").
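The hidden-class point can be checked directly: on JDK 15+, `LambdaMetafactory` spins lambda classes as hidden classes, which `Class.isHidden` reports. A minimal sketch, not from the patch:

```scala
// A SAM-converted Scala closure is compiled to an invokedynamic lambda.
val x = 100
val f: java.util.function.IntUnaryOperator = v => v + x

// The runtime class name carries the characteristic /0x... suffix,
// matching the $Lambda/0x... name in the stack trace above.
println(f.getClass.getName)

// Hidden classes reject final-field writes regardless of the mechanism
// used (reflection, Unsafe, or a JEP 500 style flag).
println(f.getClass.isHidden) // true on JDK 15+
```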
So this PR resolves the issue by cloning indy lambdas with a cleaned `arg$1`,
using `LambdaMetafactory` with the impl method taken from the original lambda.
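The mechanism can be sketched outside Spark. The real patch resolves the impl method from the original lambda's `SerializedLambda` and needs a full-power lookup for REPL classes; this hedged sketch instead uses a public impl method (`Integer.sum`) so a plain `MethodHandles.lookup()` suffices. It shows how a lambda is (re)created with a chosen captured argument, instead of mutating `arg$1`:

```scala
import java.lang.invoke.{LambdaMetafactory, MethodHandles, MethodType}
import java.util.function.IntUnaryOperator

val lookup = MethodHandles.lookup()

// Impl method: the captured value becomes its leading parameter, which is
// how a captured arg$1 is threaded into an indy lambda.
val impl = lookup.findStatic(
  classOf[Integer], "sum",
  MethodType.methodType(classOf[Int], classOf[Int], classOf[Int]))

val callSite = LambdaMetafactory.metafactory(
  lookup,
  "applyAsInt",                                                   // SAM method name
  MethodType.methodType(classOf[IntUnaryOperator], classOf[Int]), // factory: (captured int) -> SAM
  MethodType.methodType(classOf[Int], classOf[Int]),              // erased SAM signature
  impl,
  MethodType.methodType(classOf[Int], classOf[Int]))              // instantiated signature

// "Cloning" a lambda amounts to invoking this factory again with the
// replacement captured argument (the cleaned outer reference in the patch).
val factory = callSite.getTarget
val add100 = factory.invokeWithArguments(Int.box(100)).asInstanceOf[IntUnaryOperator]
println(add100.applyAsInt(1)) // 101
```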
### Why are the changes needed?
To make Spark work with Java 22+.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Tested on my customized GA job with Java 22.
https://github.com/sarutak/spark/actions/runs/19331525569
All the failed tests are related to `datasketch`, which is a [separate
issue](https://issues.apache.org/jira/browse/SPARK-53327).
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #52956 from sarutak/spark-shell-java22.
Authored-by: Kousuke Saruta <[email protected]>
Signed-off-by: Cheng Pan <[email protected]>
---
.../org/apache/spark/util/ClosureCleaner.scala | 163 +++++++++++++++++----
.../main/scala/org/apache/spark/SparkContext.scala | 1 -
.../apache/spark/util/SparkClosureCleaner.scala | 16 +-
.../apache/spark/util/ClosureCleanerSuite2.scala | 5 +-
.../apache/spark/sql/connect/UdfToProtoUtils.scala | 7 +-
.../org/apache/spark/streaming/StateSpec.scala | 10 +-
6 files changed, 159 insertions(+), 43 deletions(-)
diff --git a/common/utils/src/main/scala/org/apache/spark/util/ClosureCleaner.scala b/common/utils/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
index 4f4233e96e8c..b23d13c32a10 100644
--- a/common/utils/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
+++ b/common/utils/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
@@ -18,10 +18,10 @@
package org.apache.spark.util
import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
-import java.lang.invoke.{MethodHandleInfo, SerializedLambda}
+import java.lang.invoke.{LambdaMetafactory, MethodHandle, MethodHandleInfo, MethodHandles, MethodType, SerializedLambda}
import java.lang.reflect.{Field, Modifier}
-import scala.collection.mutable.{Map, Queue, Set, Stack}
+import scala.collection.mutable.{ArrayBuffer, Map, Queue, Set, Stack}
import scala.jdk.CollectionConverters._
import org.apache.xbean.asm9.{ClassReader, ClassVisitor, Handle, MethodVisitor, Opcodes, Type}
@@ -189,11 +189,18 @@ private[spark] object ClosureCleaner extends Logging {
* @param cleanTransitively whether to clean enclosing closures transitively
* @param accessedFields a map from a class to a set of its fields that are accessed by
*                       the starting closure
+ * @return Some(cleaned closure) if the closure was cleaned, None otherwise.
+ *         On the clone-based path (Java 22+ or when explicitly enabled), the returned
+ *         closure is a new lambda instance with cleaned outer references.
+ *         On the legacy mutation path, the closure is mutated in place and
+ *         Some with the same reference is returned.
*/
- private[spark] def clean(
- func: AnyRef,
+ private[spark] def clean[F <: AnyRef](
+ func: F,
cleanTransitively: Boolean,
- accessedFields: Map[Class[_], Set[String]]): Boolean = {
+ accessedFields: Map[Class[_], Set[String]]): Option[F] = {
+ var cleanedFunc: F = func
+
// indylambda check. Most likely to be the case with 2.12, 2.13
// so we check first
// non LMF-closures should be less frequent from now on
@@ -201,14 +208,14 @@ private[spark] object ClosureCleaner extends Logging {
if (!isClosure(func.getClass) && maybeIndylambdaProxy.isEmpty) {
logDebug(s"Expected a closure; got ${func.getClass.getName}")
- return false
+ return None
}
// TODO: clean all inner closures first. This requires us to find the inner objects.
// TODO: cache outerClasses / innerClasses / accessedFields
if (func == null) {
- return false
+ return None
}
if (maybeIndylambdaProxy.isEmpty) {
@@ -232,9 +239,9 @@ private[spark] object ClosureCleaner extends Logging {
val outerThis = if (lambdaProxy.getCapturedArgCount > 0) {
// only need to clean when there is an enclosing non-null "this" captured by the closure
- Option(lambdaProxy.getCapturedArg(0)).getOrElse(return false)
+ Option(lambdaProxy.getCapturedArg(0)).getOrElse(return None)
} else {
- return false
+ return None
}
// clean only if enclosing "this" is something cleanable, i.e. a Scala REPL line object or
@@ -244,22 +251,28 @@ private[spark] object ClosureCleaner extends Logging {
if (isDefinedInAmmonite(outerThis.getClass)) {
// If outerThis is a lambda, we have to clean that instead
IndylambdaScalaClosures.getSerializationProxy(outerThis).foreach { _ =>
- return clean(outerThis, cleanTransitively, accessedFields)
+ val cleanedOuterThis = clean(outerThis, cleanTransitively, accessedFields)
+ if (cleanedOuterThis.isEmpty) {
+ return None
+ } else {
+ return Some(cloneIndyLambda(func, cleanedOuterThis.get, lambdaProxy).getOrElse(func))
+ }
}
- cleanupAmmoniteReplClosure(func, lambdaProxy, outerThis, cleanTransitively)
+ cleanedFunc = cleanupAmmoniteReplClosure(func, lambdaProxy, outerThis, cleanTransitively)
} else {
val isClosureDeclaredInScalaRepl =
capturingClassName.startsWith("$line") && capturingClassName.endsWith("$iw")
if (isClosureDeclaredInScalaRepl && outerThis.getClass.getName == capturingClassName) {
assert(accessedFields.isEmpty)
- cleanupScalaReplClosure(func, lambdaProxy, outerThis, cleanTransitively)
+ cleanedFunc = cleanupScalaReplClosure(func, lambdaProxy, outerThis, cleanTransitively)
}
}
logDebug(s" +++ indylambda closure ($implMethodName) is now cleaned +++")
}
- true
+ Some(cleanedFunc)
}
/**
@@ -395,11 +408,11 @@ private[spark] object ClosureCleaner extends Logging {
* @param outerThis lambda enclosing class
* @param cleanTransitively whether to clean enclosing closures transitively
*/
- private def cleanupScalaReplClosure(
- func: AnyRef,
+ private def cleanupScalaReplClosure[F <: AnyRef](
+ func: F,
lambdaProxy: SerializedLambda,
outerThis: AnyRef,
- cleanTransitively: Boolean): Unit = {
+ cleanTransitively: Boolean): F = {
val capturingClass = outerThis.getClass
val accessedFields: Map[Class[_], Set[String]] = Map.empty
@@ -421,13 +434,17 @@ private[spark] object ClosureCleaner extends Logging {
logDebug(s" + cloning instance of REPL class ${capturingClass.getName}")
val clonedOuterThis = cloneAndSetFields(
parent = null, outerThis, capturingClass, accessedFields)
-
- val outerField = func.getClass.getDeclaredField("arg$1")
- // SPARK-37072: When Java 17 is used and `outerField` is read-only,
- // the content of `outerField` cannot be set by reflect api directly.
- // But we can remove the `final` modifier of `outerField` before set value
- // and reset the modifier after set value.
- setFieldAndIgnoreModifiers(func, outerField, clonedOuterThis)
+ cloneIndyLambda(func, clonedOuterThis, lambdaProxy).getOrElse {
+ val outerField = func.getClass.getDeclaredField("arg$1")
+ // SPARK-37072: When Java 17 is used and `outerField` is read-only,
+ // the content of `outerField` cannot be set by reflect api directly.
+ // But we can remove the `final` modifier of `outerField` before set value
+ // and reset the modifier after set value.
+ setFieldAndIgnoreModifiers(func, outerField, clonedOuterThis)
+ func
+ }
+ } else {
+ func
}
}
@@ -456,11 +473,11 @@ private[spark] object ClosureCleaner extends Logging {
* @param outerThis lambda enclosing class
* @param cleanTransitively whether to clean enclosing closures transitively
*/
- private def cleanupAmmoniteReplClosure(
- func: AnyRef,
+ private def cleanupAmmoniteReplClosure[F <: AnyRef](
+ func: F,
lambdaProxy: SerializedLambda,
outerThis: AnyRef,
- cleanTransitively: Boolean): Unit = {
+ cleanTransitively: Boolean): F = {
val accessedFields: Map[Class[_], Set[String]] = Map.empty
initAccessedFields(accessedFields, Seq(outerThis.getClass))
@@ -549,9 +566,12 @@ private[spark] object ClosureCleaner extends Logging {
cmdClones(outerThis.getClass)
}
- val outerField = func.getClass.getDeclaredField("arg$1")
- // update lambda capturing class reference
- setFieldAndIgnoreModifiers(func, outerField, outerThisClone)
+ cloneIndyLambda(func, outerThisClone, lambdaProxy).getOrElse {
+ val outerField = func.getClass.getDeclaredField("arg$1")
+ // update lambda capturing class reference
+ setFieldAndIgnoreModifiers(func, outerField, outerThisClone)
+ func
+ }
}
private def setFieldAndIgnoreModifiers(obj: AnyRef, field: Field, value: AnyRef): Unit = {
@@ -595,6 +615,91 @@ private[spark] object ClosureCleaner extends Logging {
}
obj
}
+
+ private def cloneIndyLambda[F <: AnyRef](
+ indyLambda: F,
+ outerThis: AnyRef,
+ lambdaProxy: SerializedLambda): Option[F] = {
+ val javaVersion = Runtime.version().feature()
+ val useClone = System.getProperty("spark.cloneBasedClosureCleaner.enabled") == "true" ||
+ System.getenv("SPARK_CLONE_BASED_CLOSURE_CLEANER") == "1" || javaVersion >= 22
+
+ if (useClone) {
+ val factory = makeClonedIndyLambdaFactory(indyLambda.getClass, lambdaProxy)
+
+ val argsBuffer = new ArrayBuffer[Object]()
+ var i = 0
+ while (i < lambdaProxy.getCapturedArgCount) {
+ val arg = lambdaProxy.getCapturedArg(i)
+ argsBuffer.append(arg)
+ i += 1
+ }
+ val clonedLambda =
+ factory.invokeWithArguments(outerThis +: argsBuffer.tail.toArray: _*).asInstanceOf[F]
+ Some(clonedLambda)
+ } else {
+ None
+ }
+ }
+
+ private def makeClonedIndyLambdaFactory(
+ originalFuncClass: Class[_],
+ lambdaProxy: SerializedLambda): MethodHandle = {
+ val classLoader = originalFuncClass.getClassLoader
+
+ // scalastyle:off classforname
+ val fInterface = Class.forName(
+ lambdaProxy.getFunctionalInterfaceClass.replace("/", "."), false, classLoader)
+ // scalastyle:on classforname
+ val numCapturedArgs = lambdaProxy.getCapturedArgCount
+ val implMethodType = MethodType.fromMethodDescriptorString(
+ lambdaProxy.getImplMethodSignature, classLoader)
+ val invokedMethodType = MethodType.methodType(
+ fInterface, (0 until numCapturedArgs).map(i => implMethodType.parameterType(i)).toArray)
+
+ // scalastyle:off classforname
+ val implClassName = lambdaProxy.getImplClass.replace("/", ".")
+ val implClass = Class.forName(implClassName, false, classLoader)
+ // scalastyle:on classforname
+ val replLookup = getFullPowerLookupFor(implClass)
+
+ val implMethodName = lambdaProxy.getImplMethodName
+ val implMethodHandle = replLookup.findStatic(implClass, implMethodName, implMethodType)
+ val funcMethodType = MethodType.fromMethodDescriptorString(
+ lambdaProxy.getFunctionalInterfaceMethodSignature, classLoader)
+ val instantiatedMethodType = MethodType.fromMethodDescriptorString(
+ lambdaProxy.getInstantiatedMethodType, classLoader)
+
+ val callSite = LambdaMetafactory.altMetafactory(
+ replLookup,
+ lambdaProxy.getFunctionalInterfaceMethodName,
+ invokedMethodType,
+ funcMethodType,
+ implMethodHandle,
+ instantiatedMethodType,
+ LambdaMetafactory.FLAG_SERIALIZABLE)
+
+ callSite.getTarget
+ }
+
+ /**
+ * This method is used for a full-power lookup for `targetClass`, which is used for cloning
+ * lambdas created at each REPL line. `targetClass` is expected to be the enclosing class of
+ * a lambda. `MethodHandles.privateLookupIn(targetClass, MethodHandles.lookup())` is not
+ * helpful for such a use case because `targetClass` and `ClosureCleaner`, from which
+ * `MethodHandles.lookup()` is called, are loaded by different class loaders, and the method
+ * returns a lookup which doesn't have enough privilege to create a lambda using
+ * `LambdaMetafactory.altMetafactory`.
+ */
+ private def getFullPowerLookupFor(targetClass: Class[_]): MethodHandles.Lookup = {
+ val replLookupCtor = classOf[MethodHandles.Lookup].getDeclaredConstructor(
+ classOf[Class[_]],
+ classOf[Class[_]],
+ classOf[Int])
+ replLookupCtor.setAccessible(true)
+ // -1 means full-power.
+ replLookupCtor.newInstance(targetClass, null, -1)
+ }
}
private[spark] object IndylambdaScalaClosures extends Logging {
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 8c92f4c10aa5..d6f8d40aa51b 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -2849,7 +2849,6 @@ class SparkContext(config: SparkConf) extends Logging {
*/
private[spark] def clean[F <: AnyRef](f: F, checkSerializable: Boolean = true): F = {
SparkClosureCleaner.clean(f, checkSerializable)
- f
}
/**
diff --git a/core/src/main/scala/org/apache/spark/util/SparkClosureCleaner.scala b/core/src/main/scala/org/apache/spark/util/SparkClosureCleaner.scala
index 44e0efb44941..57b5774ee40b 100644
--- a/core/src/main/scala/org/apache/spark/util/SparkClosureCleaner.scala
+++ b/core/src/main/scala/org/apache/spark/util/SparkClosureCleaner.scala
@@ -31,19 +31,25 @@ private[spark] object SparkClosureCleaner {
* @param closure the closure to clean
* @param checkSerializable whether to verify that the closure is serializable after cleaning
* @param cleanTransitively whether to clean enclosing closures transitively
+ * @return the cleaned closure if it was actually cleaned, or the original closure otherwise
*/
- def clean(
- closure: AnyRef,
+ def clean[F <: AnyRef](
+ closure: F,
checkSerializable: Boolean = true,
- cleanTransitively: Boolean = true): Unit = {
- if (ClosureCleaner.clean(closure, cleanTransitively, mutable.Map.empty)) {
+ cleanTransitively: Boolean = true): F = {
+ val cleanedClosureOpt = ClosureCleaner.clean(closure, cleanTransitively, mutable.Map.empty)
+ if (cleanedClosureOpt.isDefined) {
+ val cleanedClosure = cleanedClosureOpt.get
try {
if (checkSerializable && SparkEnv.get != null) {
- SparkEnv.get.closureSerializer.newInstance().serialize(closure)
+ SparkEnv.get.closureSerializer.newInstance().serialize(cleanedClosure: AnyRef)
}
} catch {
case ex: Exception => throw new SparkException("Task not serializable", ex)
}
+ cleanedClosure
+ } else {
+ closure
}
}
}
diff --git a/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite2.scala b/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite2.scala
index 85a89aaede95..f9825b6c9a18 100644
--- a/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite2.scala
+++ b/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite2.scala
@@ -94,13 +94,14 @@ class ClosureCleanerSuite2 extends SparkFunSuite {
// If the resulting closure is not serializable even after
// cleaning, we expect ClosureCleaner to throw a SparkException
if (serializableAfter) {
- SparkClosureCleaner.clean(closure, checkSerializable = true, transitive)
+ val cleanedClosure = SparkClosureCleaner.clean(closure, checkSerializable = true, transitive)
+ assertSerializable(cleanedClosure, serializableAfter)
} else {
intercept[SparkException] {
SparkClosureCleaner.clean(closure, checkSerializable = true, transitive)
}
+ assertSerializable(closure, serializableAfter)
}
- assertSerializable(closure, serializableAfter)
}
test("clean basic serializable closures") {
diff --git a/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/UdfToProtoUtils.scala b/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/UdfToProtoUtils.scala
index 486302266539..a2237367225a 100644
--- a/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/UdfToProtoUtils.scala
+++ b/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/UdfToProtoUtils.scala
@@ -58,8 +58,11 @@ private[sql] object UdfToProtoUtils {
function: AnyRef,
inputEncoders: Seq[AgnosticEncoder[_]],
outputEncoder: AgnosticEncoder[_]): ByteString = {
- ClosureCleaner.clean(function, cleanTransitively = true, mutable.Map.empty)
- val bytes = SparkSerDeUtils.serialize(UdfPacket(function, inputEncoders, outputEncoder))
+ val cleanedFunction = ClosureCleaner
+ .clean(function, cleanTransitively = true, mutable.Map.empty)
+ .getOrElse(function)
+ val bytes =
+ SparkSerDeUtils.serialize(UdfPacket(cleanedFunction, inputEncoders, outputEncoder))
checkDeserializable(bytes)
ByteString.copyFrom(bytes)
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StateSpec.scala b/streaming/src/main/scala/org/apache/spark/streaming/StateSpec.scala
index f04b9da9b45d..52f726290a4b 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StateSpec.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StateSpec.scala
@@ -157,8 +157,9 @@ object StateSpec {
def function[KeyType, ValueType, StateType, MappedType](
mappingFunction: (Time, KeyType, Option[ValueType], State[StateType]) =>
Option[MappedType]
): StateSpec[KeyType, ValueType, StateType, MappedType] = {
- SparkClosureCleaner.clean(mappingFunction, checkSerializable = true)
- new StateSpecImpl(mappingFunction)
+ val cleanedMappingFunction =
+ SparkClosureCleaner.clean(mappingFunction, checkSerializable = true)
+ new StateSpecImpl(cleanedMappingFunction)
}
/**
@@ -175,10 +176,11 @@ object StateSpec {
def function[KeyType, ValueType, StateType, MappedType](
mappingFunction: (KeyType, Option[ValueType], State[StateType]) =>
MappedType
): StateSpec[KeyType, ValueType, StateType, MappedType] = {
- SparkClosureCleaner.clean(mappingFunction, checkSerializable = true)
+ val cleanedMappingFunction =
+ SparkClosureCleaner.clean(mappingFunction, checkSerializable = true)
val wrappedFunction =
(time: Time, key: KeyType, value: Option[ValueType], state: State[StateType]) => {
- Some(mappingFunction(key, value, state))
+ Some(cleanedMappingFunction(key, value, state))
}
new StateSpecImpl(wrappedFunction)
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]