aymanfarhat commented on issue #21092:
URL: https://github.com/apache/beam/issues/21092#issuecomment-1236388087

   I can confirm that I'm facing the same issue using a job server based on **apache/beam_spark3_job_server:2.41.0** to run a Beam pipeline on a [bitnami/spark:3.1](https://github.com/bitnami/containers/blob/main/bitnami/spark/3.1/debian-11/Dockerfile)-based Spark cluster.
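
   For reference, the documented way to point the job server at an external cluster is along these lines (the host networking flag and master URL below are illustrative, not necessarily my exact setup):
   
   ```
   docker run --net=host apache/beam_spark3_job_server:2.41.0 \
        --spark-master-url=spark://localhost:7077
   ```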
   
   stderr log from the task:
   
   ```
   java.io.InvalidClassException: scala.collection.mutable.WrappedArray$ofRef; local class incompatible: stream classdesc serialVersionUID = 3456489343829468865, local class serialVersionUID = 1028182004549731694
        at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:699)
        at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:2005)
        at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1852)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2186)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1669)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2431)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2355)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2213)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1669)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:503)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:461)
        at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:76)
        at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:109)
        at org.apache.spark.rpc.netty.NettyRpcEnv.$anonfun$deserialize$2(NettyRpcEnv.scala:299)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
        at org.apache.spark.rpc.netty.NettyRpcEnv.deserialize(NettyRpcEnv.scala:352)
        at org.apache.spark.rpc.netty.NettyRpcEnv.$anonfun$deserialize$1(NettyRpcEnv.scala:298)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
        at org.apache.spark.rpc.netty.NettyRpcEnv.deserialize(NettyRpcEnv.scala:298)
        at org.apache.spark.rpc.netty.NettyRpcEnv.$anonfun$askAbortable$7(NettyRpcEnv.scala:246)
        at org.apache.spark.rpc.netty.NettyRpcEnv.$anonfun$askAbortable$7$adapted(NettyRpcEnv.scala:246)
        at org.apache.spark.rpc.netty.RpcOutboxMessage.onSuccess(Outbox.scala:90)
        at org.apache.spark.network.client.TransportResponseHandler.handle(TransportResponseHandler.java:195)
        at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:142)
        at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:53)
        at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:102)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
        at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
        at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.lang.Thread.run(Thread.java:750)
   ```
   
   Based on my understanding, this issue is usually linked to a mismatch between the Spark version deployed on the cluster and the Spark version used by the client submitting the job to it. In this case, I'm guessing the client is the job server.
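
   To make that comparison concrete, this is roughly how I'd check both sides (the jar path below is a placeholder, and I'm assuming the cluster image lets you run arbitrary commands; the JDK `serialver` tool computes the serialVersionUID for whatever scala-library jar you point it at, which can be compared against the two values in the exception):
   
   ```
   # Spark and Scala versions the cluster image reports
   docker run --rm bitnami/spark:3.1 spark-submit --version
   
   # serialVersionUID of the class from the exception, for a given scala-library jar
   serialver -classpath /path/to/scala-library-2.12.x.jar \
        'scala.collection.mutable.WrappedArray$ofRef'
   ```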
   
   I can also confirm, after a quick test, that downloading the same Spark distribution locally (matching the cluster version) and submitting a job from that local client to the cluster works fine, e.g.:
   
   ```
   ./bin/spark-submit \
        --class org.apache.spark.examples.SparkPi \
        --master spark://localhost:7077 \
        ./examples/jars/spark-examples_2.12-3.1.3.jar 10000
   ```
   
   I don't know what goes on inside the Beam Spark job server, but this leads me to think that the problem is probably an incompatibility between the Spark client version bundled in the Beam job server and the version the target cluster is running.
   
   Any idea if there is an easy workaround for this? Is there any way to know the exact version of Spark that `apache/beam_spark_job_server:2.31.0` supports? I could adapt the Spark cluster version to match if needed.
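
   If aligning versions does turn out to be the fix, this is a sketch of what I'd try: pin the bitnami images to whatever Spark version the job server bundles (the tag below is only a placeholder taken from my local test, not a verified match for Beam 2.41.0):
   
   ```
   # Placeholder sketch: substitute the Spark version the job server actually bundles
   SPARK_TAG=3.1.3
   docker network create spark-net
   docker run -d --name spark-master --network spark-net -p 7077:7077 \
        -e SPARK_MODE=master bitnami/spark:${SPARK_TAG}
   docker run -d --name spark-worker --network spark-net \
        -e SPARK_MODE=worker -e SPARK_MASTER_URL=spark://spark-master:7077 \
        bitnami/spark:${SPARK_TAG}
   ```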
   
   Would greatly appreciate any feedback on this. Thanks!

