Repository: spark
Updated Branches:
  refs/heads/branch-2.3 fcc9bd632 -> 42c1fdd22


[SPARK-25234][SPARKR] avoid integer overflow in parallelize

## What changes were proposed in this pull request?

`parallelize` uses integer multiplication to determine the split indices. Because R integers are 32-bit, the product of a slice index and the collection length can overflow, yielding `NA` split indices.
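
For context (an illustration, not part of the patch): R integers are 32-bit, so a product such as `x * length(coll)` silently becomes `NA` once it exceeds `.Machine$integer.max`:

```r
# 46999L * 47000L = 2,208,953,000 > .Machine$integer.max (2,147,483,647),
# so R's integer arithmetic overflows to NA (with a warning):
46999L * 47000L
# Promoting an operand to double first, as this patch does, stays exact:
as.numeric(46999) * 47000   # 2208953000
```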

## How was this patch tested?

A unit test that parallelizes a 47000-element vector into 47000 slices (see `test_context.R` below).

Closes #22225 from mengxr/SPARK-25234.

Authored-by: Xiangrui Meng <m...@databricks.com>
Signed-off-by: Xiangrui Meng <m...@databricks.com>
(cherry picked from commit 9714fa547325ed7b6a8066a88957537936b233dd)
Signed-off-by: Xiangrui Meng <m...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/42c1fdd2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/42c1fdd2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/42c1fdd2

Branch: refs/heads/branch-2.3
Commit: 42c1fdd229b3cf19ff804b7516eae9d36ae50c81
Parents: fcc9bd6
Author: Xiangrui Meng <m...@databricks.com>
Authored: Fri Aug 24 15:03:00 2018 -0700
Committer: Xiangrui Meng <m...@databricks.com>
Committed: Fri Aug 24 15:04:11 2018 -0700

----------------------------------------------------------------------
 R/pkg/R/context.R                    | 9 ++++-----
 R/pkg/tests/fulltests/test_context.R | 7 +++++++
 2 files changed, 11 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/42c1fdd2/R/pkg/R/context.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/context.R b/R/pkg/R/context.R
index 443c2ff..25e2d15 100644
--- a/R/pkg/R/context.R
+++ b/R/pkg/R/context.R
@@ -138,11 +138,10 @@ parallelize <- function(sc, coll, numSlices = 1) {
 
   sizeLimit <- getMaxAllocationLimit(sc)
   objectSize <- object.size(coll)
+  len <- length(coll)
 
   # For large objects we make sure the size of each slice is also smaller than sizeLimit
-  numSerializedSlices <- max(numSlices, ceiling(objectSize / sizeLimit))
-  if (numSerializedSlices > length(coll))
-    numSerializedSlices <- length(coll)
+  numSerializedSlices <- min(len, max(numSlices, ceiling(objectSize / sizeLimit)))
 
   # Generate the slice ids to put each row
   # For instance, for numSerializedSlices of 22, length of 50
@@ -153,8 +152,8 @@ parallelize <- function(sc, coll, numSlices = 1) {
   splits <- if (numSerializedSlices > 0) {
     unlist(lapply(0: (numSerializedSlices - 1), function(x) {
       # nolint start
-      start <- trunc((x * length(coll)) / numSerializedSlices)
-      end <- trunc(((x + 1) * length(coll)) / numSerializedSlices)
+      start <- trunc((as.numeric(x) * len) / numSerializedSlices)
+      end <- trunc(((as.numeric(x) + 1) * len) / numSerializedSlices)
       # nolint end
       rep(start, end - start)
     }))
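
For readability, the patched index computation extracted as a standalone sketch (`splitIndices` is a hypothetical name, not a function in this patch):

```r
# Hypothetical standalone version of the patched split computation.
splitIndices <- function(len, numSerializedSlices) {
  unlist(lapply(0:(numSerializedSlices - 1), function(x) {
    # as.numeric(x) forces double arithmetic, so x * len cannot overflow
    # R's 32-bit integer range
    start <- trunc((as.numeric(x) * len) / numSerializedSlices)
    end <- trunc(((as.numeric(x) + 1) * len) / numSerializedSlices)
    # repeat the slice's start offset once per element assigned to it
    rep(start, end - start)
  }))
}

# For numSerializedSlices = 22 over 50 elements (the example in the code
# comment above), each slice gets 2 or 3 elements:
table(splitIndices(50, 22))
```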

http://git-wip-us.apache.org/repos/asf/spark/blob/42c1fdd2/R/pkg/tests/fulltests/test_context.R
----------------------------------------------------------------------
diff --git a/R/pkg/tests/fulltests/test_context.R b/R/pkg/tests/fulltests/test_context.R
index f0d0a51..288a271 100644
--- a/R/pkg/tests/fulltests/test_context.R
+++ b/R/pkg/tests/fulltests/test_context.R
@@ -240,3 +240,10 @@ test_that("add and get file to be downloaded with Spark job on every node", {
   unlink(path, recursive = TRUE)
   sparkR.session.stop()
 })
+
+test_that("SPARK-25234: parallelize should not have integer overflow", {
+  sc <- sparkR.sparkContext(master = sparkRTestMaster)
+  # 47000 * 47000 exceeds integer range
+  parallelize(sc, 1:47000, 47000)
+  sparkR.session.stop()
+})
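
To reproduce the test's scenario interactively (a sketch; assumes SparkR is installed with a working local Spark, and reaches the internal RDD API via `:::`):

```r
library(SparkR)
sc <- SparkR:::sparkR.sparkContext(master = "local[2]")
# Before the fix, 46999L * 47000L overflowed to NA and the split indices
# were invalid; with the fix this call succeeds.
rdd <- SparkR:::parallelize(sc, 1:47000, 47000)
sparkR.session.stop()
```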

