Repository: spark
Updated Branches:
  refs/heads/master f2f4e7afe -> 1ee472eec


[SPARK-25621][SPARK-25622][TEST] Reduce test time of 
BucketedReadWithHiveSupportSuite

## What changes were proposed in this pull request?

Reduce test time by replacing loops over every possible value with a single randomly chosen value.
- `read partitioning bucketed tables with bucket pruning filters`: reduced from 55s to 7s
- `read partitioning bucketed tables having composite filters`: reduced from 54s to 8s
- Total time: reduced from 288s to 192s

## How was this patch tested?

Unit test

Closes #22640 from gengliangwang/fastenBucketedReadSuite.

Authored-by: Gengliang Wang <[email protected]>
Signed-off-by: hyukjinkwon <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1ee472ee
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1ee472ee
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1ee472ee

Branch: refs/heads/master
Commit: 1ee472eec15e104c4cd087179a9491dc542e15d7
Parents: f2f4e7a
Author: Gengliang Wang <[email protected]>
Authored: Sat Oct 6 14:54:04 2018 +0800
Committer: hyukjinkwon <[email protected]>
Committed: Sat Oct 6 14:54:04 2018 +0800

----------------------------------------------------------------------
 .../spark/sql/sources/BucketedReadSuite.scala   | 181 ++++++++++---------
 1 file changed, 91 insertions(+), 90 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/1ee472ee/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
index a941420..a2bc651 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
@@ -20,6 +20,8 @@ package org.apache.spark.sql.sources
 import java.io.File
 import java.net.URI
 
+import scala.util.Random
+
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.catalog.BucketSpec
 import org.apache.spark.sql.catalyst.expressions
@@ -47,11 +49,13 @@ class BucketedReadWithoutHiveSupportSuite extends 
BucketedReadSuite with SharedS
 abstract class BucketedReadSuite extends QueryTest with SQLTestUtils {
   import testImplicits._
 
-  private lazy val df = (0 until 50).map(i => (i % 5, i % 13, 
i.toString)).toDF("i", "j", "k")
+  private val maxI = 5
+  private val maxJ = 13
+  private lazy val df = (0 until 50).map(i => (i % maxI, i % maxJ, 
i.toString)).toDF("i", "j", "k")
   private lazy val nullDF = (for {
     i <- 0 to 50
     s <- Seq(null, "a", "b", "c", "d", "e", "f", null, "g")
-  } yield (i % 5, s, i % 13)).toDF("i", "j", "k")
+  } yield (i % maxI, s, i % maxJ)).toDF("i", "j", "k")
 
   // number of buckets that doesn't yield empty buckets when bucketing on 
column j on df/nullDF
   // empty buckets before filtering might hide bugs in pruning logic
@@ -66,23 +70,22 @@ abstract class BucketedReadSuite extends QueryTest with 
SQLTestUtils {
         .bucketBy(8, "j", "k")
         .saveAsTable("bucketed_table")
 
-      for (i <- 0 until 5) {
-        val table = spark.table("bucketed_table").filter($"i" === i)
-        val query = table.queryExecution
-        val output = query.analyzed.output
-        val rdd = query.toRdd
-
-        assert(rdd.partitions.length == 8)
-
-        val attrs = table.select("j", "k").queryExecution.analyzed.output
-        val checkBucketId = rdd.mapPartitionsWithIndex((index, rows) => {
-          val getBucketId = UnsafeProjection.create(
-            HashPartitioning(attrs, 8).partitionIdExpression :: Nil,
-            output)
-          rows.map(row => getBucketId(row).getInt(0) -> index)
-        })
-        checkBucketId.collect().foreach(r => assert(r._1 == r._2))
-      }
+      val bucketValue = Random.nextInt(maxI)
+      val table = spark.table("bucketed_table").filter($"i" === bucketValue)
+      val query = table.queryExecution
+      val output = query.analyzed.output
+      val rdd = query.toRdd
+
+      assert(rdd.partitions.length == 8)
+
+      val attrs = table.select("j", "k").queryExecution.analyzed.output
+      val checkBucketId = rdd.mapPartitionsWithIndex((index, rows) => {
+        val getBucketId = UnsafeProjection.create(
+          HashPartitioning(attrs, 8).partitionIdExpression :: Nil,
+          output)
+        rows.map(row => getBucketId(row).getInt(0) -> index)
+      })
+      checkBucketId.collect().foreach(r => assert(r._1 == r._2))
     }
   }
 
