This is an automated email from the ASF dual-hosted git repository.
roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 51b85811b [#1567] fix(spark): Let Spark use its own NettyUtils (#1565)
51b85811b is described below
commit 51b85811bc4d7a9b3595bc04283c8cf6c7172d8a
Author: RickyMa <[email protected]>
AuthorDate: Mon Mar 11 09:53:25 2024 +0800
[#1567] fix(spark): Let Spark use its own NettyUtils (#1565)
### What changes were proposed in this pull request?
When we release the shaded client jar for Spark 2.x, the class
`org.apache.spark.network.util.NettyUtils` should not be included in the
package.
### Why are the changes needed?
Fix https://github.com/apache/incubator-uniffle/issues/1567.
It is also a follow-up to
https://github.com/apache/incubator-uniffle/pull/727.
When running on Spark 2.4, we encounter the following exception:
```
24/03/07 16:34:37 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(tdwadmin); groups with view permissions: Set(); users with modify permissions: Set(tdwadmin); groups with modify permissions: Set()
Exception in thread "main" java.lang.NoSuchMethodError: org.apache.spark.network.util.NettyUtils.createEventLoop(Lorg/apache/spark/network/util/IOMode;ILjava/lang/String;)Lio/netty/channel/EventLoopGroup;
    at org.apache.spark.network.client.TransportClientFactory.<init>(TransportClientFactory.java:104)
    at org.apache.spark.network.TransportContext.createClientFactory(TransportContext.java:89)
    at org.apache.spark.rpc.netty.NettyRpcEnv.<init>(NettyRpcEnv.scala:70)
    at org.apache.spark.rpc.netty.NettyRpcEnvFactory.create(NettyRpcEnv.scala:449)
    at org.apache.spark.rpc.RpcEnv$.create(RpcEnv.scala:56)
    at org.apache.spark.SparkEnv$.create(SparkEnv.scala:264)
    at org.apache.spark.SparkEnv$.createDriverEnv(SparkEnv.scala:193)
    at org.apache.spark.SparkContext.createSparkEnv(SparkContext.scala:271)
    at org.apache.spark.SparkContext.<init>(SparkContext.scala:474)
    at org.apache.spark.deploy.yarn.SQLApplicationMaster.<init>(SQLApplicationMaster.scala:96)
    at org.apache.spark.deploy.yarn.SQLApplicationMaster.<init>(SQLApplicationMaster.scala:53)
    at org.apache.spark.deploy.yarn.SQLApplicationMaster$$anonfun$main$1.apply$mcV$sp(SQLApplicationMaster.scala:544)
    at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:61)
    at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:60)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:2286)
    at org.apache.spark.deploy.SparkHadoopUtil.runAsSparkUser(SparkHadoopUtil.scala:60)
    at org.apache.spark.deploy.yarn.SQLApplicationMaster$.main(SQLApplicationMaster.scala:543)
    at org.apache.spark.deploy.yarn.SQLApplicationMaster.main(SQLApplicationMaster.scala)
```
This is because the return type of `createEventLoop` in Uniffle's `NettyUtils`
is the shaded `org.apache.uniffle.io.netty.channel.EventLoopGroup`, while the
return type of `createEventLoop` in Spark's own `NettyUtils` is
`io.netty.channel.EventLoopGroup`. When a Spark application runs, the driver
loads `NettyUtils` from the rss-client JAR, so the method descriptor it
resolves does not match the one Spark was compiled against, which ultimately
raises a `NoSuchMethodError`.
We should let Spark use its own `NettyUtils` instead of ours.
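To make the mismatch concrete, here is a simplified, illustrative sketch of the two `createEventLoop` descriptors involved (not the actual source of either project; `org.apache.uniffle` is the shade prefix this project applies to Netty):
```
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.concurrent.DefaultThreadFactory;
import org.apache.spark.network.util.IOMode;

// Simplified stand-in for Spark's own NettyUtils: callers such as
// TransportClientFactory are compiled against this descriptor, whose
// return type is io.netty.channel.EventLoopGroup.
class SparkStyleNettyUtils {
  public static EventLoopGroup createEventLoop(IOMode mode, int numThreads, String prefix) {
    // NIO-only sketch; Spark's real implementation also supports EPOLL.
    return new NioEventLoopGroup(numThreads, new DefaultThreadFactory(prefix));
  }
}

// After relocation, the shaded copy effectively exposes this descriptor
// instead (left as a comment because it only compiles against the shaded jar):
//
//   public static org.apache.uniffle.io.netty.channel.EventLoopGroup
//       createEventLoop(IOMode mode, int numThreads, String prefix)
//
// Same class and method name, but a different return type in the descriptor,
// hence the java.lang.NoSuchMethodError above.
```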
However, if we simply remove the `org.apache.spark.network.util.NettyUtils`
file from the code repository, the integration tests fail with the error
below:
```
java.lang.RuntimeException: java.lang.NoSuchFieldException: DEFAULT_TINY_CACHE_SIZE
    at org.apache.spark.network.util.NettyUtils.getPrivateStaticField(NettyUtils.java:131)
    at org.apache.spark.network.util.NettyUtils.createPooledByteBufAllocator(NettyUtils.java:118)
    at org.apache.spark.network.server.TransportServer.init(TransportServer.java:94)
    at org.apache.spark.network.server.TransportServer.<init>(TransportServer.java:73)
    at org.apache.spark.network.TransportContext.createServer(TransportContext.java:114)
    at org.apache.spark.rpc.netty.NettyRpcEnv.startServer(NettyRpcEnv.scala:119)
    at org.apache.spark.rpc.netty.NettyRpcEnvFactory$$anonfun$4.apply(NettyRpcEnv.scala:465)
    at org.apache.spark.rpc.netty.NettyRpcEnvFactory$$anonfun$4.apply(NettyRpcEnv.scala:464)
    at org.apache.spark.util.Utils$$anonfun$startServiceOnPort$1.apply$mcVI$sp(Utils.scala:2275)
    at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
    at org.apache.spark.util.Utils$.startServiceOnPort(Utils.scala:2267)
    at org.apache.spark.rpc.netty.NettyRpcEnvFactory.create(NettyRpcEnv.scala:469)
    at org.apache.spark.rpc.RpcEnv$.create(RpcEnv.scala:57)
    at org.apache.spark.SparkEnv$.create(SparkEnv.scala:249)
    at org.apache.spark.SparkEnv$.createDriverEnv(SparkEnv.scala:175)
    at org.apache.spark.SparkContext.createSparkEnv(SparkContext.scala:256)
    at org.apache.spark.SparkContext.<init>(SparkContext.scala:423)
    at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2493)
    at org.apache.spark.sql.SparkSession$Builder$$anonfun$7.apply(SparkSession.scala:934)
    at org.apache.spark.sql.SparkSession$Builder$$anonfun$7.apply(SparkSession.scala:925)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:925)
    at org.apache.uniffle.test.SparkIntegrationTestBase.runSparkApp(SparkIntegrationTestBase.java:92)
    at org.apache.uniffle.test.SparkIntegrationTestBase.run(SparkIntegrationTestBase.java:53)
    at org.apache.uniffle.test.RSSStageResubmitTest.testRSSStageResubmit(RSSStageResubmitTest.java:86)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
```
This is because our project _**globally controls**_ the Netty version in the
root `pom.xml`'s `dependencyManagement`, which upgrades Spark 2's own lower
Netty version to a higher one. The resulting Netty version incompatibility
triggers exceptions because certain fields can no longer be found. This issue
does not occur in the production environment, as Spark has its own
`NettyUtils` and does not need to use our provided `NettyUtils`. Retaining
`org.apache.spark [...]
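The failing lookup can be reproduced in isolation. Below is a hypothetical probe (not project code) for the field that Spark 2's `NettyUtils` reads reflectively; on any Netty newer than 4.1.47 it takes the `NoSuchFieldException` branch, matching the trace above:
```
import java.lang.reflect.Field;
import io.netty.buffer.PooledByteBufAllocator;

public class TinyCacheFieldProbe {
  public static void main(String[] args) {
    try {
      // Spark 2's NettyUtils.getPrivateStaticField does the equivalent of this.
      Field f = PooledByteBufAllocator.class.getDeclaredField("DEFAULT_TINY_CACHE_SIZE");
      f.setAccessible(true);
      System.out.println("DEFAULT_TINY_CACHE_SIZE = " + f.get(null));
    } catch (NoSuchFieldException e) {
      // Taken on Netty > 4.1.47, where the tiny-cache constant was removed.
      System.out.println("Field absent on this Netty version: " + e.getMessage());
    } catch (IllegalAccessException e) {
      throw new RuntimeException(e);
    }
  }
}
```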
Of course, the optimal approach would be to shade our own Netty during
integration testing as well, allowing Spark to keep its own Netty dependency
and effectively separating the two. This would give the most accurate testing,
since any change in Spark 2's Netty version could be verified through unit
tests. However, a large amount of integration-test code would then need the
`org.apache.uniffle` prefix on its Netty `import` statements (as sketched
below). Ultimately, this coul [...]
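For concreteness, the rewrite described above would look roughly like this in a test file (illustrative only; the class name is hypothetical, and `org.apache.uniffle` is the shade prefix this project already applies to Netty):
```
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;

// Today, integration-test code compiles against unshaded Netty:
public class UnshadedNettyInTest {
  ByteBuf buf = Unpooled.buffer(16);
}

// If Netty were shaded for integration tests too, the same file would need
// the relocated imports instead, e.g.:
//
//   import org.apache.uniffle.io.netty.buffer.ByteBuf;
//   import org.apache.uniffle.io.netty.buffer.Unpooled;
```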
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing UTs.
---
client-spark/spark2-shaded/pom.xml | 3 +++
.../src/main/java/org/apache/spark/network/util/NettyUtils.java | 5 +++++
2 files changed, 8 insertions(+)
diff --git a/client-spark/spark2-shaded/pom.xml b/client-spark/spark2-shaded/pom.xml
index bca2aad1b..f8a0373f6 100644
--- a/client-spark/spark2-shaded/pom.xml
+++ b/client-spark/spark2-shaded/pom.xml
@@ -213,6 +213,9 @@
                     to="lib${rss.shade.native.packageName}_netty_transport_native_kqueue_aarch_64.jnilib"
                     type="glob"></mapper>
                 </move>
+                <!-- Delete our NettyUtils to avoid errors like NoSuchMethodError, let Spark use its own NettyUtils -->
+                <!-- See https://github.com/apache/incubator-uniffle/pull/1565 for more details -->
+                <delete dir="${project.build.directory}/unpacked/org/apache/spark/network"/>
                 <echo message="repackaging netty jar"></echo>
                 <jar destfile="${project.build.directory}/${project.artifactId}-${project.version}.jar" basedir="${project.build.directory}/unpacked"/>
diff --git a/client-spark/spark2/src/main/java/org/apache/spark/network/util/NettyUtils.java b/client-spark/spark2/src/main/java/org/apache/spark/network/util/NettyUtils.java
index f64cc8e95..34c1f7ba0 100644
--- a/client-spark/spark2/src/main/java/org/apache/spark/network/util/NettyUtils.java
+++ b/client-spark/spark2/src/main/java/org/apache/spark/network/util/NettyUtils.java
@@ -39,6 +39,11 @@ import org.apache.uniffle.common.exception.RssException;
 
 /**
  * copy from spark, In order to override the createPooledByteBufAllocator method, the property
  * DEFAULT_TINY_CACHE_SIZE does not exist in netty>4.1.47.
+ *
+ * <p>Attention: This class is intended for use in the testing phase only and will not be included
+ * in the final packaged artifact for production environment. For production environment, Spark will
+ * use its own NettyUtils instead of this one. This is somewhat of a hack, but given that Spark 2.x
+ * doesn't update frequently, it's not much of an issue to proceed this way.
  */
 public class NettyUtils {
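For reference, the override the javadoc above mentions can avoid the reflective field lookup entirely by using Netty's public default accessors. The following is only a minimal sketch of that idea, not the repository's actual implementation, and it assumes a Netty version that provides the constructor without the tiny-cache argument (the versions where `DEFAULT_TINY_CACHE_SIZE` was removed):
```
import io.netty.buffer.PooledByteBufAllocator;

// Hypothetical class name; shown only to illustrate building the allocator
// from public defaults instead of reading private static fields reflectively.
public final class PooledAllocatorSketch {
  public static PooledByteBufAllocator createPooledByteBufAllocator(
      boolean allowDirectBufs, boolean allowCache, int numCores) {
    if (numCores == 0) {
      numCores = Runtime.getRuntime().availableProcessors();
    }
    return new PooledByteBufAllocator(
        allowDirectBufs && PooledByteBufAllocator.defaultPreferDirect(),
        Math.min(PooledByteBufAllocator.defaultNumHeapArena(), numCores),
        Math.min(PooledByteBufAllocator.defaultNumDirectArena(), allowDirectBufs ? numCores : 0),
        PooledByteBufAllocator.defaultPageSize(),
        PooledByteBufAllocator.defaultMaxOrder(),
        allowCache ? PooledByteBufAllocator.defaultSmallCacheSize() : 0,
        allowCache ? PooledByteBufAllocator.defaultNormalCacheSize() : 0,
        allowCache && PooledByteBufAllocator.defaultUseCacheForAllThreads());
  }
}
```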