This is an automated email from the ASF dual-hosted git repository.
hvanhovell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 49581b35a07 [SPARK-43198][CONNECT] Fix "Could not initialise class
ammonite..." error when using filter
49581b35a07 is described below
commit 49581b35a07758e17af185aa465abc055f912404
Author: vicennial <[email protected]>
AuthorDate: Wed Apr 26 12:12:27 2023 -0400
[SPARK-43198][CONNECT] Fix "Could not initialise class ammonite..." error
when using filter
### What changes were proposed in this pull request?
This PR makes the ammonite REPL use the `CodeClassWrapper` mode for
classfile generation (make ammonite generate classes instead of objects) and
changes the UDF serialization from lazy to eager.
### Why are the changes needed?
The changes have the following impact:
- `CodeClassWrapper` change
- Fixes the `io.grpc.StatusRuntimeException: UNKNOWN:
ammonite/repl/ReplBridge$` error when trying to use the `filter` method (see
[jira](https://issues.apache.org/jira/browse/SPARK-43198) for reproduction)
- Lazy to eager UDF serialization
- With class-based generation, UDFs defined using `def` would hit CNFE
because the `ScalarUserDefinedFuntion` class gets captured during serialisation
and sent over to the server (the class is a client-only class).
### Does this PR introduce _any_ user-facing change?
Yes. There are two significant changes:
- Filter works as expected when using "in-place" lambda expressions such as
in `spark.range(10).filter(n => n % 2 == 0).collectAsList()`
- UDFs defined using a lambda expression which is stored in a `val` fail
due to deserialisation issues on the server.
- Root cause is currently unknown but a ticket has been
[filed](https://issues.apache.org/jira/browse/SPARK-43227) to address the issue.
- Example: see
[this](https://github.com/apache/spark/compare/master...vicennial:spark:SPARK-43198?expand=1#diff-8d8a214eff5d2c8d523b59f2a39758ddfa84912ef7d4e0276f54e979a58f88e0R120-R129)
test.
- Currently, it is a compromise to get `filter` working as expected since
that bug is a higher-impact due to it impacting the "general" way of using the
method.
### How was this patch tested?
New unit test.
Closes #40894 from vicennial/SPARK-43198.
Authored-by: vicennial <[email protected]>
Signed-off-by: Herman van Hovell <[email protected]>
---
.../apache/spark/sql/application/ConnectRepl.scala | 10 ++++++----
.../sql/expressions/UserDefinedFunction.scala | 3 ++-
.../spark/sql/application/ReplE2ESuite.scala | 23 +++++++++++++++++++++-
3 files changed, 30 insertions(+), 6 deletions(-)
diff --git
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/application/ConnectRepl.scala
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/application/ConnectRepl.scala
index 53a31fed489..d119fd60230 100644
---
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/application/ConnectRepl.scala
+++
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/application/ConnectRepl.scala
@@ -16,13 +16,12 @@
*/
package org.apache.spark.sql.application
+import ammonite.compiler.CodeClassWrapper
+import ammonite.util.Bind
import java.io.{InputStream, OutputStream}
import java.util.concurrent.Semaphore
-
import scala.util.control.NonFatal
-import ammonite.util.Bind
-
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.connect.client.{SparkConnectClient,
SparkConnectClientParser}
@@ -88,10 +87,13 @@ object ConnectRepl {
|
|spark.registerClassFinder(new AmmoniteClassFinder(repl.sess))
|""".stripMargin
-
+ // Please note that we make ammonite generate classes instead of objects.
+ // Classes tend to have superior serialization behavior when using UDFs.
val main = ammonite.Main(
welcomeBanner = Option(splash),
predefCode = predefCode,
+ replCodeWrapper = CodeClassWrapper,
+ scriptCodeWrapper = CodeClassWrapper,
inputStream = inputStream,
outputStream = outputStream,
errorStream = errorStream)
diff --git
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/expressions/UserDefinedFunction.scala
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/expressions/UserDefinedFunction.scala
index ad1aae73876..bfcd4572e03 100644
---
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/expressions/UserDefinedFunction.scala
+++
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/expressions/UserDefinedFunction.scala
@@ -101,7 +101,8 @@ case class ScalarUserDefinedFunction(
override val deterministic: Boolean)
extends UserDefinedFunction {
- private[this] lazy val udf = {
+ // SPARK-43198: Eagerly serialize to prevent the UDF from containing a
reference to this class.
+ private[this] val udf = {
val udfPacketBytes = Utils.serialize(UdfPacket(function, inputEncoders,
outputEncoder))
val scalaUdfBuilder = proto.ScalarScalaUDF
.newBuilder()
diff --git
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/application/ReplE2ESuite.scala
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/application/ReplE2ESuite.scala
index af920f8c314..61959234c87 100644
---
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/application/ReplE2ESuite.scala
+++
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/application/ReplE2ESuite.scala
@@ -119,7 +119,10 @@ class ReplE2ESuite extends RemoteSparkSession with
BeforeAndAfterEach {
assertContains("Array[Int] = Array(19, 24, 29, 34, 39)", output)
}
- test("UDF containing lambda expression") {
+ // SPARK-43198: Switching REPL to CodeClass generation mode causes UDFs
defined through lambda
+ // expressions to hit deserialization issues.
+ // TODO(SPARK-43227): Enable test after fixing deserialization issue.
+ ignore("UDF containing lambda expression") {
val input = """
|class A(x: Int) { def get = x * 20 + 5 }
|val dummyUdf = (x: Int) => new A(x).get
@@ -130,4 +133,22 @@ class ReplE2ESuite extends RemoteSparkSession with
BeforeAndAfterEach {
assertContains("Array[Int] = Array(5, 25, 45, 65, 85)", output)
}
+ test("UDF containing in-place lambda") {
+ val input = """
+ |class A(x: Int) { def get = x * 42 + 5 }
+ |val myUdf = udf((x: Int) => new A(x).get)
+ |spark.range(5).select(myUdf(col("id"))).as[Int].collect()
+ """.stripMargin
+ val output = runCommandsInShell(input)
+ assertContains("Array[Int] = Array(5, 47, 89, 131, 173)", output)
+ }
+
+ test("SPARK-43198: Filter does not throw ammonite-related class
initialization exception") {
+ val input = """
+ |spark.range(10).filter(n => n % 2 == 0).collect()
+ """.stripMargin
+ val output = runCommandsInShell(input)
+ assertContains("Array[java.lang.Long] = Array(0L, 2L, 4L, 6L, 8L)", output)
+ }
+
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]