Hi, I would go for a regular MySQL bulk load, i.e. writing the output in a form that MySQL can load in a single process. I'd say Spark JDBC is fine for small fetches and loads, but for large RDBMS operations it turns out that using the database's regular, optimized bulk-load API works better than JDBC.
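As a rough illustration of that approach, here is a minimal Python sketch. It assumes MySQL's default `LOAD DATA` text format (tab-separated fields, newline-terminated lines, backslash escapes, `\N` for NULL). The helpers `to_mysql_tsv_line`, `write_tsv` and `load_data_sql`, and the path and table in the usage lines, are hypothetical. In a Spark job each partition could write its own file, which would then be loaded with a MySQL client instead of Spark's JDBC writer.

```python
from typing import Iterable, Optional, Sequence

# Map each character that MySQL's default LOAD DATA format treats specially
# to its backslash escape. str.translate works in a single pass, so a
# backslash we insert is never escaped a second time.
_MYSQL_ESCAPES = str.maketrans({
    "\\": "\\\\",
    "\t": "\\t",
    "\n": "\\n",
    "\r": "\\r",
    "\0": "\\0",
})


def to_mysql_tsv_line(row: Sequence[Optional[object]]) -> str:
    """Render one row in the default LOAD DATA format (hypothetical helper)."""
    fields = []
    for value in row:
        if value is None:
            fields.append("\\N")  # LOAD DATA reads \N as SQL NULL
        else:
            # Assumption: str() is good enough; bools, dates, etc. may
            # need their own formatting for the target column types.
            fields.append(str(value).translate(_MYSQL_ESCAPES))
    return "\t".join(fields) + "\n"


def write_tsv(rows: Iterable[Sequence[Optional[object]]], path: str) -> None:
    """Write rows to a UTF-8 file that LOAD DATA can ingest."""
    # newline="" stops Python from translating line endings (e.g. on Windows).
    with open(path, "w", encoding="utf-8", newline="") as out:
        for row in rows:
            out.write(to_mysql_tsv_line(row))


def load_data_sql(path: str, table: str) -> str:
    """Build the matching LOAD DATA statement (hypothetical helper).

    Assumes `path` contains no single quotes and `table` is a trusted identifier.
    """
    return (
        f"LOAD DATA LOCAL INFILE '{path}' INTO TABLE {table} "
        "CHARACTER SET utf8mb4 "
        "FIELDS TERMINATED BY '\\t' ESCAPED BY '\\\\' "
        "LINES TERMINATED BY '\\n'"
    )


if __name__ == "__main__":
    print(to_mysql_tsv_line(["a\tb", None, 42]), end="")
    print(load_data_sql("/tmp/part-00000.tsv", "my_table"))
```

Letting MySQL parse the file itself avoids per-row INSERT round trips. Note that `LOAD DATA LOCAL` only works when `local_infile` is enabled on both the server and the client.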
2018-05-16 16:18 GMT+02:00 Vadim Semenov <va...@datadoghq.com>:

> Upon downsizing to 20 partitions, some of your partitions become too big.
> I see that you're caching, and executors write those big partitions to
> disk but fail to read them back, because they exceed 2 GiB:
>
> Caused by: java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
>     at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:869)
>     at org.apache.spark.storage.DiskStore$$anonfun$getBytes$4.apply(DiskStore.scala:125)
>     at org.apache.spark.storage.DiskStore$$anonfun$getBytes$4.apply(DiskStore.scala:124)
>
> You can try to coalesce to 100 and reduce the number of executors to keep
> the load on MySQL reasonable.
>
> On Wed, May 16, 2018 at 5:36 AM, Davide Brambilla <davide.brambi...@contentwise.tv> wrote:
>
>> Hi all,
>> we have a DataFrame with 1000 partitions, and we need to write it into
>> MySQL using this command:
>>
>> df = df.coalesce(20)
>> df.write.jdbc(url=url,
>>               table=table,
>>               mode=mode,
>>               properties=properties)
>>
>> and we randomly get this error:
>>
>> java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
>>     at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:869)
>>     at org.apache.spark.storage.DiskStore$$anonfun$getBytes$4.apply(DiskStore.scala:125)
>>     at org.apache.spark.storage.DiskStore$$anonfun$getBytes$4.apply(DiskStore.scala:124)
>>     at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1337)
>>     at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:126)
>>     at org.apache.spark.storage.BlockManager.getLocalValues(BlockManager.scala:520)
>>     at org.apache.spark.storage.BlockManager.get(BlockManager.scala:693)
>>     at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:753)
>>     at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
>>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
>>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
>>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
>>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
>>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
>>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
>>     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
>>     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
>>     at org.apache.spark.scheduler.Task.run(Task.scala:108)
>>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>     at java.lang.Thread.run(Thread.java:748)
>>
>> Driver stacktrace:
>>     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1690)
>>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1678)
>>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1677)
>>     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>>     at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1677)
>>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:855)
>>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:855)
>>     at scala.Option.foreach(Option.scala:257)
>>     at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:855)
>>     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1905)
>>     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1860)
>>     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1849)
>>     at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>>     at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:671)
>>     at org.apache.spark.SparkContext.runJob(SparkContext.scala:2022)
>>     at org.apache.spark.SparkContext.runJob(SparkContext.scala:2043)
>>     at org.apache.spark.SparkContext.runJob(SparkContext.scala:2062)
>>     at org.apache.spark.api.python.PythonRDD$.runJob(PythonRDD.scala:446)
>>     at org.apache.spark.api.python.PythonRDD.runJob(PythonRDD.scala)
>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>     at java.lang.reflect.Method.invoke(Method.java:498)
>>     at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
>>     at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
>>     at py4j.Gateway.invoke(Gateway.java:280)
>>     at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
>>     at py4j.commands.CallCommand.execute(CallCommand.java:79)
>>     at py4j.GatewayConnection.run(GatewayConnection.java:214)
>>     at java.lang.Thread.run(Thread.java:748)
>> Caused by: java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
>>     at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:869)
>>     at org.apache.spark.storage.DiskStore$$anonfun$getBytes$4.apply(DiskStore.scala:125)
>>     at org.apache.spark.storage.DiskStore$$anonfun$getBytes$4.apply(DiskStore.scala:124)
>>     at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1337)
>>     at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:126)
>>     at org.apache.spark.storage.BlockManager.getLocalValues(BlockManager.scala:520)
>>     at org.apache.spark.storage.BlockManager.get(BlockManager.scala:693)
>>     at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:753)
>>     at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
>>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
>>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
>>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
>>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
>>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
>>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
>>     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
>>     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
>>     at org.apache.spark.scheduler.Task.run(Task.scala:108)
>>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>     ... 1 more
>>
>> According to the JDBC connector documentation, *numPartitions* determines
>> the maximum write parallelism. Instead, we use coalesce to set the number
>> of partitions to 20, to avoid generating too much load on MySQL, and it
>> seems that the issue is caused by Spark reducing the partitions from 1000
>> to 20 before writing.
>>
>> If the issue is due to this configuration, what is the best practice for
>> writing a DataFrame with many partitions into our database while limiting
>> the number of concurrent connections?
>>
>> Thanks
>>
>> Davide B.
>>
>> --
>> Davide Brambilla
>> ContentWise R&D
>> davide.brambi...@contentwise.tv
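The arithmetic behind the "coalesce to 100 and use fewer executors" advice can be sketched in a few lines of Python. In the traces, `DiskStore.getBytes` memory-maps a cached block with `FileChannel.map`, which rejects regions larger than `Integer.MAX_VALUE` bytes, so the largest partition has to stay below that. The sketch assumes the largest partition is at most `skew_factor` times the average (a guess to tune from the Spark UI), and that each JDBC write task holds one connection, so concurrency is capped by the number of task slots. The function names and the example job sizes are hypothetical.

```python
import math

# FileChannel.map (called from DiskStore.getBytes in the trace) refuses
# regions larger than Integer.MAX_VALUE bytes, i.e. just under 2 GiB.
INT_MAX_BYTES = 2**31 - 1


def min_partitions(total_bytes: int, skew_factor: float = 2.0,
                   limit_bytes: int = INT_MAX_BYTES) -> int:
    """Smallest partition count that keeps the largest partition below
    limit_bytes, assuming it is at most skew_factor times the average."""
    if total_bytes <= 0:
        return 1
    return max(1, math.ceil(total_bytes * skew_factor / limit_bytes))


def max_concurrent_connections(num_partitions: int, num_executors: int,
                               cores_per_executor: int, task_cpus: int = 1) -> int:
    """Upper bound on simultaneous JDBC connections during a write:
    one connection per running write task, at most one task per slot."""
    slots = num_executors * (cores_per_executor // task_cpus)
    return min(num_partitions, slots)


if __name__ == "__main__":
    # Hypothetical job: 60 GiB of cached data.
    print(min_partitions(60 * 2**30))
    # coalesce(20) on 10 executors x 4 cores: 20 connections, big partitions.
    print(max_concurrent_connections(20, 10, 4))   # 20
    # coalesce(100) on 5 executors x 4 cores: still 20 connections,
    # but each partition is five times smaller.
    print(max_concurrent_connections(100, 5, 4))   # 20
```

Per the Spark SQL JDBC documentation, the `numPartitions` write option makes Spark call `coalesce(numPartitions)` itself when the DataFrame has more partitions. It therefore bounds connections the same way the manual `coalesce(20)` does and carries the same partition-size risk. Capping task slots instead limits connections while keeping each partition small.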