Repository: spark Updated Branches: refs/heads/master b5162f442 -> 5d1ec64e7
Fix #SPARK-1149 Bad partitioners can cause Spark to hang Author: liguoqiang <[email protected]> Closes #44 from witgo/SPARK-1149 and squashes the following commits: 3dcdcaf [liguoqiang] Merge branch 'master' into SPARK-1149 8425395 [liguoqiang] Merge remote-tracking branch 'upstream/master' into SPARK-1149 3dad595 [liguoqiang] review comment e3e56aa [liguoqiang] Merge branch 'master' into SPARK-1149 b0d5c07 [liguoqiang] review comment d0a6005 [liguoqiang] review comment 3395ee7 [liguoqiang] Merge remote-tracking branch 'upstream/master' into SPARK-1149 ac006a3 [liguoqiang] code Formatting 3feb3a8 [liguoqiang] Merge branch 'master' into SPARK-1149 adc443e [liguoqiang] partitions check bugfix 928e1e3 [liguoqiang] Added a unit test for PairRDDFunctions.lookup with bad partitioner db6ecc5 [liguoqiang] Merge branch 'master' into SPARK-1149 1e3331e [liguoqiang] Merge branch 'master' into SPARK-1149 3348619 [liguoqiang] Optimize performance for partitions check 61e5a87 [liguoqiang] Merge branch 'master' into SPARK-1149 e68210a [liguoqiang] add partition index check to submitJob 3a65903 [liguoqiang] make the code more readable 6bb725e [liguoqiang] fix #SPARK-1149 Bad partitioners can cause Spark to hang Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5d1ec64e Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5d1ec64e Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5d1ec64e Branch: refs/heads/master Commit: 5d1ec64e7934ad7f922cdab516fa5de690644780 Parents: b5162f4 Author: liguoqiang <[email protected]> Authored: Wed Mar 12 12:59:51 2014 -0700 Committer: Patrick Wendell <[email protected]> Committed: Wed Mar 12 13:00:04 2014 -0700 ---------------------------------------------------------------------- .../main/scala/org/apache/spark/SparkContext.scala | 6 ++++++ .../apache/spark/rdd/PairRDDFunctionsSuite.scala | 16 ++++++++++++++++ 2 files changed, 22 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/5d1ec64e/core/src/main/scala/org/apache/spark/SparkContext.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 745e3fa..852ed8f 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -852,6 +852,9 @@ class SparkContext( partitions: Seq[Int], allowLocal: Boolean, resultHandler: (Int, U) => Unit) { + partitions.foreach{ p => + require(p >= 0 && p < rdd.partitions.size, s"Invalid partition requested: $p") + } val callSite = getCallSite val cleanedFunc = clean(func) logInfo("Starting job: " + callSite) @@ -955,6 +958,9 @@ class SparkContext( resultHandler: (Int, U) => Unit, resultFunc: => R): SimpleFutureAction[R] = { + partitions.foreach{ p => + require(p >= 0 && p < rdd.partitions.size, s"Invalid partition requested: $p") + } val cleanF = clean(processPartition) val callSite = getCallSite val waiter = dagScheduler.submitJob( http://git-wip-us.apache.org/repos/asf/spark/blob/5d1ec64e/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala index 85e8eb5..f9e994b 100644 --- a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala @@ -373,6 +373,22 @@ class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext { assert(shuffled.lookup(5) === Seq(6,7)) assert(shuffled.lookup(-1) === Seq()) } + + test("lookup with bad partitioner") { + val pairs = sc.parallelize(Array((1,2), (3,4), (5,6), (5,7))) + + val p = new Partitioner { + def numPartitions: Int = 2 + + def getPartition(key: Any): Int = key.hashCode() % 2 + } + val shuffled = pairs.partitionBy(p) + + assert(shuffled.partitioner === Some(p)) + assert(shuffled.lookup(1) === Seq(2)) + intercept[IllegalArgumentException] {shuffled.lookup(-1)} + } + } /*
