> native job submission for Python

We're working on it!

> Any idea where I should look?

Does the core dump contain the stack trace directly leading to the SIGBUS?

On Fri, Dec 13, 2019 at 1:12 PM Matthew K. <[email protected]> wrote:

> This is not that difficult to implement, but it would be better done once
> you guys have integrated native job submission for Python.
>
> However, I need to fix this last issue, which is the crash. Any idea where
> I should look?
>
> *Sent:* Friday, December 13, 2019 at 2:52 PM
> *From:* "Kyle Weaver" <[email protected]>
> *To:* dev <[email protected]>
> *Subject:* Re: Beam's job crashes on cluster
>
> I applied some modifications to the code to run Beam tasks on a k8s
> cluster using spark-submit.
>
> Interesting, how does that work?
>
> On Fri, Dec 13, 2019 at 12:49 PM Matthew K. <[email protected]> wrote:
>
>> I'm not sure if that could be a problem. I'm *not* running standalone
>> Spark. I applied some modifications to the code to run Beam tasks on a
>> k8s cluster using spark-submit. Therefore, worker nodes are spawned when
>> spark-submit is called and connect to the master, and are supposed to be
>> destroyed when the job is finished.
>>
>> Therefore, the crash should have some other reason.
>>
>> *Sent:* Friday, December 13, 2019 at 2:37 PM
>> *From:* "Kyle Weaver" <[email protected]>
>> *To:* dev <[email protected]>
>> *Subject:* Re: Beam's job crashes on cluster
>>
>> Correction: it should be formatted `spark://host:port`, and it should
>> follow the rules here:
>> https://spark.apache.org/docs/latest/submitting-applications.html#master-urls
>>
>> On Fri, Dec 13, 2019 at 12:36 PM Kyle Weaver <[email protected]> wrote:
>>
>>> You will probably want to add the argument `-PsparkMasterUrl=localhost:8080`
>>> (or whatever host:port your Spark master is on) to the job-server:runShadow
>>> command.
>>>
>>> Without specifying the master URL, the default is to start an embedded
>>> Spark master within the same JVM as the job server, rather than using
>>> your standalone master.
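
For concreteness, a rough sketch of the combined invocation; the
spark://spark-master:7077 address below is only a placeholder for wherever
your standalone master is actually listening:

  ./gradlew :runners:spark:job-server:runShadow --gradle-user-home `pwd` \
      -PsparkMasterUrl=spark://spark-master:7077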

>>> On Fri, Dec 13, 2019 at 12:15 PM Matthew K. <[email protected]> wrote:
>>>
>>>> The job server is running on the master node via this:
>>>>
>>>> ./gradlew :runners:spark:job-server:runShadow --gradle-user-home `pwd`
>>>>
>>>> Spark workers (executors) run on separate nodes, sharing /tmp (1 GB in
>>>> size) in order to be able to access the Beam job's MANIFEST. I'm running
>>>> Python 2.7.
>>>>
>>>> There are no other shared resources between them. A pure Spark job works
>>>> fine on the cluster (as far as I have tested with a simple one). If I'm
>>>> not wrong, the Beam job executes with no problem when the master and all
>>>> workers run on the same node (but in separate containers).
>>>>
>>>> *Sent:* Friday, December 13, 2019 at 1:49 PM
>>>> *From:* "Kyle Weaver" <[email protected]>
>>>> *To:* [email protected]
>>>> *Subject:* Re: Beam's job crashes on cluster
>>>>
>>>> > Do workers need to talk to the job server independently from the Spark
>>>> > executors?
>>>>
>>>> No, they don't.
>>>>
>>>> From the timestamps in your logs, it looks like the SIGBUS happened
>>>> after the executor was lost.
>>>>
>>>> Some additional info that might help us establish a chain of causation:
>>>> - the arguments you used to start the job server?
>>>> - the Spark cluster deployment setup?
>>>>
>>>> On Fri, Dec 13, 2019 at 8:00 AM Matthew K. <[email protected]> wrote:
>>>>
>>>>> Actually, the reason for that error is that the Job Server/JRE crashes
>>>>> at the final stages and the service becomes unavailable (note: the job
>>>>> is on a very small dataset that, in the absence of a cluster, would be
>>>>> done in a couple of seconds):
>>>>>
>>>>> 19/12/13 15:22:11 INFO ContextCleaner: Cleaned accumulator 43
>>>>> 19/12/13 15:22:11 INFO ContextCleaner: Cleaned accumulator 295
>>>>> 19/12/13 15:22:11 INFO ContextCleaner: Cleaned accumulator 4
>>>>> 19/12/13 15:22:11 INFO BlockManagerInfo: Removed broadcast_13_piece0 on sparkpi-1576249172021-driver-svc.xyz.svc:7079 in memory (size: 14.4 KB, free: 967.8 MB)
>>>>> 19/12/13 15:22:11 INFO BlockManagerInfo: Removed broadcast_13_piece0 on 192.168.102.238:46463 in memory (size: 14.4 KB, free: 3.3 GB)
>>>>> 19/12/13 15:22:11 INFO BlockManagerInfo: Removed broadcast_13_piece0 on 192.168.78.233:35881 in memory (size: 14.4 KB, free: 3.3 GB)
>>>>> 19/12/13 15:22:11 INFO ContextCleaner: Cleaned accumulator 222
>>>>> 19/12/13 15:22:11 INFO ContextCleaner: Cleaned accumulator 294
>>>>> 19/12/13 15:22:11 INFO ContextCleaner: Cleaned accumulator 37
>>>>> <============-> 98% EXECUTING [2m 26s]
>>>>> > IDLE
>>>>> > IDLE
>>>>> > IDLE
>>>>> > :runners:spark:job-server:runShadow
>>>>> #
>>>>> # A fatal error has been detected by the Java Runtime Environment:
>>>>> #
>>>>> # SIGBUS (0x7) at pc=0x00007f5ad7cd0d5e, pid=825, tid=0x00007f5abb886700
>>>>> #
>>>>> # JRE version: OpenJDK Runtime Environment (8.0_232-b09) (build 1.8.0_232-b09)
>>>>> # Java VM: OpenJDK 64-Bit Server VM (25.232-b09 mixed mode linux-amd64 compressed oops)
>>>>> # Problematic frame:
>>>>> # V  [libjvm.so+0x8f8d5e]  PerfLongVariant::sample()+0x1e
>>>>> #
>>>>> # Core dump written. Default location: /opt/spark/beam/core or core.825
>>>>> #
>>>>> # An error report file with more information is saved as:
>>>>> # /opt/spark/beam/hs_err_pid825.log
>>>>> #
>>>>> # If you would like to submit a bug report, please visit:
>>>>> # http://bugreport.java.com/bugreport/crash.jsp
>>>>> #
>>>>> Aborted (core dumped)
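
To answer the core-dump question above, a rough sketch of pulling the native
backtrace out of the core file (assuming gdb is available in the driver
container and java is on the PATH; the core may be named core or core.825 as
reported above):

  gdb "$(which java)" /opt/spark/beam/core.825
  (gdb) bt                   # native frames leading to the SIGBUS
  (gdb) thread apply all bt  # in case the faulting thread is not the one selected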

>>>>> From /opt/spark/beam/hs_err_pid825.log:
>>>>>
>>>>> Internal exceptions (10 events):
>>>>> Event: 0.664 Thread 0x00007f5ad000a800 Exception <a 'java/lang/ArrayIndexOutOfBoundsException'> (0x0000000794d72040) thrown at [/home/openjdk/jdk8u/hotspot/src/share/vm/runtime/sharedRuntime.cpp, line 605]
>>>>> Event: 0.664 Thread 0x00007f5ad000a800 Exception <a 'java/lang/ArrayIndexOutOfBoundsException'> (0x0000000794d73e60) thrown at [/home/openjdk/jdk8u/hotspot/src/share/vm/runtime/sharedRuntime.cpp, line 605]
>>>>> Event: 0.665 Thread 0x00007f5ad000a800 Exception <a 'java/lang/ArrayIndexOutOfBoundsException'> (0x0000000794d885d0) thrown at [/home/openjdk/jdk8u/hotspot/src/share/vm/runtime/sharedRuntime.cpp, line 605]
>>>>> Event: 0.665 Thread 0x00007f5ad000a800 Exception <a 'java/lang/ArrayIndexOutOfBoundsException'> (0x0000000794d8c6d8) thrown at [/home/openjdk/jdk8u/hotspot/src/share/vm/runtime/sharedRuntime.cpp, line 605]
>>>>> Event: 0.673 Thread 0x00007f5ad000a800 Exception <a 'java/lang/ArrayIndexOutOfBoundsException'> (0x0000000794df7b70) thrown at [/home/openjdk/jdk8u/hotspot/src/share/vm/runtime/sharedRuntime.cpp, line 605]
>>>>> Event: 0.674 Thread 0x00007f5ad000a800 Exception <a 'java/lang/ArrayIndexOutOfBoundsException'> (0x0000000794df8f38) thrown at [/home/openjdk/jdk8u/hotspot/src/share/vm/runtime/sharedRuntime.cpp, line 605]
>>>>> Event: 0.674 Thread 0x00007f5ad000a800 Exception <a 'java/lang/ArrayIndexOutOfBoundsException'> (0x0000000794dfa5b8) thrown at [/home/openjdk/jdk8u/hotspot/src/share/vm/runtime/sharedRuntime.cpp, line 605]
>>>>> Event: 0.674 Thread 0x00007f5ad000a800 Exception <a 'java/lang/ArrayIndexOutOfBoundsException'> (0x0000000794dfb6f0) thrown at [/home/openjdk/jdk8u/hotspot/src/share/vm/runtime/sharedRuntime.cpp, line 605]
>>>>> Event: 0.674 Thread 0x00007f5ad000a800 Exception <a 'java/lang/ArrayIndexOutOfBoundsException'> (0x0000000794dfedf0) thrown at [/home/openjdk/jdk8u/hotspot/src/share/vm/runtime/sharedRuntime.cpp, line 605]
>>>>> Event: 0.695 Thread 0x00007f5ad000a800 Exception <a 'java/lang/NoClassDefFoundError': org/slf4j/impl/StaticMarkerBinder> (0x0000000794f69e70) thrown at [/home/openjdk/jdk8u/hotspot/src/share/vm/classfile/systemDictionary.cpp, line 199]
>>>>>
>>>>> Looking at the logs when running the script, I can see executors become
>>>>> lost, but I'm not sure whether that is related to the crash of the job
>>>>> server:
>>>>>
>>>>> 19/12/13 15:07:29 INFO TaskSetManager: Starting task 0.0 in stage 9.0 (TID 13, 192.168.118.75, executor 1, partition 0, PROCESS_LOCAL, 8055 bytes)
>>>>> 19/12/13 15:07:29 INFO BlockManagerInfo: Added broadcast_10_piece0 in memory on 192.168.118.75:37327 (size: 47.3 KB, free: 3.3 GB)
>>>>> 19/12/13 15:07:29 INFO TaskSetManager: Starting task 3.0 in stage 9.0 (TID 14, 192.168.118.75, executor 1, partition 3, PROCESS_LOCAL, 7779 bytes)
>>>>> 19/12/13 15:07:29 INFO TaskSetManager: Finished task 0.0 in stage 9.0 (TID 13) in 37 ms on 192.168.118.75 (executor 1) (1/4)
>>>>> 19/12/13 15:07:29 INFO MapOutputTrackerMasterEndpoint: Asked to send map output locations for shuffle 8 to 192.168.118.75:49158
>>>>> 19/12/13 15:07:30 INFO KubernetesClusterSchedulerBackend$KubernetesDriverEndpoint: Disabling executor 2.
>>>>> 19/12/13 15:07:30 INFO DAGScheduler: Executor lost: 2 (epoch 4)
>>>>>
>>>>> This results in losing shuffle files and the following exception:
>>>>>
>>>>> 19/12/13 15:07:30 INFO DAGScheduler: Shuffle files lost for executor: 2 (epoch 4)
>>>>> 19/12/13 15:07:33 INFO TaskSetManager: Starting task 1.0 in stage 7.0 (TID 15, 192.168.118.75, executor 1, partition 1, ANY, 7670 bytes)
>>>>> 19/12/13 15:07:33 INFO TaskSetManager: Finished task 3.0 in stage 9.0 (TID 14) in 3436 ms on 192.168.118.75 (executor 1) (2/4)
>>>>> 19/12/13 15:07:33 INFO BlockManagerInfo: Added broadcast_8_piece0 in memory on 192.168.118.75:37327 (size: 17.3 KB, free: 3.3 GB)
>>>>> 19/12/13 15:07:33 INFO MapOutputTrackerMasterEndpoint: Asked to send map output locations for shuffle 7 to 192.168.118.75:49158
>>>>> 19/12/13 15:07:33 INFO TaskSetManager: Starting task 0.0 in stage 8.0 (TID 16, 192.168.118.75, executor 1, partition 0, ANY, 7670 bytes)
>>>>> 19/12/13 15:07:33 WARN TaskSetManager: Lost task 1.0 in stage 7.0 (TID 15, 192.168.118.75, executor 1): FetchFailed(null, shuffleId=7, mapId=-1, reduceId=1, message=
>>>>> org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 7
>>>>>     at org.apache.spark.MapOutputTracker$$anonfun$convertMapStatuses$2.apply(MapOutputTracker.scala:882)
>>>>>     at org.apache.spark.MapOutputTracker$$anonfun$convertMapStatuses$2.apply(MapOutputTracker.scala:878)
>>>>>     at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>>>>>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>>>>>     at org.apache.spark.MapOutputTracker$.convertMapStatuses(MapOutputTracker.scala:878)
>>>>>     at org.apache.spark.MapOutputTrackerWorker.getMapSizesByExecutorId(MapOutputTracker.scala:691)
>>>>>     at org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:49)
>>>>>     at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:105)
>>>>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>>>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>>>>>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>>>>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>>>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>>>>>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>>>>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>>>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>>>>>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>>>>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>>>>>     at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:337)
>>>>>     at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:335)
>>>>>     at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1165)
>>>>>     at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1156)
>>>>>     at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091)
>>>>>     at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156)
>>>>>     at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882)
>>>>>     at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:335)
>>>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:286)
>>>>>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>>>>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>>>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>>>>>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>>>>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>>>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>>>>>     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
>>>>>     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
>>>>>     at org.apache.spark.scheduler.Task.run(Task.scala:123)
>>>>>     at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
>>>>>     at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
>>>>>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
>>>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>>>     at java.lang.Thread.run(Thread.java:748)
>>>>>
>>>>> *Sent:* Friday, December 13, 2019 at 6:58 AM
>>>>> *From:* "Matthew K." <[email protected]>
>>>>> *To:* [email protected]
>>>>> *Cc:* dev <[email protected]>
>>>>> *Subject:* Re: Beam's job crashes on cluster
>>>>>
>>>>> Hi Kyle,
>>>>>
>>>>> This is the pipeline options config (I replaced localhost with the
>>>>> actual job server's IP address and still receive the same error. Do
>>>>> workers need to talk to the job server independently from the Spark
>>>>> executors?):
>>>>>
>>>>> options = PipelineOptions([
>>>>>     "--runner=PortableRunner",
>>>>>     "--job_endpoint=%s:8099" % ip_address,
>>>>>     "--environment_type=PROCESS",
>>>>>     "--environment_config={\"command\":\"/opt/spark/beam/sdks/python/container/build/target/launcher/linux_amd64/boot\"}",
>>>>>     ""
>>>>> ])
>>>>>
>>>>> *Sent:* Thursday, December 12, 2019 at 5:30 PM
>>>>> *From:* "Kyle Weaver" <[email protected]>
>>>>> *To:* dev <[email protected]>
>>>>> *Subject:* Re: Beam's job crashes on cluster
>>>>>
>>>>> Can you share the pipeline options you are using?
>>>>> Particularly environment_type and environment_config.
>>>>>
>>>>> On Thu, Dec 12, 2019 at 2:58 PM Matthew K. <[email protected]> wrote:
>>>>>
>>>>>> Running Beam on a Spark cluster, it crashes and I get the following
>>>>>> error (workers are on separate nodes; it works fine when the workers
>>>>>> are on the same node as the runner):
>>>>>>
>>>>>> > Task :runners:spark:job-server:runShadow FAILED
>>>>>> Exception in thread wait_until_finish_read:
>>>>>> Traceback (most recent call last):
>>>>>>   File "/usr/lib/python2.7/threading.py", line 801, in __bootstrap_inner
>>>>>>     self.run()
>>>>>>   File "/usr/lib/python2.7/threading.py", line 754, in run
>>>>>>     self.__target(*self.__args, **self.__kwargs)
>>>>>>   File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/portability/portable_runner.py", line 411, in read_messages
>>>>>>     for message in self._message_stream:
>>>>>>   File "/usr/local/lib/python2.7/dist-packages/grpc/_channel.py", line 395, in next
>>>>>>     return self._next()
>>>>>>   File "/usr/local/lib/python2.7/dist-packages/grpc/_channel.py", line 561, in _next
>>>>>>     raise self
>>>>>> _Rendezvous: <_Rendezvous of RPC that terminated with:
>>>>>>     status = StatusCode.UNAVAILABLE
>>>>>>     details = "Socket closed"
>>>>>>     debug_error_string = "{"created":"@1576190515.361076583","description":"Error received from peer ipv4:127.0.0.1:8099","file":"src/core/lib/surface/call.cc","file_line":1055,"grpc_message":"Socket closed","grpc_status":14}"
>>>>>> >
>>>>>> Traceback (most recent call last):
>>>>>>   File "/opt/spark/work-dir/beam_script.py", line 49, in <module>
>>>>>>     stats = tfdv.generate_statistics_from_csv(data_location=DATA_LOCATION, pipeline_options=options)
>>>>>>   File "/usr/local/lib/python2.7/dist-packages/tensorflow_data_validation/utils/stats_gen_lib.py", line 197, in generate_statistics_from_csv
>>>>>>     statistics_pb2.DatasetFeatureStatisticsList)))
>>>>>>   File "/usr/local/lib/python2.7/dist-packages/apache_beam/pipeline.py", line 427, in __exit__
>>>>>>     self.run().wait_until_finish()
>>>>>>   File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/portability/portable_runner.py", line 429, in wait_until_finish
>>>>>>     for state_response in self._state_stream:
>>>>>>   File "/usr/local/lib/python2.7/dist-packages/grpc/_channel.py", line 395, in next
>>>>>>     return self._next()
>>>>>>   File "/usr/local/lib/python2.7/dist-packages/grpc/_channel.py", line 561, in _next
>>>>>>     raise self
>>>>>> grpc._channel._Rendezvous: <_Rendezvous of RPC that terminated with:
>>>>>>     status = StatusCode.UNAVAILABLE
>>>>>>     details = "Socket closed"
>>>>>>     debug_error_string = "{"created":"@1576190515.361053677","description":"Error received from peer ipv4:127.0.0.1:8099","file":"src/core/lib/surface/call.cc","file_line":1055,"grpc_message":"Socket closed","grpc_status":14}"
