Repository: spark
Updated Branches:
  refs/heads/branch-2.0 995f602d2 -> 4131623a8


[SPARK-18003][SPARK CORE] Fix bug of RDD zipWithIndex & zipWithUniqueId index value overflowing

## What changes were proposed in this pull request?

- Fix a bug where RDD `zipWithIndex` generates a wrong result when a single partition contains more than 2147483647 records.

- Fix a bug where RDD `zipWithUniqueId` generates a wrong result when a single partition contains more than 2147483647 records.

Both bugs come from Scala's `Iterator.zipWithIndex`, which tracks positions with an `Int` counter that overflows once it passes `Int.MaxValue` (2147483647); see the sketch below.
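
As a minimal illustration (not part of the patch) of the underlying overflow, the arithmetic below mirrors what happens when an `Int`-based element counter crosses the `Int.MaxValue` boundary:

```scala
// Int arithmetic silently wraps around at Int.MaxValue (2147483647),
// so an Int-based element index turns negative past that point:
val correctNextIndex: Long = Int.MaxValue.toLong + 1L // 2147483648L
val overflowedIndex: Int = Int.MaxValue + 1           // -2147483648
```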

## How was this patch tested?

A unit test was added in `UtilsSuite`.

Author: WeichenXu <weichenxu...@outlook.com>

Closes #15550 from WeichenXu123/fix_rdd_zipWithIndex_overflow.

(cherry picked from commit 39755169fb5bb07332eef263b4c18ede1528812d)
Signed-off-by: Reynold Xin <r...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4131623a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4131623a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4131623a

Branch: refs/heads/branch-2.0
Commit: 4131623a8585fe99f79d82c24ab3b8b506d0d616
Parents: 995f602
Author: WeichenXu <weichenxu...@outlook.com>
Authored: Wed Oct 19 23:41:38 2016 -0700
Committer: Reynold Xin <r...@databricks.com>
Committed: Wed Oct 19 23:41:46 2016 -0700

----------------------------------------------------------------------
 core/src/main/scala/org/apache/spark/rdd/RDD.scala   |  2 +-
 .../org/apache/spark/rdd/ZippedWithIndexRDD.scala    |  5 ++---
 .../src/main/scala/org/apache/spark/util/Utils.scala | 15 +++++++++++++++
 .../scala/org/apache/spark/util/UtilsSuite.scala     |  7 +++++++
 4 files changed, 25 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/4131623a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 34d32aa..7013396 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -1278,7 +1278,7 @@ abstract class RDD[T: ClassTag](
   def zipWithUniqueId(): RDD[(T, Long)] = withScope {
     val n = this.partitions.length.toLong
     this.mapPartitionsWithIndex { case (k, iter) =>
-      iter.zipWithIndex.map { case (item, i) =>
+      Utils.getIteratorZipWithIndex(iter, 0L).map { case (item, i) =>
         (item, i * n + k)
       }
     }
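
For context on the `i * n + k` scheme above: with `n` partitions, items in partition `k` receive the ids `k, n + k, 2n + k, ...`, which are unique across partitions. A small illustrative sketch with made-up values for `n` and `k`:

```scala
// With n = 3 partitions, items in partition k = 1 get ids 1, 4, 7, ...
val n = 3L
val k = 1
val ids = (0L until 3L).map(i => i * n + k) // Vector(1, 4, 7)
```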

http://git-wip-us.apache.org/repos/asf/spark/blob/4131623a/core/src/main/scala/org/apache/spark/rdd/ZippedWithIndexRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/ZippedWithIndexRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ZippedWithIndexRDD.scala
index 32931d5..dff6737 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ZippedWithIndexRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ZippedWithIndexRDD.scala
@@ -64,8 +64,7 @@ class ZippedWithIndexRDD[T: ClassTag](prev: RDD[T]) extends RDD[(T, Long)](prev)
 
   override def compute(splitIn: Partition, context: TaskContext): Iterator[(T, Long)] = {
     val split = splitIn.asInstanceOf[ZippedWithIndexRDDPartition]
-    firstParent[T].iterator(split.prev, context).zipWithIndex.map { x =>
-      (x._1, split.startIndex + x._2)
-    }
+    val parentIter = firstParent[T].iterator(split.prev, context)
+    Utils.getIteratorZipWithIndex(parentIter, split.startIndex)
   }
 }
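
For reference, the removed code above added the partition's `Long` start index to the `Int` position produced by Scala's `zipWithIndex`, so the position had already wrapped before being widened to `Long`. A simplified sketch of that failure mode (values are illustrative):

```scala
val startIndex: Long = 0L                         // per-partition Long offset
val overflowedPos: Int = Int.MaxValue + 1         // Int position already wrapped
val wrongIndex: Long = startIndex + overflowedPos // -2147483648L, not 2147483648L
```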

http://git-wip-us.apache.org/repos/asf/spark/blob/4131623a/core/src/main/scala/org/apache/spark/util/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 3d862f4..1686edb 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1768,6 +1768,21 @@ private[spark] object Utils extends Logging {
   }
 
   /**
+   * Generate a zipWithIndex iterator, avoiding the index value overflow
+   * problem in Scala's zipWithIndex
+   */
+  def getIteratorZipWithIndex[T](iterator: Iterator[T], startIndex: Long): Iterator[(T, Long)] = {
+    new Iterator[(T, Long)] {
+      var index: Long = startIndex - 1L
+      def hasNext: Boolean = iterator.hasNext
+      def next(): (T, Long) = {
+        index += 1L
+        (iterator.next(), index)
+      }
+    }
+  }
+
+  /**
    * Creates a symlink.
    *
    * @param src absolute path to the source
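
A quick usage sketch of the new helper (values are made up; `Utils` is `private[spark]`, so this only works from Spark-internal code). The `Long` index carries on correctly across the `Int.MaxValue` boundary:

```scala
val it = Utils.getIteratorZipWithIndex(Iterator("a", "b"), Int.MaxValue.toLong)
it.foreach(println) // prints (a,2147483647) then (b,2147483648)
```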

http://git-wip-us.apache.org/repos/asf/spark/blob/4131623a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
index 2741ad7..b67482a 100644
--- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -396,6 +396,13 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {
     assert(Utils.getIteratorSize(iterator) === 5L)
   }
 
+  test("getIteratorZipWithIndex") {
+    val iterator = Utils.getIteratorZipWithIndex(Iterator(0, 1, 2), -1L + Int.MaxValue)
+    assert(iterator.toArray === Array(
+      (0, -1L + Int.MaxValue), (1, 0L + Int.MaxValue), (2, 1L + Int.MaxValue)
+    ))
+  }
+
   test("doesDirectoryContainFilesNewerThan") {
     // create some temporary directories and files
     val parent: File = Utils.createTempDir()

