[ https://issues.apache.org/jira/browse/SPARK-22910?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Hyukjin Kwon resolved SPARK-22910.
----------------------------------
Resolution: Incomplete
> Wrong results in Spark Job because failed to move to Trash
> ----------------------------------------------------------
>
> Key: SPARK-22910
> URL: https://issues.apache.org/jira/browse/SPARK-22910
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 2.1.0, 2.2.0
> Reporter: Ohad Raviv
> Priority: Major
> Labels: bulk-closed
>
> Our Spark job completed successfully although the saved data was corrupted.
> We have a monthly job, and each run overwrites the output of the previous
> run. We happened to change spark.sql.shuffle.partitions between runs from
> 2000 to 1000. During the new run, moving the old data to the user's .Trash
> failed (the HDFS space quota of the user's home directory was exceeded), but
> this produced only a WARN, so the process continued: it wrote the new 1000
> files over the first 1000 old files and left the remaining ~1000 old files in
> place. The final output folder therefore contained a mix of old and new data,
> which corrupted the downstream process.
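> For context, a minimal sketch of the two runs (hypothetical table and query
> names, assuming a spark-shell style SparkSession named spark; our actual job
> differs but the mechanics are the same):
> {code}
> // Run 1 of the monthly job: the group-by shuffle produces 2000 part files.
> spark.conf.set("spark.sql.shuffle.partitions", "2000")
> spark.sql("INSERT OVERWRITE TABLE monthly_output SELECT key, count(*) AS cnt FROM events GROUP BY key")
>
> // Run 2, a month later, after the setting was changed: only 1000 part files
> // are written. When Hive fails to move the old files to .Trash it merely
> // logs a WARN, so part-01000 .. part-01999 from run 1 survive next to the
> // new data.
> spark.conf.set("spark.sql.shuffle.partitions", "1000")
> spark.sql("INSERT OVERWRITE TABLE monthly_output SELECT key, count(*) AS cnt FROM events GROUP BY key")
> {code}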
> The post mortem is relatively easy to understand:
> {code}
> hadoop fs -ls /the/folder
> -rwxr-xr-x 3 spark_user spark_user 209012005 2017-12-10 14:20 /the/folder/part-00000.gz
> .
> .
> -rwxr-xr-x 3 spark_user spark_user 111134899 2017-11-17 06:39 /the/folder/part-01990.gz
> {code}
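> A quick post-mortem check for this failure mode (a sketch, assuming a
> spark-shell style SparkSession named spark; the path is the one from the
> listing above):
> {code}
> import org.apache.hadoop.fs.{FileSystem, Path}
>
> // Flag an output directory whose part files carry modification times from
> // more than one run; a clean overwrite leaves files from a single run only.
> val dir = new Path("/the/folder")
> val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
> val mtimes = fs.listStatus(dir).filter(_.isFile).map(_.getModificationTime)
> if (mtimes.nonEmpty && mtimes.max - mtimes.min > 60 * 60 * 1000L) {
>   println(s"Mixed output in $dir: timestamps span ${(mtimes.max - mtimes.min) / 3600000.0} hours")
> }
> {code}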
> and in the driver's log:
> {code}
> 17/12/10 15:10:00 WARN Hive: Directory hdfs:///the/folder cannot be removed: java.io.IOException: Failed to move to trash: hdfs:///the/folder/part-00000.gz
> java.io.IOException: Failed to move to trash: hdfs:///the/folder/part-00000.gz
> at org.apache.hadoop.fs.TrashPolicyDefault.moveToTrash(TrashPolicyDefault.java:160)
> at org.apache.hadoop.fs.Trash.moveToTrash(Trash.java:109)
> at org.apache.hadoop.fs.Trash.moveToAppropriateTrash(Trash.java:90)
> at org.apache.hadoop.hive.shims.Hadoop23Shims.moveToAppropriateTrash(Hadoop23Shims.java:272)
> at org.apache.hadoop.hive.common.FileUtils.moveToTrash(FileUtils.java:603)
> at org.apache.hadoop.hive.common.FileUtils.trashFilesUnderDir(FileUtils.java:586)
> at org.apache.hadoop.hive.ql.metadata.Hive.replaceFiles(Hive.java:2851)
> at org.apache.hadoop.hive.ql.metadata.Hive.loadTable(Hive.java:1640)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at org.apache.spark.sql.hive.client.Shim_v0_14.loadTable(HiveShim.scala:716)
> at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$loadTable$1.apply$mcV$sp(HiveClientImpl.scala:672)
> at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$loadTable$1.apply(HiveClientImpl.scala:672)
> at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$loadTable$1.apply(HiveClientImpl.scala:672)
> at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$withHiveState$1.apply(HiveClientImpl.scala:283)
> at org.apache.spark.sql.hive.client.HiveClientImpl.liftedTree1$1(HiveClientImpl.scala:230)
> at org.apache.spark.sql.hive.client.HiveClientImpl.retryLocked(HiveClientImpl.scala:229)
> at org.apache.spark.sql.hive.client.HiveClientImpl.withHiveState(HiveClientImpl.scala:272)
> at org.apache.spark.sql.hive.client.HiveClientImpl.loadTable(HiveClientImpl.scala:671)
> at org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$loadTable$1.apply$mcV$sp(HiveExternalCatalog.scala:741)
> at org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$loadTable$1.apply(HiveExternalCatalog.scala:739)
> at org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$loadTable$1.apply(HiveExternalCatalog.scala:739)
> at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:95)
> at org.apache.spark.sql.hive.HiveExternalCatalog.loadTable(HiveExternalCatalog.scala:739)
> at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.sideEffectResult$lzycompute(InsertIntoHiveTable.scala:323)
> at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.sideEffectResult(InsertIntoHiveTable.scala:170)
> at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.doExecute(InsertIntoHiveTable.scala:347)
> at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114)
> at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114)
> at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:135)
> at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
> at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:132)
> at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:113)
> at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:87)
> at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:87)
> at org.apache.spark.sql.hive.execution.CreateHiveTableAsSelectCommand.run(CreateHiveTableAsSelectCommand.scala:92)
> at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58)
> at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56)
> at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74)
> at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114)
> at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114)
> at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:135)
> at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
> at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:132)
> at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:113)
> at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:87)
> at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:87)
> at org.apache.spark.sql.Dataset.<init>(Dataset.scala:185)
> at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:64)
> at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:592)
> at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
> at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:637)
> Caused by: org.apache.hadoop.hdfs.protocol.DSQuotaExceededException: The DiskSpace quota of /user/spark_user is exceeded: quota = 3298534883328 B = 3 TB but diskspace consumed = 3298551987606 B = 3.00 TB
> at org.apache.hadoop.hdfs.server.namenode.DirectoryWithQuotaFeature.verifyStoragespaceQuota(DirectoryWithQuotaFeature.java:211)
> at org.apache.hadoop.hdfs.server.namenode.DirectoryWithQuotaFeature.verifyQuota(DirectoryWithQuotaFeature.java:239)
> at org.apache.hadoop.hdfs.server.namenode.FSDirectory.verifyQuota(FSDirectory.java:945)
> at org.apache.hadoop.hdfs.server.namenode.FSDirRenameOp.verifyQuotaForRename(FSDirRenameOp.java:102)
> at org.apache.hadoop.hdfs.server.namenode.FSDirRenameOp.unprotectedRenameTo(FSDirRenameOp.java:190)
> at org.apache.hadoop.hdfs.server.namenode.FSDirRenameOp.renameTo(FSDirRenameOp.java:474)
> at org.apache.hadoop.hdfs.server.namenode.FSDirRenameOp.renameToInt(FSDirRenameOp.java:73)
> at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.renameTo(FSNamesystem.java:3768)
> at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.rename(NameNodeRpcServer.java:986)
> at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.rename(ClientNamenodeProtocolServerSideTranslatorPB.java:583)
> at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
> at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:640)
> at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
> at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2313)
> at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2309)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1724)
> at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2307)
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
> at org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:106)
> at org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:73)
> at org.apache.hadoop.hdfs.DFSClient.rename(DFSClient.java:1855)
> at org.apache.hadoop.hdfs.DistributedFileSystem.rename(DistributedFileSystem.java:575)
> at org.apache.hadoop.fs.TrashPolicyDefault.moveToTrash(TrashPolicyDefault.java:154)
> ... 70 more
> Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.protocol.DSQuotaExceededException): The DiskSpace quota of /user/spark_user is exceeded: quota = 3298534883328 B = 3 TB but diskspace consumed = 3298551987606 B = 3.00 TB
> at org.apache.hadoop.hdfs.server.namenode.DirectoryWithQuotaFeature.verifyStoragespaceQuota(DirectoryWithQuotaFeature.java:211)
> at org.apache.hadoop.hdfs.server.namenode.DirectoryWithQuotaFeature.verifyQuota(DirectoryWithQuotaFeature.java:239)
> at org.apache.hadoop.hdfs.server.namenode.FSDirectory.verifyQuota(FSDirectory.java:945)
> at org.apache.hadoop.hdfs.server.namenode.FSDirRenameOp.verifyQuotaForRename(FSDirRenameOp.java:102)
> at org.apache.hadoop.hdfs.server.namenode.FSDirRenameOp.unprotectedRenameTo(FSDirRenameOp.java:190)
> at org.apache.hadoop.hdfs.server.namenode.FSDirRenameOp.renameTo(FSDirRenameOp.java:474)
> at org.apache.hadoop.hdfs.server.namenode.FSDirRenameOp.renameToInt(FSDirRenameOp.java:73)
> at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.renameTo(FSNamesystem.java:3768)
> at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.rename(NameNodeRpcServer.java:986)
> at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.rename(ClientNamenodeProtocolServerSideTranslatorPB.java:583)
> at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
> at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:640)
> at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
> at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2313)
> at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2309)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1724)
> at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2307)
> at org.apache.hadoop.ipc.Client.call(Client.java:1469)
> at org.apache.hadoop.ipc.Client.call(Client.java:1400)
> at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:232)
> at com.sun.proxy.$Proxy9.rename(Unknown Source)
> at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.rename(ClientNamenodeProtocolTranslatorPB.java:468)
> at sun.reflect.GeneratedMethodAccessor70.invoke(Unknown Source)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187)
> at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
> at com.sun.proxy.$Proxy10.rename(Unknown Source)
> at org.apache.hadoop.hdfs.DFSClient.rename(DFSClient.java:1853)
> {code}
> It looks like the problem is in the Hive fork that Spark bundles:
> https://github.com/apache/hive/blob/spark2/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
> where the code deliberately swallows the exception:
> {code}
> //swallow the exception
> LOG.warn("Directory " + oldPath.toString() + " canot be removed:" +
> StringUtils.stringifyException(e));
> {code}
> That is the same as in Hive release 1.2.1. However, Hive fixed this in Hive
> 2.0.0:
> https://issues.apache.org/jira/browse/HIVE-12505
> with this patch:
> https://github.com/apache/hive/commit/f9791bebaa9d71ff62cecedb0243d09e99abfb38
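> Until Spark picks that fix up, a defensive workaround on the job side (a
> sketch only, not the HIVE-12505 fix; the path is illustrative and the
> SparkSession is assumed to be named spark) is to delete the old output
> explicitly and fail hard before writing:
> {code}
> import java.io.IOException
> import org.apache.hadoop.fs.{FileSystem, Path}
>
> // Remove the previous run's output ourselves and abort when the delete
> // fails, instead of relying on Hive's warn-and-continue move to .Trash.
> val outputPath = new Path("hdfs:///the/folder")
> val fs = FileSystem.get(outputPath.toUri, spark.sparkContext.hadoopConfiguration)
> if (fs.exists(outputPath) && !fs.delete(outputPath, true)) {
>   throw new IOException(s"Could not remove old output at $outputPath; aborting to avoid a mix of old and new data")
> }
> {code}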
> So I guess I'm trying to add urgency to SPARK-20202.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)