[ 
https://issues.apache.org/jira/browse/SPARK-22910?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon resolved SPARK-22910.
----------------------------------
    Resolution: Incomplete

> Wrong results in Spark Job because failed to move to Trash
> ----------------------------------------------------------
>
>                 Key: SPARK-22910
>                 URL: https://issues.apache.org/jira/browse/SPARK-22910
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.1.0, 2.2.0
>            Reporter: Ohad Raviv
>            Priority: Major
>              Labels: bulk-closed
>
> Our Spark job has completed with status successful although the data save was 
> corrupted.
> What happened is that we have a monthly job. each run overwrites the output 
> of the previous run. we happened to change the sql.shuffle.partitions number 
> between the runs from 2000 to 1000, and what happened was that the new run 
> had Warn failure of moving the old data to the user's .Trash because it was 
> full. because it was only a warning the process continued and overwritten the 
> new 1000 files - while leaving most of the old remaining 1000 files in their 
> place. this resulted that in the final output we had a folder with mix of old 
> and new data and that caused corruption in the process.
> the post mortem is relatively easy to understand.
> {code}
> hadoop fs -ls /the/folder
> -rwxr-xr-x 3 spark_user spark_user 209012005 2017-12-10 14:20 
> /the/folder/part-00000.gz 
> .
> .
> -rwxr-xr-x 3 spark_user spark_user 111134899 2017-11-17 06:39 
> /the/folder/part-01990.gz 
> {code}
> and in the driver's log:
> {code}
> 17/12/10 15:10:00 WARN Hive: Directory hdfs:///the/folder cannot be removed: 
> java.io.IOException: Failed to move to trash: hdfs:///the/folder/part-00000.gz
> java.io.IOException: Failed to move to trash: hdfs:///the/folder/part-00000.gz
>       at 
> org.apache.hadoop.fs.TrashPolicyDefault.moveToTrash(TrashPolicyDefault.java:160)
>       at org.apache.hadoop.fs.Trash.moveToTrash(Trash.java:109)
>       at org.apache.hadoop.fs.Trash.moveToAppropriateTrash(Trash.java:90)
>       at 
> org.apache.hadoop.hive.shims.Hadoop23Shims.moveToAppropriateTrash(Hadoop23Shims.java:272)
>       at 
> org.apache.hadoop.hive.common.FileUtils.moveToTrash(FileUtils.java:603)
>       at 
> org.apache.hadoop.hive.common.FileUtils.trashFilesUnderDir(FileUtils.java:586)
>       at org.apache.hadoop.hive.ql.metadata.Hive.replaceFiles(Hive.java:2851)
>       at org.apache.hadoop.hive.ql.metadata.Hive.loadTable(Hive.java:1640)
>       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>       at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>       at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:498)
>       at 
> org.apache.spark.sql.hive.client.Shim_v0_14.loadTable(HiveShim.scala:716)
>       at 
> org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$loadTable$1.apply$mcV$sp(HiveClientImpl.scala:672)
>       at 
> org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$loadTable$1.apply(HiveClientImpl.scala:672)
>       at 
> org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$loadTable$1.apply(HiveClientImpl.scala:672)
>       at 
> org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$withHiveState$1.apply(HiveClientImpl.scala:283)
>       at 
> org.apache.spark.sql.hive.client.HiveClientImpl.liftedTree1$1(HiveClientImpl.scala:230)
>       at 
> org.apache.spark.sql.hive.client.HiveClientImpl.retryLocked(HiveClientImpl.scala:229)
>       at 
> org.apache.spark.sql.hive.client.HiveClientImpl.withHiveState(HiveClientImpl.scala:272)
>       at 
> org.apache.spark.sql.hive.client.HiveClientImpl.loadTable(HiveClientImpl.scala:671)
>       at 
> org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$loadTable$1.apply$mcV$sp(HiveExternalCatalog.scala:741)
>       at 
> org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$loadTable$1.apply(HiveExternalCatalog.scala:739)
>       at 
> org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$loadTable$1.apply(HiveExternalCatalog.scala:739)
>       at 
> org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:95)
>       at 
> org.apache.spark.sql.hive.HiveExternalCatalog.loadTable(HiveExternalCatalog.scala:739)
>       at 
> org.apache.spark.sql.hive.execution.InsertIntoHiveTable.sideEffectResult$lzycompute(InsertIntoHiveTable.scala:323)
>       at 
> org.apache.spark.sql.hive.execution.InsertIntoHiveTable.sideEffectResult(InsertIntoHiveTable.scala:170)
>       at 
> org.apache.spark.sql.hive.execution.InsertIntoHiveTable.doExecute(InsertIntoHiveTable.scala:347)
>       at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114)
>       at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114)
>       at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:135)
>       at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>       at 
> org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:132)
>       at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:113)
>       at 
> org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:87)
>       at 
> org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:87)
>       at 
> org.apache.spark.sql.hive.execution.CreateHiveTableAsSelectCommand.run(CreateHiveTableAsSelectCommand.scala:92)
>       at 
> org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58)
>       at 
> org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56)
>       at 
> org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74)
>       at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114)
>       at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114)
>       at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:135)
>       at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>       at 
> org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:132)
>       at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:113)
>       at 
> org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:87)
>       at 
> org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:87)
>       at org.apache.spark.sql.Dataset.<init>(Dataset.scala:185)
>       at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:64)
>       at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:592)
>       at 
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>       at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
>       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>       at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>       at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:498)
>       at 
>       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>       at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>       at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:498)
>       at 
> org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:637)
> Caused by: org.apache.hadoop.hdfs.protocol.DSQuotaExceededException: The 
> DiskSpace quota of /user/spark_user is exceeded: quota = 3298534883328 B = 3 
> TB but diskspace consumed = 3298551987606 B = 3.00 TB
>       at 
> org.apache.hadoop.hdfs.server.namenode.DirectoryWithQuotaFeature.verifyStoragespaceQuota(DirectoryWithQuotaFeature.java:211)
>       at 
> org.apache.hadoop.hdfs.server.namenode.DirectoryWithQuotaFeature.verifyQuota(DirectoryWithQuotaFeature.java:239)
>       at 
> org.apache.hadoop.hdfs.server.namenode.FSDirectory.verifyQuota(FSDirectory.java:945)
>       at 
> org.apache.hadoop.hdfs.server.namenode.FSDirRenameOp.verifyQuotaForRename(FSDirRenameOp.java:102)
>       at 
> org.apache.hadoop.hdfs.server.namenode.FSDirRenameOp.unprotectedRenameTo(FSDirRenameOp.java:190)
>       at 
> org.apache.hadoop.hdfs.server.namenode.FSDirRenameOp.renameTo(FSDirRenameOp.java:474)
>       at 
> org.apache.hadoop.hdfs.server.namenode.FSDirRenameOp.renameToInt(FSDirRenameOp.java:73)
>       at 
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.renameTo(FSNamesystem.java:3768)
>       at 
> org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.rename(NameNodeRpcServer.java:986)
>       at 
> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.rename(ClientNamenodeProtocolServerSideTranslatorPB.java:583)
>       at 
> org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
>       at 
> org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:640)
>       at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
>       at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2313)
>       at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2309)
>       at java.security.AccessController.doPrivileged(Native Method)
>       at javax.security.auth.Subject.doAs(Subject.java:422)
>       at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1724)
>       at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2307)
>       at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>       at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>       at 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>       at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>       at 
> org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:106)
>       at 
> org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:73)
>       at org.apache.hadoop.hdfs.DFSClient.rename(DFSClient.java:1855)
>       at 
> org.apache.hadoop.hdfs.DistributedFileSystem.rename(DistributedFileSystem.java:575)
>       at 
> org.apache.hadoop.fs.TrashPolicyDefault.moveToTrash(TrashPolicyDefault.java:154)
>       ... 70 more
> Caused by: 
> org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.protocol.DSQuotaExceededException):
>  The DiskSpace quota of /user/spark_user is exceeded: quota = 3298534883328 B 
> = 3 TB but diskspace consumed = 3298551987606 B = 3.00 TB
>       at 
> org.apache.hadoop.hdfs.server.namenode.DirectoryWithQuotaFeature.verifyStoragespaceQuota(DirectoryWithQuotaFeature.java:211)
>       at 
> org.apache.hadoop.hdfs.server.namenode.DirectoryWithQuotaFeature.verifyQuota(DirectoryWithQuotaFeature.java:239)
>       at 
> org.apache.hadoop.hdfs.server.namenode.FSDirectory.verifyQuota(FSDirectory.java:945)
>       at 
> org.apache.hadoop.hdfs.server.namenode.FSDirRenameOp.verifyQuotaForRename(FSDirRenameOp.java:102)
>       at 
> org.apache.hadoop.hdfs.server.namenode.FSDirRenameOp.unprotectedRenameTo(FSDirRenameOp.java:190)
>       at 
> org.apache.hadoop.hdfs.server.namenode.FSDirRenameOp.renameTo(FSDirRenameOp.java:474)
>       at 
> org.apache.hadoop.hdfs.server.namenode.FSDirRenameOp.renameToInt(FSDirRenameOp.java:73)
>       at 
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.renameTo(FSNamesystem.java:3768)
>       at 
> org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.rename(NameNodeRpcServer.java:986)
>       at 
> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.rename(ClientNamenodeProtocolServerSideTranslatorPB.java:583)
>       at 
> org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
>       at 
> org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:640)
>       at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
>       at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2313)
>       at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2309)
>       at java.security.AccessController.doPrivileged(Native Method)
>       at javax.security.auth.Subject.doAs(Subject.java:422)
>       at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1724)
>       at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2307)
>       at org.apache.hadoop.ipc.Client.call(Client.java:1469)
>       at org.apache.hadoop.ipc.Client.call(Client.java:1400)
>       at 
> org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:232)
>       at com.sun.proxy.$Proxy9.rename(Unknown Source)
>       at 
> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.rename(ClientNamenodeProtocolTranslatorPB.java:468)
>       at sun.reflect.GeneratedMethodAccessor70.invoke(Unknown Source)
>       at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:498)
>       at 
> org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187)
>       at 
> org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
>       at com.sun.proxy.$Proxy10.rename(Unknown Source)
>       at org.apache.hadoop.hdfs.DFSClient.rename(DFSClient.java:1853)
> {code}
> it looks like the problem is with Spark's Hive version here:
> https://github.com/apache/hive/blob/spark2/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
> where the code actually says:
> {code}
>     //swallow the exception
>     LOG.warn("Directory " + oldPath.toString() + " canot be removed:" + 
> StringUtils.stringifyException(e));
> {code}
> and that is also the same as Hive release-1.2.1, However, it seems like Hive 
> solved it in Hive 2.0.0:
> https://issues.apache.org/jira/browse/HIVE-12505
> with this patch:
> https://github.com/apache/hive/commit/f9791bebaa9d71ff62cecedb0243d09e99abfb38
> So I guess I'm trying to add urgency to: SPARK-20202



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to