[
https://issues.apache.org/jira/browse/SPARK-33373?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17228559#comment-17228559
]
Andre Boechat commented on SPARK-33373:
---------------------------------------
{quote}Overall a writing operation cannot overwrite existing path if you need
to read original data{quote}
Since we just need to persist the features' mean values, I would be surprised
if that is the case. But I'm still not familiar with Spark ML's persistence
code.
[~viirya], I really appreciate your time and attention to this issue.
> A serialized ImputerModel fails to be serialized again
> ------------------------------------------------------
>
> Key: SPARK-33373
> URL: https://issues.apache.org/jira/browse/SPARK-33373
> Project: Spark
> Issue Type: Bug
> Components: ML
> Affects Versions: 2.4.3
> Environment: * Python 3.7.3
> * (Py)Spark 2.4.3
> Reporter: Andre Boechat
> Priority: Major
>
> After loading an {{ImputerModel}} from disk, the instance fails to save
> itself again.
> h2. Code Sample
> {code:python}
> from pyspark.ml.feature import Imputer, ImputerModel
> df = sparksession.createDataFrame(
>     [
>         (2.0, 3.0),
>         (2.0, 1.0),
>         (2.0, None),
>         (None, 0.0)
>     ],
>     ["x200", "x3"]
> ).repartition(1)
> i = Imputer(inputCols=["x200", "x3"], outputCols=["x200_i", "x3_i"]).fit(
>     df
> )
> tdf = i.transform(df)
> fpath = "/tmp/bucketpath"
> i.write().overwrite().save(fpath)
> li = ImputerModel.load(fpath)
> t2df = li.transform(df)
> assert all(
>     r1.asDict() == r2.asDict() for r1, r2 in zip(
>         tdf.collect(), t2df.collect()
>     )
> )
> # This line makes Spark crash.
> li.write().overwrite().save(fpath)
> {code}
> h2. Stacktrace
> {code:python}
> --> 480 li.write().overwrite().save(fpath)
>
>
>
>
>
>
>
>
>
> /usr/spark-2.4.3/python/pyspark/ml/util.py in save(self, path)
>
>
> 181 if not isinstance(path, basestring):
>
>
> 182 raise TypeError("path should be a basestring, got type
> %s" % type(path))
>
> --> 183 self._jwrite.save(path)
>
>
> 184
>
>
> 185 def overwrite(self):
>
>
>
>
>
>
>
> /usr/local/lib/python3.7/site-packages/py4j/java_gateway.py in __call__(self,
> *args)
>
> 1284 answer = self.gateway_client.send_command(command)
>
>
> 1285 return_value = get_return_value(
>
>
>
>
> -> 1286 answer, self.gateway_client, self.target_id, self.name)
>
>
>
>
> 1287
>
>
> 1288 for temp_arg in temp_args:
>
>
>
>
>
> /usr/spark-2.4.3/python/pyspark/sql/utils.py in deco(*a, **kw)
>
>
> 61 def deco(*a, **kw):
>
>
> 62 try:
>
>
> ---> 63 return f(*a, **kw)
>
>
> 64 except py4j.protocol.Py4JJavaError as e:
>
>
> 65 s = e.java_exception.toString()
>
>
>
>
>
> /usr/local/lib/python3.7/site-packages/py4j/protocol.py in
> get_return_value(answer, gateway_client, target_id, name)
>
> 326 raise Py4JJavaError(
>
>
> 327 "An error occurred while calling {0}{1}{2}.\n".
>
>
> --> 328 format(target_id, ".", name), value)
>
>
>
>
> 329 else:
>
>
> 330 raise Py4JError(
>
>
>
>
>
> Py4JJavaError: An error occurred while calling o572.save.
> at
> org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:198)
>
> at
> org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:159)
>
> at
> org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
>
>
>
>
> at
> org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
>
>
>
>
> at
> org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
>
>
> at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
>
>
> at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
>
>
> at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
>
>
> at
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>
>
> at
> org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
>
>
>
>
> at
> org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
>
>
> at
> org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
>
>
> at
> org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
>
>
> at
> org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
>
>
>
>
> at
> org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
>
>
>
>
> at
> org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
>
> at
> org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
>
>
> at
> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
>
>
> at
> org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
>
>
> at
> org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
>
>
> at
> org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
>
>
> at
> org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:229)
>
>
> at
> org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:566)
>
>
> at
> org.apache.spark.ml.feature.ImputerModel$ImputerModelWriter.saveImpl(Imputer.scala:252)
>
>
> at org.apache.spark.ml.util.MLWriter.save(ReadWrite.scala:180)
>
>
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>
>
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>
>
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>
>
> at java.lang.reflect.Method.invoke(Method.java:498)
>
>
>
>
> at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
>
>
> at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
>
>
> at py4j.Gateway.invoke(Gateway.java:282)
>
>
> at
> py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
>
>
> at py4j.commands.CallCommand.execute(CallCommand.java:79)
>
>
> at py4j.GatewayConnection.run(GatewayConnection.java:238)
>
>
> at java.lang.Thread.run(Thread.java:748)
>
>
> Caused by: org.apache.spark.SparkException: Job aborted due to stage failure:
> Task 0 in stage 63.0 failed 1 times, most recent failure: Lost task 0.0 in
> stage 63.0 (TID 192, localhost, executor driver):
> java.io.FileNotFoundException: File
> file:/tmp/bucketpath/data/part-00000-0e35e712-a3f9-451a-856b-15dc690569ad-c000.snappy.parquet
> does not exist
> It is possible the underlying files have been updated. You can explicitly
> invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in
> SQL or by recreating the Dataset/DataFrame involved.
>
>
> at
> org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:127)
>
> at
> org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:177)
>
> at
> org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:101)
>
> at
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.scan_nextBatch_0$(Unknown
> Source)
> at
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown
> Source)
> at
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>
>
>
>
> at
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
>
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
>
>
> at
> org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
>
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
>
>
>
>
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
>
>
>
>
> at org.apache.spark.scheduler.Task.run(Task.scala:121)
>
>
> at
> org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
>
>
> at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
>
>
> at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
>
>
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>
>
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>
>
> at java.lang.Thread.run(Thread.java:748)
> Driver stacktrace:
>
>
> at
> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1889)
>
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1877)
>
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1876)
>
> at
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>
>
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>
>
>
>
> at
> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1876)
>
>
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
>
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
>
> at scala.Option.foreach(Option.scala:257)
>
>
> at
> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
>
>
> at
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2110)
>
> at
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2059)
>
>
> at
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2048)
>
>
>
>
> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
>
>
>
>
> at
> org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
>
>
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
>
>
> at
> org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:167)
>
> ... 35 more
>
>
> Caused by: java.io.FileNotFoundException: File
> file:/tmp/bucketpath/data/part-00000-0e35e712-a3f9-451a-856b-15dc690569ad-c000.snappy.parquet
> does not exist
> It is possible the underlying files have been updated. You can explicitly
> invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in
> SQL or by recreating the Dataset/DataFrame involved.
>
>
> at
> org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:127)
>
> at
> org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:177)
>
> at
> org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:101)
>
> at
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.scan_nextBatch_0$(Unknown
> Source)
>
>
>
> at
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown
> Source)
>
>
>
> at
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>
>
> at
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
>
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
>
>
> at
> org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
>
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
>
>
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
>
>
> at org.apache.spark.scheduler.Task.run(Task.scala:121)
>
>
> at
> org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
>
>
> at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
>
>
> at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
>
>
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>
>
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>
>
> ... 1 more
> {code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
