[ https://issues.apache.org/jira/browse/SPARK-28269?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16880840#comment-16880840 ]

Hyukjin Kwon commented on SPARK-28269:
--------------------------------------

{quote}
The above code gets stacked on the ArrowStreamPandasSerializer: 
{quote}

Sorry if I misunderstood, but what exactly does "stack" mean here? I think I can reproduce the error, but I want to confirm it is the same one you faced. I am seeing something like this:

{code}
py4j.protocol.Py4JJavaError: An error occurred while calling o426.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 45 in stage 5.0 failed 1 times, most recent failure: Lost task 45.0 in stage 5.0 (TID 110, localhost, executor driver): org.apache.spark.util.TaskCompletionListenerException: Memory was leaked by query. Memory leaked: (1048576)
Allocator(stdin reader for python) 0/1048576/2114176/9223372036854775807 (res/actual/peak/limit)

Previous exception in task: Unexpected end of input trying to read batch.
        org.apache.arrow.vector.ipc.message.MessageSerializer.readMessageBody(MessageSerializer.java:575)
        org.apache.arrow.vector.ipc.message.MessageChannelReader.readNext(MessageChannelReader.java:68)
        org.apache.arrow.vector.ipc.ArrowStreamReader.loadNextBatch(ArrowStreamReader.java:102)
        org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$2.read(ArrowPythonRunner.scala:155)
        org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$2.read(ArrowPythonRunner.scala:130)
        org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:418)
        org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
        scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:489)
        scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
        scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
        scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
        org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:147)
        org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
        org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:94)
        org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
        org.apache.spark.scheduler.Task.run(Task.scala:126)
        org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:426)
        org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1350)
        org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:429)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        java.lang.Thread.run(Thread.java:748)
        at org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:140)
        at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:119)
        at org.apache.spark.scheduler.Task.run(Task.scala:136)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:426)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1350)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:429)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:1952)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:1940)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:1939)
        at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
        at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1939)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:943)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:943)
        at scala.Option.foreach(Option.scala:274)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:943)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2169)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2118)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2107)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:745)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2029)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2050)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2069)
        at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:438)
        at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:47)
        at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3409)
        at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:2511)
        at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3399)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$4(SQLExecution.scala:100)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:87)
        at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3395)
        at org.apache.spark.sql.Dataset.head(Dataset.scala:2511)
        at org.apache.spark.sql.Dataset.take(Dataset.scala:2742)
        at org.apache.spark.sql.Dataset.getRows(Dataset.scala:264)
        at org.apache.spark.sql.Dataset.showString(Dataset.scala:301)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.util.TaskCompletionListenerException: Memory was leaked by query. Memory leaked: (1048576)
Allocator(stdin reader for python) 0/1048576/2114176/9223372036854775807 (res/actual/peak/limit)

Previous exception in task: Unexpected end of input trying to read batch.
        org.apache.arrow.vector.ipc.message.MessageSerializer.readMessageBody(MessageSerializer.java:575)
        org.apache.arrow.vector.ipc.message.MessageChannelReader.readNext(MessageChannelReader.java:68)
        org.apache.arrow.vector.ipc.ArrowStreamReader.loadNextBatch(ArrowStreamReader.java:102)
        org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$2.read(ArrowPythonRunner.scala:155)
        org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$2.read(ArrowPythonRunner.scala:130)
        org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:418)
        org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
        scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:489)
        scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
        scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
        scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
        org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:147)
        org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
        org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:94)
        org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
        org.apache.spark.scheduler.Task.run(Task.scala:126)
        org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:426)
        org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1350)
        org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:429)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        java.lang.Thread.run(Thread.java:748)
        at org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:140)
        at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:119)
        at org.apache.spark.scheduler.Task.run(Task.scala:136)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:426)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1350)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:429)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        ... 1 more
{code}




> ArrowStreamPandasSerializer get stack
> -------------------------------------
>
>                 Key: SPARK-28269
>                 URL: https://issues.apache.org/jira/browse/SPARK-28269
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 2.4.3
>            Reporter: Modi Tamam
>            Priority: Major
>         Attachments: Untitled.xcf
>
>
> I'm working with Pyspark version 2.4.3.
> I have a big data frame:
>  * ~15M rows
>  * ~130 columns
>  * ~2.5 GB - I've converted it to a Pandas data frame; then pickling it (pandas_df.to_pickle()) resulted in a file of size 2.5 GB.
> I have some code that groups this data frame and applies a Pandas UDF:
>  
> {code:java}
> from pyspark.sql import Row
> from pyspark.sql.functions import lit, pandas_udf, PandasUDFType, to_json
> from pyspark.sql.types import *
> from pyspark.sql import functions as F
> initial_list = range(4500)
> rdd = sc.parallelize(initial_list)
> rdd = rdd.map(lambda x: Row(val=x))
> initial_spark_df = spark.createDataFrame(rdd)
> cols_count = 132
> rows = 1000
> # ------------------- Start generating the big data frame -------------------
> # Generating the schema
> schema = StructType([StructField(str(i), IntegerType()) for i in range(cols_count)])
> @pandas_udf(returnType=schema, functionType=PandasUDFType.GROUPED_MAP)
> def random_pd_df_generator(df):
>     import numpy as np
>     import pandas as pd
>     return pd.DataFrame(np.random.randint(0, 100, size=(rows, cols_count)), columns=range(cols_count))
> full_spark_df = initial_spark_df.groupBy("val").apply(random_pd_df_generator)
> # ------------------- End generating the big data frame ---------------------
> # ------------------- Start the bug reproduction ----------------------------
> grouped_col = "col_0"
> @pandas_udf("%s string" % grouped_col, PandasUDFType.GROUPED_MAP)
> def very_simpl_udf(pdf):
>     import pandas as pd
>     ret_val = pd.DataFrame({grouped_col: [str(pdf[grouped_col].iloc[0])]})
>     return ret_val
> # In order to create a huge dataset, I've set all of the grouped_col values to a single value, then grouped it into a single dataset.
> # Here is where the program gets stuck
> full_spark_df.withColumn(grouped_col, F.lit('0')).groupBy(grouped_col).apply(very_simpl_udf).show()
> assert False, "If we're here, it means that the issue wasn't reproduced"
> {code}
>  
> The above code gets stacked on the ArrowStreamPandasSerializer: (on the first line when reading a batch from the reader)
>  
> {code:java}
> for batch in reader:
>     yield [self.arrow_to_pandas(c) for c in pa.Table.from_batches([batch]).itercolumns()]
> {code}
>  
> You can just run the first code snippet and it will reproduce the issue.
> Open a PySpark shell with this configuration:
> {code:java}
> pyspark --conf "spark.python.worker.memory=3G" --conf "spark.executor.memory=20G" --conf "spark.executor.extraJavaOptions=-XX:+UseG1GC" --conf "spark.driver.memory=10G"
> {code}
>  
> Versions:
>  * pandas - 0.24.2
>  * pyarrow - 0.13.0
>  * Spark - 2.4.2
>  * Python - 2.7.16



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
