[ https://issues.apache.org/jira/browse/SPARK-33373?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17228559#comment-17228559 ]

Andre Boechat commented on SPARK-33373:
---------------------------------------

{quote}Overall a writing operation cannot overwrite existing path if you need 
to read original data{quote}

Since we only need to persist the features' mean values, I would be surprised 
if that were the case. But I'm still not familiar with Spark ML's persistence 
code.
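
For what it's worth, the state in question looks tiny. On the Python side, {{ImputerModel.surrogateDF}} exposes the per-column replacement values, so a quick sanity check against the reproduction data below (a sketch; I have not re-run it) would be:

{code:python}
# The model's persisted state should amount to one surrogate value per
# input column. For the sample data in the issue, the means would be
# x200 -> 2.0 and x3 -> (3.0 + 1.0 + 0.0) / 3 = 1.333...
li.surrogateDF.show()
{code}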

[~viirya], I really appreciate your time and attention to this issue.

> A serialized ImputerModel fails to be serialized again
> ------------------------------------------------------
>
>                 Key: SPARK-33373
>                 URL: https://issues.apache.org/jira/browse/SPARK-33373
>             Project: Spark
>          Issue Type: Bug
>          Components: ML
>    Affects Versions: 2.4.3
>         Environment: * Python 3.7.3
>  * (Py)Spark 2.4.3
>            Reporter: Andre Boechat
>            Priority: Major
>
> After loading an {{ImputerModel}} from disk, the instance fails to save 
> itself to the same path again.
> h2. Code Sample
> {code:python}
> from pyspark.ml.feature import Imputer, ImputerModel
> 
> # "sparksession" is an existing SparkSession instance.
> df = sparksession.createDataFrame(
>     [
>         (2.0, 3.0),
>         (2.0, 1.0),
>         (2.0, None),
>         (None, 0.0)
>     ],
>     ["x200", "x3"]
> ).repartition(1)
> i = Imputer(inputCols=["x200", "x3"], outputCols=["x200_i", "x3_i"]).fit(df)
> tdf = i.transform(df)
> fpath = "/tmp/bucketpath"
> i.write().overwrite().save(fpath)
> li = ImputerModel.load(fpath)
> t2df = li.transform(df)
> # The reloaded model still transforms correctly...
> assert all(
>     r1.asDict() == r2.asDict()
>     for r1, r2 in zip(tdf.collect(), t2df.collect())
> )
> # ...but this line makes Spark crash.
> li.write().overwrite().save(fpath)
> {code}
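> 
> A possible workaround until this is fixed (a sketch, not verified): save the 
> reloaded model to a staging path and swap it into place afterwards, so the 
> writer never reads from the same files it is about to delete. The 
> {{staging}} name below is made up for illustration, and {{shutil}} only 
> applies to local filesystem paths.
> {code:python}
> import shutil
> 
> staging = fpath + ".staging"  # hypothetical temporary location
> li.write().overwrite().save(staging)
> shutil.rmtree(fpath)          # local FS only; HDFS/S3 need the Hadoop FS API
> shutil.move(staging, fpath)
> {code}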
> h2. Stacktrace
> {code:python}
> --> 480     li.write().overwrite().save(fpath)
> 
> /usr/spark-2.4.3/python/pyspark/ml/util.py in save(self, path)
>     181         if not isinstance(path, basestring):
>     182             raise TypeError("path should be a basestring, got type %s" % type(path))
> --> 183         self._jwrite.save(path)
>     184 
>     185     def overwrite(self):
> 
> /usr/local/lib/python3.7/site-packages/py4j/java_gateway.py in __call__(self, *args)
>    1284         answer = self.gateway_client.send_command(command)
>    1285         return_value = get_return_value(
> -> 1286             answer, self.gateway_client, self.target_id, self.name)
>    1287 
>    1288         for temp_arg in temp_args:
> 
> /usr/spark-2.4.3/python/pyspark/sql/utils.py in deco(*a, **kw)
>      61     def deco(*a, **kw):
>      62         try:
> ---> 63             return f(*a, **kw)
>      64         except py4j.protocol.Py4JJavaError as e:
>      65             s = e.java_exception.toString()
> 
> /usr/local/lib/python3.7/site-packages/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
>     326                 raise Py4JJavaError(
>     327                     "An error occurred while calling {0}{1}{2}.\n".
> --> 328                     format(target_id, ".", name), value)
>     329             else:
>     330                 raise Py4JError(
> 
> Py4JJavaError: An error occurred while calling o572.save.
>         at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:198)
>         at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:159)
>         at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
>         at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
>         at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
>         at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
>         at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
>         at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
>         at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>         at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
>         at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
>         at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
>         at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
>         at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
>         at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
>         at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
>         at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
>         at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
>         at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
>         at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
>         at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
>         at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:229)
>         at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:566)
>         at org.apache.spark.ml.feature.ImputerModel$ImputerModelWriter.saveImpl(Imputer.scala:252)
>         at org.apache.spark.ml.util.MLWriter.save(ReadWrite.scala:180)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:498)
>         at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
>         at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
>         at py4j.Gateway.invoke(Gateway.java:282)
>         at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
>         at py4j.commands.CallCommand.execute(CallCommand.java:79)
>         at py4j.GatewayConnection.run(GatewayConnection.java:238)
>         at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 63.0 failed 1 times, most recent failure: Lost task 0.0 in stage 63.0 (TID 192, localhost, executor driver): java.io.FileNotFoundException: File file:/tmp/bucketpath/data/part-00000-0e35e712-a3f9-451a-856b-15dc690569ad-c000.snappy.parquet does not exist
> It is possible the underlying files have been updated. You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved.
>         at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:127)
>         at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:177)
>         at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:101)
>         at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.scan_nextBatch_0$(Unknown Source)
>         at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
>         at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>         at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
>         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
>         at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
>         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
>         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
>         at org.apache.spark.scheduler.Task.run(Task.scala:121)
>         at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
>         at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>         at java.lang.Thread.run(Thread.java:748)
> Driver stacktrace:
>         at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1889)
>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1877)
>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1876)
>         at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>         at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>         at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1876)
>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
>         at scala.Option.foreach(Option.scala:257)
>         at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
>         at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2110)
>         at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2059)
>         at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2048)
>         at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
>         at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
>         at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
>         at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:167)
>         ... 35 more
> Caused by: java.io.FileNotFoundException: File file:/tmp/bucketpath/data/part-00000-0e35e712-a3f9-451a-856b-15dc690569ad-c000.snappy.parquet does not exist
> It is possible the underlying files have been updated. You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved.
>         at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:127)
>         at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:177)
>         at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:101)
>         at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.scan_nextBatch_0$(Unknown Source)
>         at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
>         at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>         at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
>         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
>         at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
>         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
>         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
>         at org.apache.spark.scheduler.Task.run(Task.scala:121)
>         at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
>         at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>         ... 1 more
> {code}