@@ -145,36 +148,36 @@ abstract class BucketedReadSuite extends QueryTest with 
SQLTestUtils {
         .bucketBy(numBuckets, "j")
         .saveAsTable("bucketed_table")
 
-      for (j <- 0 until 13) {
-        // Case 1: EqualTo
-        checkPrunedAnswers(
-          bucketSpec,
-          bucketValues = j :: Nil,
-          filterCondition = $"j" === j,
-          df)
-
-        // Case 2: EqualNullSafe
-        checkPrunedAnswers(
-          bucketSpec,
-          bucketValues = j :: Nil,
-          filterCondition = $"j" <=> j,
-          df)
-
-        // Case 3: In
-        checkPrunedAnswers(
-          bucketSpec,
-          bucketValues = Seq(j, j + 1, j + 2, j + 3),
-          filterCondition = $"j".isin(j, j + 1, j + 2, j + 3),
-          df)
-
-        // Case 4: InSet
-        val inSetExpr = expressions.InSet($"j".expr, Set(j, j + 1, j + 2, j + 
3).map(lit(_).expr))
-        checkPrunedAnswers(
-          bucketSpec,
-          bucketValues = Seq(j, j + 1, j + 2, j + 3),
-          filterCondition = Column(inSetExpr),
-          df)
-      }
+      val bucketValue = Random.nextInt(maxJ)
+      // Case 1: EqualTo
+      checkPrunedAnswers(
+        bucketSpec,
+        bucketValues = bucketValue :: Nil,
+        filterCondition = $"j" === bucketValue,
+        df)
+
+      // Case 2: EqualNullSafe
+      checkPrunedAnswers(
+        bucketSpec,
+        bucketValues = bucketValue :: Nil,
+        filterCondition = $"j" <=> bucketValue,
+        df)
+
+      // Case 3: In
+      checkPrunedAnswers(
+        bucketSpec,
+        bucketValues = Seq(bucketValue, bucketValue + 1, bucketValue + 2, 
bucketValue + 3),
+        filterCondition = $"j".isin(bucketValue, bucketValue + 1, bucketValue 
+ 2, bucketValue + 3),
+        df)
+
+      // Case 4: InSet
+      val inSetExpr = expressions.InSet($"j".expr,
+        Set(bucketValue, bucketValue + 1, bucketValue + 2, bucketValue + 
3).map(lit(_).expr))
+      checkPrunedAnswers(
+        bucketSpec,
+        bucketValues = Seq(bucketValue, bucketValue + 1, bucketValue + 2, 
bucketValue + 3),
+        filterCondition = Column(inSetExpr),
+        df)
     }
   }
 
@@ -188,13 +191,12 @@ abstract class BucketedReadSuite extends QueryTest with 
SQLTestUtils {
         .bucketBy(numBuckets, "j")
         .saveAsTable("bucketed_table")
 
-      for (j <- 0 until 13) {
-        checkPrunedAnswers(
-          bucketSpec,
-          bucketValues = j :: Nil,
-          filterCondition = $"j" === j,
-          df)
-      }
+      val bucketValue = Random.nextInt(maxJ)
+      checkPrunedAnswers(
+        bucketSpec,
+        bucketValues = bucketValue :: Nil,
+        filterCondition = $"j" === bucketValue,
+        df)
     }
   }
 
@@ -236,40 +238,39 @@ abstract class BucketedReadSuite extends QueryTest with 
SQLTestUtils {
         .bucketBy(numBuckets, "j")
         .saveAsTable("bucketed_table")
 
-      for (j <- 0 until 13) {
-        checkPrunedAnswers(
-          bucketSpec,
-          bucketValues = j :: Nil,
-          filterCondition = $"j" === j && $"k" > $"j",
-          df)
-
-        checkPrunedAnswers(
-          bucketSpec,
-          bucketValues = j :: Nil,
-          filterCondition = $"j" === j && $"i" > j % 5,
-          df)
-
-        // check multiple bucket values OR condition
-        checkPrunedAnswers(
-          bucketSpec,
-          bucketValues = Seq(j, j + 1),
-          filterCondition = $"j" === j || $"j" === (j + 1),
-          df)
-
-        // check bucket value and none bucket value OR condition
-        checkPrunedAnswers(
-          bucketSpec,
-          bucketValues = Nil,
-          filterCondition = $"j" === j || $"i" === 0,
-          df)
-
-        // check AND condition in complex expression
-        checkPrunedAnswers(
-          bucketSpec,
-          bucketValues = Seq(j),
-          filterCondition = ($"i" === 0 || $"k" > $"j") && $"j" === j,
-          df)
-      }
+      val bucketValue = Random.nextInt(maxJ)
+      checkPrunedAnswers(
+        bucketSpec,
+        bucketValues = bucketValue :: Nil,
+        filterCondition = $"j" === bucketValue && $"k" > $"j",
+        df)
+
+      checkPrunedAnswers(
+        bucketSpec,
+        bucketValues = bucketValue :: Nil,
+        filterCondition = $"j" === bucketValue && $"i" > bucketValue % 5,
+        df)
+
+      // check multiple bucket values OR condition
+      checkPrunedAnswers(
+        bucketSpec,
+        bucketValues = Seq(bucketValue, bucketValue + 1),
+        filterCondition = $"j" === bucketValue || $"j" === (bucketValue + 1),
+        df)
+
+      // check bucket value and none bucket value OR condition
+      checkPrunedAnswers(
+        bucketSpec,
+        bucketValues = Nil,
+        filterCondition = $"j" === bucketValue || $"i" === 0,
+        df)
+
+      // check AND condition in complex expression
+      checkPrunedAnswers(
+        bucketSpec,
+        bucketValues = Seq(bucketValue),
+        filterCondition = ($"i" === 0 || $"k" > $"j") && $"j" === bucketValue,
+        df)
     }
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to