[
https://issues.apache.org/jira/browse/SPARK-28269?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16880840#comment-16880840
]
Hyukjin Kwon commented on SPARK-28269:
--------------------------------------
{quote}
The above code gets stuck on the ArrowStreamPandasSerializer:
{quote}
sorry if I misunderstood, but what exactly does "stuck" mean here? I think I can
reproduce the error but want to know if this error is the same one you faced. I
am seeing something like this:
{code}
py4j.protocol.Py4JJavaError: An error occurred while calling o426.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 45 in stage 5.0 failed 1 times, most recent failure: Lost task 45.0 in stage 5.0 (TID 110, localhost, executor driver): org.apache.spark.util.TaskCompletionListenerException: Memory was leaked by query. Memory leaked: (1048576)
Allocator(stdin reader for python) 0/1048576/2114176/9223372036854775807 (res/actual/peak/limit)
Previous exception in task: Unexpected end of input trying to read batch.
org.apache.arrow.vector.ipc.message.MessageSerializer.readMessageBody(MessageSerializer.java:575)
org.apache.arrow.vector.ipc.message.MessageChannelReader.readNext(MessageChannelReader.java:68)
org.apache.arrow.vector.ipc.ArrowStreamReader.loadNextBatch(ArrowStreamReader.java:102)
org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$2.read(ArrowPythonRunner.scala:155)
org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$2.read(ArrowPythonRunner.scala:130)
org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:418)
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:489)
scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:147)
org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:94)
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
org.apache.spark.scheduler.Task.run(Task.scala:126)
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:426)
org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1350)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:429)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
java.lang.Thread.run(Thread.java:748)
	at org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:140)
	at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:119)
	at org.apache.spark.scheduler.Task.run(Task.scala:136)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:426)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1350)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:429)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:1952)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:1940)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:1939)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1939)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:943)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:943)
	at scala.Option.foreach(Option.scala:274)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:943)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2169)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2118)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2107)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:745)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2029)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2050)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2069)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:438)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:47)
	at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3409)
	at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:2511)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3399)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$4(SQLExecution.scala:100)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:87)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3395)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:2511)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:2742)
	at org.apache.spark.sql.Dataset.getRows(Dataset.scala:264)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:301)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.util.TaskCompletionListenerException: Memory was leaked by query. Memory leaked: (1048576)
Allocator(stdin reader for python) 0/1048576/2114176/9223372036854775807 (res/actual/peak/limit)
Previous exception in task: Unexpected end of input trying to read batch.
org.apache.arrow.vector.ipc.message.MessageSerializer.readMessageBody(MessageSerializer.java:575)
org.apache.arrow.vector.ipc.message.MessageChannelReader.readNext(MessageChannelReader.java:68)
org.apache.arrow.vector.ipc.ArrowStreamReader.loadNextBatch(ArrowStreamReader.java:102)
org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$2.read(ArrowPythonRunner.scala:155)
org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$2.read(ArrowPythonRunner.scala:130)
org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:418)
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:489)
scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:147)
org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:94)
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
org.apache.spark.scheduler.Task.run(Task.scala:126)
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:426)
org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1350)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:429)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
java.lang.Thread.run(Thread.java:748)
	at org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:140)
	at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:119)
	at org.apache.spark.scheduler.Task.run(Task.scala:136)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:426)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1350)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:429)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
... 1 more
{code}
> ArrowStreamPandasSerializer gets stuck
> -------------------------------------
>
> Key: SPARK-28269
> URL: https://issues.apache.org/jira/browse/SPARK-28269
> Project: Spark
> Issue Type: Bug
> Components: PySpark
> Affects Versions: 2.4.3
> Reporter: Modi Tamam
> Priority: Major
> Attachments: Untitled.xcf
>
>
> I'm working with PySpark version 2.4.3.
> I have a big data frame:
> * ~15M rows
> * ~130 columns
> * ~2.5 GB - converting it to a Pandas data frame and pickling it
> (pandas_df.to_pickle()) resulted in a file of size 2.5 GB.
> I have some code that groups this data frame and applies a Pandas UDF:
>
> {code:java}
> from pyspark.sql import Row
> from pyspark.sql.functions import lit, pandas_udf, PandasUDFType, to_json
> from pyspark.sql.types import *
> from pyspark.sql import functions as F
>
> initial_list = range(4500)
> rdd = sc.parallelize(initial_list)
> rdd = rdd.map(lambda x: Row(val=x))
> initial_spark_df = spark.createDataFrame(rdd)
>
> cols_count = 132
> rows = 1000
>
> # ------------------- Start generating the big data frame -------------------
> # Generating the schema
> schema = StructType([StructField(str(i), IntegerType()) for i in range(cols_count)])
>
> @pandas_udf(returnType=schema, functionType=PandasUDFType.GROUPED_MAP)
> def random_pd_df_generator(df):
>     import numpy as np
>     import pandas as pd
>     return pd.DataFrame(np.random.randint(0, 100, size=(rows, cols_count)),
>                         columns=range(cols_count))
>
> full_spark_df = initial_spark_df.groupBy("val").apply(random_pd_df_generator)
> # ------------------- End generating the big data frame ---------------------
>
> # ------------------- Start the bug reproduction ----------------------------
> grouped_col = "col_0"
>
> @pandas_udf("%s string" % grouped_col, PandasUDFType.GROUPED_MAP)
> def very_simpl_udf(pdf):
>     import pandas as pd
>     return pd.DataFrame({grouped_col: [str(pdf[grouped_col].iloc[0])]})
>
> # In order to create a huge dataset, set grouped_col to a single value for
> # every row, so that everything falls into one group.
> # This is where the program gets stuck:
> full_spark_df.withColumn(grouped_col, F.lit('0')).groupBy(grouped_col).apply(very_simpl_udf).show()
> assert False, "If we got here, the issue wasn't reproduced"
> {code}
>
> The above code gets stuck in the ArrowStreamPandasSerializer (on the first
> line, when reading a batch from the reader):
>
> {code:java}
> for batch in reader:
>     yield [self.arrow_to_pandas(c) for c in
>            pa.Table.from_batches([batch]).itercolumns()]
> {code}
>
> You can reproduce it by just running the first code snippet.
> Open a PySpark shell with this configuration:
> {code:java}
> pyspark --conf "spark.python.worker.memory=3G" \
>   --conf "spark.executor.memory=20G" \
>   --conf "spark.executor.extraJavaOptions=-XX:+UseG1GC" \
>   --conf "spark.driver.memory=10G"
> {code}
>
> Versions:
> * pandas - 0.24.2
> * pyarrow - 0.13.0
> * Spark - 2.4.2
> * Python - 2.7.16
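Stripped of Spark and Arrow, the grouped-map logic in the reproduction is trivial, which points at the serialization/transport layer rather than at the UDFs themselves. A local pandas-only sketch of what the two GROUPED_MAP UDFs compute (scaled down; a hypothetical illustration, not part of the original report):

```python
# Mimic the report's UDF pipeline in plain pandas: generate a random block,
# force every row into one group, then return the group key per group.
import numpy as np
import pandas as pd

cols_count = 4  # scaled down from 132
rows = 5        # scaled down from 1000

# random_pd_df_generator: one random integer block per input group
block = pd.DataFrame(np.random.randint(0, 100, size=(rows, cols_count)),
                     columns=[str(i) for i in range(cols_count)])

# The reproduction sets grouped_col to a constant, so everything lands in
# a single (huge) group.
grouped_col = "col_0"
block[grouped_col] = "0"

# very_simpl_udf: return the first value of the grouping column per group
result = (block.groupby(grouped_col, group_keys=False)
               .apply(lambda pdf: pd.DataFrame({grouped_col: [str(pdf[grouped_col].iloc[0])]})))
print(result)  # single row holding the group key '0'
```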
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)