Repository: spark Updated Branches: refs/heads/branch-1.4 fe59a4a5f -> f0e404020
[SPARK-8379] [SQL] avoid speculative tasks write to the same file The issue link [SPARK-8379](https://issues.apache.org/jira/browse/SPARK-8379) Currently,when we insert data to the dynamic partition with speculative tasks we will get the Exception ``` org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException): Lease mismatch on /tmp/hive-jeanlyn/hive_2015-06-15_15-20-44_734_8801220787219172413-1/-ext-10000/ds=2015-06-15/type=2/part-00301.lzo owned by DFSClient_attempt_201506031520_0011_m_000189_0_-1513487243_53 but is accessed by DFSClient_attempt_201506031520_0011_m_000042_0_-1275047721_57 ``` This pr try to write the data to temporary dir when using dynamic parition avoid the speculative tasks writing the same file Author: jeanlyn <[email protected]> Closes #6833 from jeanlyn/speculation and squashes the following commits: 64bbfab [jeanlyn] use FileOutputFormat.getTaskOutputPath to get the path 8860af0 [jeanlyn] remove the never using code e19a3bd [jeanlyn] avoid speculative tasks write same file (cherry picked from commit a1e3649c8775d71ca78796b6544284e942ac1331) Signed-off-by: Cheng Lian <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f0e40402 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f0e40402 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f0e40402 Branch: refs/heads/branch-1.4 Commit: f0e4040202f77d16f04468b639abc2bb0d7257ec Parents: fe59a4a Author: jeanlyn <[email protected]> Authored: Sun Jun 21 00:13:40 2015 -0700 Committer: Cheng Lian <[email protected]> Committed: Sun Jun 21 00:13:55 2015 -0700 ---------------------------------------------------------------------- .../spark/sql/hive/execution/InsertIntoHiveTable.scala | 1 - .../org/apache/spark/sql/hive/hiveWriterContainers.scala | 11 +++++------ 2 files changed, 5 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/f0e40402/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala index 8613332..72c4448 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala @@ -198,7 +198,6 @@ case class InsertIntoHiveTable( table.hiveQlTable.getPartCols().foreach { entry => orderedPartitionSpec.put(entry.getName, partitionSpec.get(entry.getName).getOrElse("")) } - val partVals = MetaStoreUtils.getPvals(table.hiveQlTable.getPartCols, partitionSpec) // inheritTableSpecs is set to true. It should be set to false for a IMPORT query // which is currently considered as a Hive native command. http://git-wip-us.apache.org/repos/asf/spark/blob/f0e40402/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala index 2bb526b..e1e8e47 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala @@ -228,12 +228,11 @@ private[spark] class SparkHiveDynamicPartitionWriterContainer( newFileSinkDesc.setCompressCodec(fileSinkConf.getCompressCodec) newFileSinkDesc.setCompressType(fileSinkConf.getCompressType) - val path = { - val outputPath = FileOutputFormat.getOutputPath(conf.value) - assert(outputPath != null, "Undefined job output-path") - val workPath = new Path(outputPath, dynamicPartPath.stripPrefix("/")) - new Path(workPath, getOutputName) - } + // use the path like ${hive_tmp}/_temporary/${attemptId}/ + // to avoid write to the same file when `spark.speculation=true` + val path = FileOutputFormat.getTaskOutputPath( + conf.value, + dynamicPartPath.stripPrefix("/") + "/" + getOutputName) HiveFileFormatUtils.getHiveRecordWriter( conf.value, --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
