Repository: spark
Updated Branches:
  refs/heads/branch-2.0 c1390ccbb -> f15d641e2


[SPARK-15871][SQL] Add `assertNotPartitioned` check in `DataFrameWriter`

## What changes were proposed in this pull request?

It doesn't make sense to specify partitioning parameters when we write data
out from Datasets/DataFrames into `jdbc` tables or streaming `ForeachWriter`s.

This patch adds an `assertNotPartitioned` check in `DataFrameWriter`.

| operation | should check not partitioned? |
|:---:|:---:|
| mode | |
| outputMode | |
| trigger | |
| format | |
| option/options | |
| partitionBy | |
| bucketBy | |
| sortBy | |
| save | |
| queryName | |
| startStream | |
| foreach | yes |
| insertInto | |
| saveAsTable | |
| jdbc | yes |
| json | |
| parquet | |
| orc | |
| text | |
| csv | |
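
For illustration, a minimal usage sketch of the new behavior on the batch `jdbc` path (assuming a `SparkSession` named `spark`; the column name, JDBC URL, and table name below are placeholders, not part of this patch):

```scala
import java.util.Properties

import org.apache.spark.sql.AnalysisException

// An arbitrary DataFrame; the column name "value" is illustrative only.
val df = spark.range(10).toDF("value")

try {
  // Specifying partitionBy() before jdbc() now fails analysis up front,
  // before any JDBC connection is attempted.
  df.write
    .partitionBy("value")
    .jdbc("jdbc:h2:mem:testdb", "target_table", new Properties())
} catch {
  case e: AnalysisException =>
    // Prints something like: 'jdbc' does not support partitioning;
    println(e.getMessage)
}
```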

## How was this patch tested?

New dedicated tests.

Author: Liwei Lin <lwl...@gmail.com>

Closes #13597 from lw-lin/add-assertNotPartitioned.

(cherry picked from commit fb219029dd1b8d2783c3e202361401048296595c)
Signed-off-by: Shixiong Zhu <shixi...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f15d641e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f15d641e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f15d641e

Branch: refs/heads/branch-2.0
Commit: f15d641e297d425a8c1b4ba6c93f4f98a3f70d0f
Parents: c1390cc
Author: Liwei Lin <lwl...@gmail.com>
Authored: Fri Jun 10 13:01:29 2016 -0700
Committer: Shixiong Zhu <shixi...@databricks.com>
Committed: Fri Jun 10 13:01:37 2016 -0700

----------------------------------------------------------------------
 .../org/apache/spark/sql/DataFrameWriter.scala  | 12 +++++-
 .../test/DataFrameReaderWriterSuite.scala       | 42 ++++++++++++++++++--
 .../spark/sql/sources/BucketedWriteSuite.scala  |  8 ++--
 3 files changed, 52 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/f15d641e/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 6ce59e8..78b74f9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -432,6 +432,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
    */
   @Experimental
   def foreach(writer: ForeachWriter[T]): ContinuousQuery = {
+    assertNotPartitioned("foreach")
     assertNotBucketed("foreach")
     assertStreaming(
       "foreach() can only be called on streaming Datasets/DataFrames.")
@@ -562,8 +563,13 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
 
   private def assertNotBucketed(operation: String): Unit = {
     if (numBuckets.isDefined || sortColumnNames.isDefined) {
-      throw new IllegalArgumentException(
-        s"'$operation' does not support bucketing right now.")
+      throw new AnalysisException(s"'$operation' does not support bucketing right now")
+    }
+  }
+
+  private def assertNotPartitioned(operation: String): Unit = {
+    if (partitioningColumns.isDefined) {
+      throw new AnalysisException(s"'$operation' does not support partitioning")
     }
   }
 
@@ -646,6 +652,8 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
    * @since 1.4.0
    */
   def jdbc(url: String, table: String, connectionProperties: Properties): Unit = {
+    assertNotPartitioned("jdbc")
+    assertNotBucketed("jdbc")
     assertNotStreaming("jdbc() can only be called on non-continuous queries")
 
     val props = new Properties()

http://git-wip-us.apache.org/repos/asf/spark/blob/f15d641e/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataFrameReaderWriterSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataFrameReaderWriterSuite.scala
index bf6063a..6e0d66a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataFrameReaderWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataFrameReaderWriterSuite.scala
@@ -455,8 +455,8 @@ class DataFrameReaderWriterSuite extends StreamTest with BeforeAndAfter {
       .format("org.apache.spark.sql.streaming.test")
       .stream()
     val w = df.write
-    val e = intercept[IllegalArgumentException](w.bucketBy(1, "text").startStream())
-    assert(e.getMessage == "'startStream' does not support bucketing right now.")
+    val e = intercept[AnalysisException](w.bucketBy(1, "text").startStream())
+    assert(e.getMessage == "'startStream' does not support bucketing right now;")
   }
 
   test("check sortBy() can only be called on non-continuous queries;") {
@@ -464,8 +464,8 @@ class DataFrameReaderWriterSuite extends StreamTest with BeforeAndAfter {
       .format("org.apache.spark.sql.streaming.test")
       .stream()
     val w = df.write
-    val e = intercept[IllegalArgumentException](w.sortBy("text").startStream())
-    assert(e.getMessage == "'startStream' does not support bucketing right now.")
+    val e = intercept[AnalysisException](w.sortBy("text").startStream())
+    assert(e.getMessage == "'startStream' does not support bucketing right now;")
   }
 
   test("check save(path) can only be called on non-continuous queries") {
@@ -558,6 +558,40 @@ class DataFrameReaderWriterSuite extends StreamTest with BeforeAndAfter {
     assert(e.getMessage == "csv() can only be called on non-continuous queries;")
   }
 
+  test("check foreach() does not support partitioning or bucketing") {
+    val df = spark.read
+      .format("org.apache.spark.sql.streaming.test")
+      .stream()
+
+    var w = df.write.partitionBy("value")
+    var e = intercept[AnalysisException](w.foreach(null))
+    Seq("foreach", "partitioning").foreach { s =>
+      assert(e.getMessage.toLowerCase.contains(s.toLowerCase))
+    }
+
+    w = df.write.bucketBy(2, "value")
+    e = intercept[AnalysisException](w.foreach(null))
+    Seq("foreach", "bucketing").foreach { s =>
+      assert(e.getMessage.toLowerCase.contains(s.toLowerCase))
+    }
+  }
+
+  test("check jdbc() does not support partitioning or bucketing") {
+    val df = spark.read.text(newTextInput)
+
+    var w = df.write.partitionBy("value")
+    var e = intercept[AnalysisException](w.jdbc(null, null, null))
+    Seq("jdbc", "partitioning").foreach { s =>
+      assert(e.getMessage.toLowerCase.contains(s.toLowerCase))
+    }
+
+    w = df.write.bucketBy(2, "value")
+    e = intercept[AnalysisException](w.jdbc(null, null, null))
+    Seq("jdbc", "bucketing").foreach { s =>
+      assert(e.getMessage.toLowerCase.contains(s.toLowerCase))
+    }
+  }
+
   test("ConsoleSink can be correctly loaded") {
     LastOptions.clear()
     val df = spark.read

http://git-wip-us.apache.org/repos/asf/spark/blob/f15d641e/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala
index 61a281d..9974451 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala
@@ -62,19 +62,19 @@ class BucketedWriteSuite extends QueryTest with SQLTestUtils with TestHiveSingle
   test("write bucketed data using save()") {
     val df = Seq(1 -> "a", 2 -> "b").toDF("i", "j")
 
-    val e = intercept[IllegalArgumentException] {
+    val e = intercept[AnalysisException] {
       df.write.bucketBy(2, "i").parquet("/tmp/path")
     }
-    assert(e.getMessage == "'save' does not support bucketing right now.")
+    assert(e.getMessage == "'save' does not support bucketing right now;")
   }
 
   test("write bucketed data using insertInto()") {
     val df = Seq(1 -> "a", 2 -> "b").toDF("i", "j")
 
-    val e = intercept[IllegalArgumentException] {
+    val e = intercept[AnalysisException] {
       df.write.bucketBy(2, "i").insertInto("tt")
     }
-    assert(e.getMessage == "'insertInto' does not support bucketing right now.")
+    assert(e.getMessage == "'insertInto' does not support bucketing right now;")
   }
 
   private val df = (0 until 50).map(i => (i % 5, i % 13, i.toString)).toDF("i", "j", "k")

