Repository: spark Updated Branches: refs/heads/branch-2.0 d27df3579 -> d719e9a08
[SPARK-17417][CORE] Fix # of partitions for Reliable RDD checkpointing ## What changes were proposed in this pull request? Currently the no. of partition files are limited to 10000 files (%05d format). If there are more than 10000 part files, the logic goes for a toss while recreating the RDD as it sorts them by string. More details can be found in the JIRA desc [here](https://issues.apache.org/jira/browse/SPARK-17417). ## How was this patch tested? I tested this patch by checkpointing a RDD and then manually renaming part files to the old format and tried to access the RDD. It was successfully created from the old format. Also verified loading a sample parquet file and saving it as multiple formats - CSV, JSON, Text, Parquet, ORC and read them successfully back from the saved files. I couldn't launch the unit test from my local box, so will wait for the Jenkins output. Author: Dhruve Ashar <[email protected]> Closes #15370 from dhruve/bug/SPARK-17417. (cherry picked from commit 4bafacaa5f50a3e986c14a38bc8df9bae303f3a0) Signed-off-by: Tom Graves <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d719e9a0 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d719e9a0 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d719e9a0 Branch: refs/heads/branch-2.0 Commit: d719e9a080a909a6a56db938750d553668743f8f Parents: d27df35 Author: Dhruve Ashar <[email protected]> Authored: Mon Oct 10 10:55:57 2016 -0500 Committer: Tom Graves <[email protected]> Committed: Mon Oct 10 10:57:30 2016 -0500 ---------------------------------------------------------------------- .../main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/d719e9a0/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala index fddb935..b73214f 100644 --- a/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala @@ -69,10 +69,10 @@ private[spark] class ReliableCheckpointRDD[T: ClassTag]( val inputFiles = fs.listStatus(cpath) .map(_.getPath) .filter(_.getName.startsWith("part-")) - .sortBy(_.toString) + .sortBy(_.getName.stripPrefix("part-").toInt) // Fail fast if input files are invalid inputFiles.zipWithIndex.foreach { case (path, i) => - if (!path.toString.endsWith(ReliableCheckpointRDD.checkpointFileName(i))) { + if (path.getName != ReliableCheckpointRDD.checkpointFileName(i)) { throw new SparkException(s"Invalid checkpoint file: $path") } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
