This is an automated email from the ASF dual-hosted git repository.

yumwang pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new ad2fb0ec5f1 [SPARK-40660][SQL][3.3] Switch to XORShiftRandom to distribute elements
ad2fb0ec5f1 is described below

commit ad2fb0ec5f1054a2734de818d30be4606c624fbd
Author: Yuming Wang <yumw...@ebay.com>
AuthorDate: Thu Oct 6 13:02:22 2022 +0800

    [SPARK-40660][SQL][3.3] Switch to XORShiftRandom to distribute elements
    
    ### What changes were proposed in this pull request?
    
    Cherry-picked from #38106 and reverted changes in RDD.scala:
    
    https://github.com/apache/spark/blob/d2952b671a3579759ad9ce326ed8389f5270fd9f/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L507
    
    ### Why are the changes needed?
    
    The number of output files has changed since SPARK-40407. [Some downstream projects](https://github.com/apache/iceberg/blob/c07f2aabc0a1d02f068ecf1514d2479c0fbdd3b0/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java#L578-L579) use repartition to determine the number of output files in their tests.
    ```
    bin/spark-shell --master "local[2]"
    spark.range(10).repartition(10).write.mode("overwrite").parquet("/tmp/spark/repartition")
    ```
    Before this PR and after SPARK-40407, the number of output files is 8. After this PR or before SPARK-40407, the number of output files is 10.
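
    A minimal standalone sketch of why the file count can drop (illustrative only, not the Spark code path; the object and method names below are made up): each map task seeds a random generator with its partition id, draws a starting partition, and then sends its rows to consecutive partitions modulo the target count (mirroring the `position += 1` loop in ShuffleExchangeExec shown in the diff below), so overlapping starting positions leave some target partitions empty and no part file is written for them.

    ```
    import java.util.Random
    import scala.util.hashing

    // With local[2], spark.range(10) has 2 map tasks of 5 rows each.
    // Each task draws a starting partition and assigns its rows round-robin.
    object RoundRobinSketch {
      def targets(start: Int, rows: Int, numPartitions: Int): Set[Int] =
        (1 to rows).map(i => (start + i) % numPartitions).toSet

      def main(args: Array[String]): Unit = {
        val numPartitions = 10
        val rowsPerTask = 5
        // Seeding as before this PR: java.util.Random over byteswap32(partitionId).
        val starts = (0 until 2).map(p => new Random(hashing.byteswap32(p)).nextInt(numPartitions))
        val nonEmpty = starts.flatMap(s => targets(s, rowsPerTask, numPartitions)).toSet
        // Fewer than 10 non-empty target partitions means fewer than 10 part files.
        println(s"starts=$starts -> ${nonEmpty.size} non-empty partitions")
      }
    }
    ```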
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Unit test.
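
    One way to run the new test locally (assuming the standard Spark sbt workflow; module name and filter may need adjusting):

    ```
    build/sbt "sql/testOnly *DatasetSuite -- -z SPARK-40660"
    ```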
    
    Closes #38110 from wangyum/branch-3.3-SPARK-40660.
    
    Authored-by: Yuming Wang <yumw...@ebay.com>
    Signed-off-by: Yuming Wang <yumw...@ebay.com>
    (cherry picked from commit 5fe895a65a4a9d65f81d43af473b5e3a855ed8c8)
    Signed-off-by: Yuming Wang <yumw...@ebay.com>
---
 .../spark/sql/execution/exchange/ShuffleExchangeExec.scala |  5 ++---
 .../src/test/scala/org/apache/spark/sql/DatasetSuite.scala | 14 +++++++++++++-
 .../sql/execution/adaptive/AdaptiveQueryExecSuite.scala    |  4 ++--
 3 files changed, 17 insertions(+), 6 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
index bc8416c5b2e..f11ae0ed207 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
@@ -17,11 +17,9 @@
 
 package org.apache.spark.sql.execution.exchange
 
-import java.util.Random
 import java.util.function.Supplier
 
 import scala.concurrent.Future
-import scala.util.hashing
 
 import org.apache.spark._
 import org.apache.spark.internal.config
@@ -40,6 +38,7 @@ import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.MutablePair
 import org.apache.spark.util.collection.unsafe.sort.{PrefixComparators, RecordComparator}
+import org.apache.spark.util.random.XORShiftRandom
 
 /**
  * Common trait for all shuffle exchange implementations to facilitate pattern matching.
@@ -314,7 +313,7 @@ object ShuffleExchangeExec {
         // end up being almost the same regardless of the index. substantially scrambling the
         // seed by hashing will help. Refer to SPARK-21782 for more details.
         val partitionId = TaskContext.get().partitionId()
-        var position = new Random(hashing.byteswap32(partitionId)).nextInt(numPartitions)
+        var position = new XORShiftRandom(partitionId).nextInt(numPartitions)
         (row: InternalRow) => {
           // The HashPartitioner will handle the `mod` by the number of partitions
           position += 1
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index 617314eff4e..6cd4fe54f42 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql
 import java.io.{Externalizable, ObjectInput, ObjectOutput}
 import java.sql.{Date, Timestamp}
 
+import org.apache.hadoop.fs.{Path, PathFilter}
 import org.scalatest.Assertions._
 import org.scalatest.exceptions.TestFailedException
 import org.scalatest.prop.TableDrivenPropertyChecks._
@@ -2064,7 +2065,18 @@ class DatasetSuite extends QueryTest
   test("SPARK-40407: repartition should not result in severe data skew") {
     val df = spark.range(0, 100, 1, 50).repartition(4)
     val result = df.mapPartitions(iter => Iterator.single(iter.length)).collect()
-    assert(result.sorted.toSeq === Seq(19, 25, 25, 31))
+    assert(result.sorted.toSeq === Seq(23, 25, 25, 27))
+  }
+
+  test("SPARK-40660: Switch to XORShiftRandom to distribute elements") {
+    withTempDir { dir =>
+      spark.range(10).repartition(10).write.mode(SaveMode.Overwrite).parquet(dir.getCanonicalPath)
+      val fs = new Path(dir.getAbsolutePath).getFileSystem(spark.sessionState.newHadoopConf())
+      val parquetFiles = fs.listStatus(new Path(dir.getAbsolutePath), new PathFilter {
+        override def accept(path: Path): Boolean = path.getName.endsWith("parquet")
+      })
+      assert(parquetFiles.size === 10)
+    }
   }
 }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
index f711aac0253..027dc71ec2c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
@@ -1895,8 +1895,8 @@ class AdaptiveQueryExecSuite
         withSQLConf(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key -> "150") {
           // partition size [0,258,72,72,72]
           checkPartitionNumber("SELECT /*+ REBALANCE(c1) */ * FROM v", 2, 4)
-          // partition size [144,72,144,216,144]
-          checkPartitionNumber("SELECT /*+ REBALANCE */ * FROM v", 2, 6)
+          // partition size [144,72,144,72,72,144,72]
+          checkPartitionNumber("SELECT /*+ REBALANCE */ * FROM v", 6, 7)
         }
 
         // no skewed partition should be optimized


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
