Repository: spark
Updated Branches:
  refs/heads/master df2c6d59d -> 94f14b52a


[SPARK-16556][SPARK-16559][SQL] Fix Two Bugs in Bucket Specification

### What changes were proposed in this pull request?

**Issue 1: Silent Ignorance of Bucket Specification When Creating Table Using 
Schema Inference**

When creating a data source table without explicit specification of schema or 
SELECT clause, we silently ignore the bucket specification (CLUSTERED BY... 
SORTED BY...) in [the 
code](https://github.com/apache/spark/blob/ce3b98bae28af72299722f56e4e4ef831f471ec0/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala#L339-L354).

For example,
```SQL
CREATE TABLE jsonTable
USING org.apache.spark.sql.json
OPTIONS (
  path '${tempDir.getCanonicalPath}'
)
CLUSTERED BY (inexistentColumnA) SORTED BY (inexistentColumnB) INTO 2 BUCKETS
```

This PR captures it and issues an error message.

**Issue 2: Got a run-time `java.lang.ArithmeticException` when num of buckets 
is set to zero.**

For example,
```SQL
CREATE TABLE t USING PARQUET
OPTIONS (PATH '${path.toString}')
CLUSTERED BY (a) SORTED BY (b) INTO 0 BUCKETS
AS SELECT 1 AS a, 2 AS b
```
The exception we got is
```
ERROR org.apache.spark.executor.Executor: Exception in task 0.0 in stage 1.0 
(TID 2)
java.lang.ArithmeticException: / by zero
```

This PR captures the misuse and issues an appropriate error message.

### How was this patch tested?
Added a test case in DDLSuite

Author: gatorsmile <gatorsm...@gmail.com>

Closes #14210 from gatorsmile/createTableWithoutSchema.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/94f14b52
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/94f14b52
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/94f14b52

Branch: refs/heads/master
Commit: 94f14b52a6a99047c0e30015d435bddb7f2b95fe
Parents: df2c6d5
Author: gatorsmile <gatorsm...@gmail.com>
Authored: Fri Jul 22 13:27:17 2016 +0800
Committer: Wenchen Fan <wenc...@databricks.com>
Committed: Fri Jul 22 13:27:17 2016 +0800

----------------------------------------------------------------------
 .../spark/sql/execution/SparkSqlParser.scala    |  5 ++++
 .../sql/execution/datasources/bucket.scala      |  8 ++++++-
 .../spark/sql/execution/command/DDLSuite.scala  | 24 ++++++++++++++++++++
 .../sql/sources/CreateTableAsSelectSuite.scala  | 21 +++++++++++++++--
 4 files changed, 55 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/94f14b52/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index fa4ccf4..1316d90 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -344,6 +344,11 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder 
{
         table, provider, partitionColumnNames, bucketSpec, mode, options, 
query)
     } else {
       val struct = Option(ctx.colTypeList()).map(createStructType)
+      if (struct.isEmpty && bucketSpec.nonEmpty) {
+        throw new ParseException(
+          "Expected explicit specification of table schema when using 
CLUSTERED BY clause.", ctx)
+      }
+
       CreateTableUsing(
         table,
         struct,

http://git-wip-us.apache.org/repos/asf/spark/blob/94f14b52/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/bucket.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/bucket.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/bucket.scala
index 6008d73..961d035 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/bucket.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/bucket.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.execution.datasources
 
+import org.apache.spark.sql.AnalysisException
+
 /**
  * A container for bucketing information.
  * Bucketing is a technology for decomposing data sets into more manageable 
parts, and the number
@@ -29,7 +31,11 @@ package org.apache.spark.sql.execution.datasources
 private[sql] case class BucketSpec(
     numBuckets: Int,
     bucketColumnNames: Seq[String],
-    sortColumnNames: Seq[String])
+    sortColumnNames: Seq[String]) {
+  if (numBuckets <= 0) {
+    throw new AnalysisException(s"Expected positive number of buckets, but got 
`$numBuckets`.")
+  }
+}
 
 private[sql] object BucketingUtils {
   // The file name of bucketed data should have 3 parts:

http://git-wip-us.apache.org/repos/asf/spark/blob/94f14b52/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index 169250d..28f625b 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -30,6 +30,7 @@ import 
org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogStorageFor
 import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogTable, 
CatalogTableType}
 import org.apache.spark.sql.catalyst.catalog.{CatalogTablePartition, 
SessionCatalog}
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
+import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils._
 import org.apache.spark.sql.execution.datasources.BucketSpec
 import org.apache.spark.sql.internal.SQLConf
@@ -1264,6 +1265,29 @@ class DDLSuite extends QueryTest with SharedSQLContext 
with BeforeAndAfterEach {
     }
   }
 
+  test("create table using CLUSTERED BY without schema specification") {
+    import testImplicits._
+    withTempPath { tempDir =>
+      withTable("jsonTable") {
+        (("a", "b") :: Nil).toDF().write.json(tempDir.getCanonicalPath)
+
+        val e = intercept[ParseException] {
+        sql(
+          s"""
+             |CREATE TABLE jsonTable
+             |USING org.apache.spark.sql.json
+             |OPTIONS (
+             |  path '${tempDir.getCanonicalPath}'
+             |)
+             |CLUSTERED BY (inexistentColumnA) SORTED BY (inexistentColumnB) 
INTO 2 BUCKETS
+           """.stripMargin)
+        }.getMessage
+        assert(e.contains(
+          "Expected explicit specification of table schema when using 
CLUSTERED BY clause"))
+      }
+    }
+  }
+
   test("create table with datasource properties (not allowed)") {
     assertUnsupported("CREATE TABLE my_tab TBLPROPERTIES 
('spark.sql.sources.me'='anything')")
     assertUnsupported("CREATE TABLE my_tab ROW FORMAT SERDE 'serde' " +

http://git-wip-us.apache.org/repos/asf/spark/blob/94f14b52/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala
index 251a256..5ab585f 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala
@@ -22,6 +22,7 @@ import java.io.File
 import org.scalatest.BeforeAndAfterEach
 
 import org.apache.spark.SparkException
+import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.execution.command.DDLUtils
@@ -204,7 +205,7 @@ class CreateTableAsSelectSuite
     }
   }
 
-  test("create table using as select - with bucket") {
+  test("create table using as select - with non-zero buckets") {
     val catalog = spark.sessionState.catalog
     withTable("t") {
       sql(
@@ -217,7 +218,23 @@ class CreateTableAsSelectSuite
       )
       val table = catalog.getTableMetadata(TableIdentifier("t"))
       assert(DDLUtils.getBucketSpecFromTableProperties(table) ==
-        Some(BucketSpec(5, Seq("a"), Seq("b"))))
+        Option(BucketSpec(5, Seq("a"), Seq("b"))))
+    }
+  }
+
+  test("create table using as select - with zero buckets") {
+    withTable("t") {
+      val e = intercept[AnalysisException] {
+        sql(
+          s"""
+             |CREATE TABLE t USING PARQUET
+             |OPTIONS (PATH '${path.toString}')
+             |CLUSTERED BY (a) SORTED BY (b) INTO 0 BUCKETS
+             |AS SELECT 1 AS a, 2 AS b
+           """.stripMargin
+        )
+      }.getMessage
+      assert(e.contains("Expected positive number of buckets, but got `0`"))
     }
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to