Repository: spark
Updated Branches:
  refs/heads/branch-2.0 1551a72cb -> 1bce96db5


[SPARK-15718][SQL] better error message for writing bucketed data

## What changes were proposed in this pull request?

Currently we don't support bucketing for `save` and `insertInto`.

For `save`, we just write the data out into a directory that the user specified. 
Since it's not a table, we don't keep its metadata. When we read it back, we have 
no idea whether the data is bucketed or not, so it doesn't make sense to use `save` 
to write bucketed data, as we can't use the bucket information anyway.

We can support it in the future, once we have features like bucket discovery, 
or we save bucket information in the data directory too, so that we don't need 
to rely on a metastore.

For `insertInto`, it inserts data into an existing table, so it doesn't make 
sense to specify bucket information, as we should get the bucket information 
from the existing table.

This PR improves the error messages for the above two cases.

## How was this patch tested?

New tests in `BucketedWriteSuite`

Author: Wenchen Fan <[email protected]>

Closes #13452 from cloud-fan/error-msg.

(cherry picked from commit f34aadc54ca1a9fd4236a928d342324b26fb3a12)
Signed-off-by: Andrew Or <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1bce96db
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1bce96db
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1bce96db

Branch: refs/heads/branch-2.0
Commit: 1bce96db5f366099d09d9083a21e1b34d15fae19
Parents: 1551a72
Author: Wenchen Fan <[email protected]>
Authored: Thu Jun 2 17:39:56 2016 -0700
Committer: Andrew Or <[email protected]>
Committed: Thu Jun 2 17:40:06 2016 -0700

----------------------------------------------------------------------
 .../org/apache/spark/sql/DataFrameWriter.scala   | 10 +++++-----
 .../test/DataFrameReaderWriterSuite.scala        |  4 ++--
 .../spark/sql/sources/BucketedWriteSuite.scala   | 19 +++++++++++++++----
 3 files changed, 22 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/1bce96db/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 50ae966..1dd8818 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -281,7 +281,7 @@ final class DataFrameWriter private[sql](df: DataFrame) {
    * @since 1.4.0
    */
   def save(): Unit = {
-    assertNotBucketed()
+    assertNotBucketed("save")
     assertNotStreaming("save() can only be called on non-continuous queries")
     val dataSource = DataSource(
       df.sparkSession,
@@ -330,7 +330,7 @@ final class DataFrameWriter private[sql](df: DataFrame) {
    */
   @Experimental
   def startStream(): ContinuousQuery = {
-    assertNotBucketed()
+    assertNotBucketed("startStream")
     assertStreaming("startStream() can only be called on continuous queries")
 
     if (source == "memory") {
@@ -430,7 +430,7 @@ final class DataFrameWriter private[sql](df: DataFrame) {
   }
 
   private def insertInto(tableIdent: TableIdentifier): Unit = {
-    assertNotBucketed()
+    assertNotBucketed("insertInto")
     assertNotStreaming("insertInto() can only be called on non-continuous 
queries")
     val partitions = normalizedParCols.map(_.map(col => col -> (None: 
Option[String])).toMap)
     val overwrite = mode == SaveMode.Overwrite
@@ -500,10 +500,10 @@ final class DataFrameWriter private[sql](df: DataFrame) {
         s"existing columns (${validColumnNames.mkString(", ")})"))
   }
 
-  private def assertNotBucketed(): Unit = {
+  private def assertNotBucketed(operation: String): Unit = {
     if (numBuckets.isDefined || sortColumnNames.isDefined) {
       throw new IllegalArgumentException(
-        "Currently we don't support writing bucketed data to this data 
source.")
+        s"'$operation' does not support bucketing right now.")
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/1bce96db/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataFrameReaderWriterSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataFrameReaderWriterSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataFrameReaderWriterSuite.scala
index a2aac69..431a943 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataFrameReaderWriterSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataFrameReaderWriterSuite.scala
@@ -456,7 +456,7 @@ class DataFrameReaderWriterSuite extends StreamTest with 
BeforeAndAfter {
       .stream()
     val w = df.write
     val e = intercept[IllegalArgumentException](w.bucketBy(1, 
"text").startStream())
-    assert(e.getMessage == "Currently we don't support writing bucketed data 
to this data source.")
+    assert(e.getMessage == "'startStream' does not support bucketing right 
now.")
   }
 
   test("check sortBy() can only be called on non-continuous queries;") {
@@ -465,7 +465,7 @@ class DataFrameReaderWriterSuite extends StreamTest with 
BeforeAndAfter {
       .stream()
     val w = df.write
     val e = intercept[IllegalArgumentException](w.sortBy("text").startStream())
-    assert(e.getMessage == "Currently we don't support writing bucketed data 
to this data source.")
+    assert(e.getMessage == "'startStream' does not support bucketing right 
now.")
   }
 
   test("check save(path) can only be called on non-continuous queries") {

http://git-wip-us.apache.org/repos/asf/spark/blob/1bce96db/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala 
b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala
index ff44c6f..61a281d 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala
@@ -59,11 +59,22 @@ class BucketedWriteSuite extends QueryTest with 
SQLTestUtils with TestHiveSingle
     intercept[SparkException](df.write.bucketBy(3, 
"i").format("text").saveAsTable("tt"))
   }
 
-  test("write bucketed data to non-hive-table or existing hive table") {
+  test("write bucketed data using save()") {
     val df = Seq(1 -> "a", 2 -> "b").toDF("i", "j")
-    intercept[IllegalArgumentException](df.write.bucketBy(2, 
"i").parquet("/tmp/path"))
-    intercept[IllegalArgumentException](df.write.bucketBy(2, 
"i").json("/tmp/path"))
-    intercept[IllegalArgumentException](df.write.bucketBy(2, 
"i").insertInto("tt"))
+
+    val e = intercept[IllegalArgumentException] {
+      df.write.bucketBy(2, "i").parquet("/tmp/path")
+    }
+    assert(e.getMessage == "'save' does not support bucketing right now.")
+  }
+
+  test("write bucketed data using insertInto()") {
+    val df = Seq(1 -> "a", 2 -> "b").toDF("i", "j")
+
+    val e = intercept[IllegalArgumentException] {
+      df.write.bucketBy(2, "i").insertInto("tt")
+    }
+    assert(e.getMessage == "'insertInto' does not support bucketing right 
now.")
   }
 
   private val df = (0 until 50).map(i => (i % 5, i % 13, 
i.toString)).toDF("i", "j", "k")


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to