Repository: spark
Updated Branches:
  refs/heads/master c7f95df5c -> 238ae51b6


[SPARK-11914][SQL] Support coalesce and repartition in Dataset APIs

This PR adds two common operations, `coalesce` and `repartition`, to the Dataset API.

After reading the comments on SPARK-9999, I am unclear about the plan for
supporting repartitioning in the Dataset API. Currently, both the RDD API and
the DataFrame API give users the flexibility to control the number of
partitions.

Most traditional RDBMSs expose the number of partitions, the partitioning
columns, and the table partitioning methods to DBAs for performance tuning and
storage planning. These parameters can strongly affect query performance.
Since actual performance depends on the workload type, I think it is almost
impossible to automate the discovery of the best partitioning strategy for
every scenario.

Is the plan for the Dataset API to hide these operations from users? Feel
free to reject my PR if it does not match the plan.

Thank you for your answers. marmbrus rxin cloud-fan

Author: gatorsmile <[email protected]>

Closes #9899 from gatorsmile/coalesce.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/238ae51b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/238ae51b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/238ae51b

Branch: refs/heads/master
Commit: 238ae51b66ac12d15fba6aff061804004c5ca6cb
Parents: c7f95df
Author: gatorsmile <[email protected]>
Authored: Tue Nov 24 15:54:10 2015 -0800
Committer: Reynold Xin <[email protected]>
Committed: Tue Nov 24 15:54:10 2015 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/sql/Dataset.scala     | 19 +++++++++++++++++++
 .../org/apache/spark/sql/DatasetSuite.scala      | 15 +++++++++++++++
 2 files changed, 34 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/238ae51b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 0764750..17e2611 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -152,6 +152,25 @@ class Dataset[T] private[sql](
    */
   def count(): Long = toDF().count()
 
+  /**
+    * Returns a new [[Dataset]] that has exactly `numPartitions` partitions.
+    * @since 1.6.0
+    */
+  def repartition(numPartitions: Int): Dataset[T] = withPlan {
+    Repartition(numPartitions, shuffle = true, _)
+  }
+
+  /**
+    * Returns a new [[Dataset]] that has exactly `numPartitions` partitions.
+    * Similar to coalesce defined on an [[RDD]], this operation results in a narrow dependency, e.g.
+    * if you go from 1000 partitions to 100 partitions, there will not be a shuffle, instead each of
+    * the 100 new partitions will claim 10 of the current partitions.
+    * @since 1.6.0
+    */
+  def coalesce(numPartitions: Int): Dataset[T] = withPlan {
+    Repartition(numPartitions, shuffle = false, _)
+  }
+
   /* *********************** *
    *  Functional Operations  *
    * *********************** */

http://git-wip-us.apache.org/repos/asf/spark/blob/238ae51b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index 13eede1..c253fdb 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -52,6 +52,21 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
     assert(ds.takeAsList(1).get(0) == item)
   }
 
+  test("coalesce, repartition") {
+    val data = (1 to 100).map(i => ClassData(i.toString, i))
+    val ds = data.toDS()
+
+    assert(ds.repartition(10).rdd.partitions.length == 10)
+    checkAnswer(
+      ds.repartition(10),
+      data: _*)
+
+    assert(ds.coalesce(1).rdd.partitions.length == 1)
+    checkAnswer(
+      ds.coalesce(1),
+      data: _*)
+  }
+
   test("as tuple") {
     val data = Seq(("a", 1), ("b", 2)).toDF("a", "b")
     checkAnswer(

