[
https://issues.apache.org/jira/browse/SPARK-19243?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Hyukjin Kwon updated SPARK-19243:
---------------------------------
Labels: bulk-closed (was: )
> Error when selecting from DataFrame containing parsed data from files larger
> than 1MB
> -------------------------------------------------------------------------------------
>
> Key: SPARK-19243
> URL: https://issues.apache.org/jira/browse/SPARK-19243
> Project: Spark
> Issue Type: Bug
> Reporter: Ben
> Priority: Major
> Labels: bulk-closed
>
> I hope I can describe the problem clearly. This error happens with Spark
> 2.0.1. On my test PC, Spark 2.1.0 shows none of the issues below, but I
> can't verify that on the test cluster because Spark would need to be
> upgraded there first. I'm opening this ticket because, if it is a bug, it
> may still be partially present in Spark 2.1.0.
> Initially I thought the problem was in my script, so I debugged it until I
> found what is actually happening.
> Step by step: I load XML files through spark-xml into a DataFrame. In my
> case the rowTag is the root tag, so each XML file becomes one row. The XML
> structure is fairly complex, and its elements are converted to nested
> columns or arrays inside the DF. Because I need to flatten the whole table,
> and because the output is not fixed but selected dynamically, columns that
> were parsed as arrays are flattened with explode() only when they are
> actually needed.
> Normally I can select various columns that don't have many entries without
> any problem.
> I then select a column that has a lot of entries into a new DF, e.g. simply
> through
> {noformat}
> df2 = df.select(...)
> {noformat}
> and then, if I call count(), first(), or any other action, one of two
> things happens:
> 1. If the source file was smaller than 1MB, it works.
> 2. If the source file was larger than 1MB, the following error occurs:
> {noformat}
> Traceback (most recent call last):
>   File "/myCode.py", line 71, in main
>     df.count()
>   File "/usr/hdp/current/spark-client/python/lib/pyspark.zip/pyspark/sql/dataframe.py", line 299, in count
>     return int(self._jdf.count())
>   File "/usr/hdp/current/spark-client/python/lib/py4j-0.10.3-src.zip/py4j/java_gateway.py", line 1133, in __call__
>     answer, self.gateway_client, self.target_id, self.name)
>   File "/usr/hdp/current/spark-client/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
>     return f(*a, **kw)
>   File "/usr/hdp/current/spark-client/python/lib/py4j-0.10.3-src.zip/py4j/protocol.py", line 319, in get_return_value
>     format(target_id, ".", name), value)
> Py4JJavaError: An error occurred while calling o180.count.
> : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 3.0 failed 4 times, most recent failure: Lost task 0.3 in stage 3.0 (TID 6, compname): java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
> at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:869)
> at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:103)
> at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:91)
> at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1307)
> at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:105)
> at org.apache.spark.storage.BlockManager.getLocalValues(BlockManager.scala:438)
> at org.apache.spark.storage.BlockManager.get(BlockManager.scala:606)
> at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:663)
> at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:330)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:281)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
> at org.apache.spark.scheduler.Task.run(Task.scala:86)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> Driver stacktrace:
> at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1454)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1442)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1441)
> at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1441)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
> at scala.Option.foreach(Option.scala:257)
> at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811)
> at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1667)
> at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1622)
> at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1611)
> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
> at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1890)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1903)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1916)
> at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:347)
> at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:39)
> at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply$mcI$sp(Dataset.scala:2526)
> at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:2523)
> at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:2523)
> at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
> at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2546)
> at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:2523)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237)
> at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
> at py4j.Gateway.invoke(Gateway.java:280)
> at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
> at py4j.commands.CallCommand.execute(CallCommand.java:79)
> at py4j.GatewayConnection.run(GatewayConnection.java:214)
> at java.lang.Thread.run(Thread.java:745)
>
> Caused by: java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
> at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:869)
> at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:103)
> at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:91)
> at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1307)
> at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:105)
> at org.apache.spark.storage.BlockManager.getLocalValues(BlockManager.scala:438)
> at org.apache.spark.storage.BlockManager.get(BlockManager.scala:606)
> at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:663)
> at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:330)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:281)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
> at org.apache.spark.scheduler.Task.run(Task.scala:86)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> ... 1 more
> {noformat}
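The exception itself comes from the JVM, not from the data: java.nio's FileChannel.map cannot map a region larger than Integer.MAX_VALUE bytes, because MappedByteBuffer positions are signed 32-bit ints. So whenever a single Spark block (here, a partition read back by DiskStore.getBytes) exceeds roughly 2 GiB, it fails exactly like this. A quick sanity check of the limit, in plain Python arithmetic:

```python
# Java's FileChannel.map() rejects regions larger than Integer.MAX_VALUE
# bytes, since MappedByteBuffer positions are signed 32-bit ints.
INT_MAX = 2**31 - 1        # 2147483647 bytes
print(INT_MAX)             # the hard per-block limit
print(INT_MAX / 2**30)     # just under 2 GiB
```

Since explode() replicates the other selected columns once per array element, even a source file of only 1MB can plausibly produce a single exploded partition that crosses this limit.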
> This doesn't happen only when calling count(); any action triggers it.
> As mentioned, it only occurs when there are many entries. In my case it
> happens in exactly one configuration: when I select the column with by far
> the most entries, whose values were in an array and have been exploded.
> Otherwise it works fine, independently of the file size.
> The same thing happens if the source is an archive containing the XML
> files: if the archive itself is larger than 1MB, it errors; if smaller, it
> works.
> The Spark log shows the error at the point where e.g. count() is called,
> although in my script's log the error appears after the select(...) command
> starts, but maybe that's due to the parallel processing. Consequently, I'm
> not sure whether the error happens during the explode(), during the
> select(), or only after the DF has been prepared and I call e.g. count().
> I have allocated more than enough memory. On another system I got a `Java
> heap space` error instead, I guess on the way to hitting the actual error.
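Given the FileChannelImpl.map frame in the trace, one workaround that may help (an assumption on my part, not a confirmed fix): repartition the exploded DataFrame before the action, e.g. df2.repartition(n), so that no single block approaches the 2 GiB mapping limit. A rough way to pick n (the helper name is hypothetical):

```python
import math

def estimate_partitions(total_bytes, target_block_bytes=512 * 2**20):
    # Hypothetical helper: aim for blocks well below the hard limit of
    # 2**31 - 1 bytes; here each partition targets ~512 MiB.
    return max(1, math.ceil(total_bytes / target_block_bytes))

# e.g. ~10 GiB of exploded data -> 20 partitions of ~512 MiB each
print(estimate_partitions(10 * 2**30))
```

The estimate is deliberately conservative, since exploded rows are rarely uniform in size and a skewed partition can still exceed the limit.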
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]