Repository: spark
Updated Branches:
  refs/heads/master 9308bf119 -> ef7a5e0bc


[SPARK-14603][SQL][FOLLOWUP] Verification of Metadata Operations by Session 
Catalog

#### What changes were proposed in this pull request?
This follow-up PR is to address the remaining comments in 
https://github.com/apache/spark/pull/12385

The major change in this PR is to issue better error messages in PySpark by 
using the mechanism that was proposed by davies in 
https://github.com/apache/spark/pull/7135

For example, in PySpark, if we input the following statement:
```python
>>> l = [('Alice', 1)]
>>> df = sqlContext.createDataFrame(l)
>>> df.createTempView("people")
>>> df.createTempView("people")
```
Before this PR, the exception we would get looked like this:
```
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File 
"/Users/xiaoli/IdeaProjects/sparkDelivery/python/pyspark/sql/dataframe.py", 
line 152, in createTempView
    self._jdf.createTempView(name)
  File 
"/Users/xiaoli/IdeaProjects/sparkDelivery/python/lib/py4j-0.10.1-src.zip/py4j/java_gateway.py",
 line 933, in __call__
  File "/Users/xiaoli/IdeaProjects/sparkDelivery/python/pyspark/sql/utils.py", 
line 63, in deco
    return f(*a, **kw)
  File 
"/Users/xiaoli/IdeaProjects/sparkDelivery/python/lib/py4j-0.10.1-src.zip/py4j/protocol.py",
 line 312, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o35.createTempView.
: org.apache.spark.sql.catalyst.analysis.TempTableAlreadyExistsException: 
Temporary table 'people' already exists;
    at 
org.apache.spark.sql.catalyst.catalog.SessionCatalog.createTempView(SessionCatalog.scala:324)
    at org.apache.spark.sql.SparkSession.createTempView(SparkSession.scala:523)
    at org.apache.spark.sql.Dataset.createTempView(Dataset.scala:2328)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:280)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:128)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:211)
    at java.lang.Thread.run(Thread.java:745)
```
After this PR, the exception we get becomes cleaner:
```
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File 
"/Users/xiaoli/IdeaProjects/sparkDelivery/python/pyspark/sql/dataframe.py", 
line 152, in createTempView
    self._jdf.createTempView(name)
  File 
"/Users/xiaoli/IdeaProjects/sparkDelivery/python/lib/py4j-0.10.1-src.zip/py4j/java_gateway.py",
 line 933, in __call__
  File "/Users/xiaoli/IdeaProjects/sparkDelivery/python/pyspark/sql/utils.py", 
line 75, in deco
    raise AnalysisException(s.split(': ', 1)[1], stackTrace)
pyspark.sql.utils.AnalysisException: u"Temporary table 'people' already exists;"
```

#### How was this patch tested?
Fixed an existing PySpark test case

Author: gatorsmile <gatorsm...@gmail.com>

Closes #13126 from gatorsmile/followup-14684.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ef7a5e0b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ef7a5e0b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ef7a5e0b

Branch: refs/heads/master
Commit: ef7a5e0bcaee45b907a10b73f11c838ef6e23614
Parents: 9308bf1
Author: gatorsmile <gatorsm...@gmail.com>
Authored: Thu May 19 11:46:11 2016 -0700
Committer: Andrew Or <and...@databricks.com>
Committed: Thu May 19 11:46:11 2016 -0700

----------------------------------------------------------------------
 python/pyspark/sql/dataframe.py                           |  3 +--
 python/pyspark/sql/utils.py                               | 10 +++++++++-
 .../spark/sql/catalyst/catalog/InMemoryCatalog.scala      |  4 ++--
 .../spark/sql/catalyst/catalog/SessionCatalog.scala       |  4 ++--
 4 files changed, 14 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ef7a5e0b/python/pyspark/sql/dataframe.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index a0264ce..a68ef33 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -144,8 +144,7 @@ class DataFrame(object):
         >>> df.createTempView("people")  # doctest: +IGNORE_EXCEPTION_DETAIL
         Traceback (most recent call last):
         ...
-        Py4JJavaError: ...
-        : 
org.apache.spark.sql.catalyst.analysis.TempTableAlreadyExistsException...
+        AnalysisException: u"Temporary table 'people' already exists;"
         >>> spark.catalog.dropTempView("people")
 
         """

http://git-wip-us.apache.org/repos/asf/spark/blob/ef7a5e0b/python/pyspark/sql/utils.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/utils.py b/python/pyspark/sql/utils.py
index 36c9322..8c8768f 100644
--- a/python/pyspark/sql/utils.py
+++ b/python/pyspark/sql/utils.py
@@ -51,6 +51,12 @@ class ContinuousQueryException(CapturedException):
     """
 
 
+class QueryExecutionException(CapturedException):
+    """
+    Failed to execute a query.
+    """
+
+
 def capture_sql_exception(f):
     def deco(*a, **kw):
         try:
@@ -61,12 +67,14 @@ def capture_sql_exception(f):
                                              e.java_exception.getStackTrace()))
             if s.startswith('org.apache.spark.sql.AnalysisException: '):
                 raise AnalysisException(s.split(': ', 1)[1], stackTrace)
-            if 
s.startswith('org.apache.spark.sql.catalyst.analysis.NoSuchTableException: '):
+            if s.startswith('org.apache.spark.sql.catalyst.analysis'):
                 raise AnalysisException(s.split(': ', 1)[1], stackTrace)
             if 
s.startswith('org.apache.spark.sql.catalyst.parser.ParseException: '):
                 raise ParseException(s.split(': ', 1)[1], stackTrace)
             if s.startswith('org.apache.spark.sql.ContinuousQueryException: '):
                 raise ContinuousQueryException(s.split(': ', 1)[1], stackTrace)
+            if 
s.startswith('org.apache.spark.sql.execution.QueryExecutionException: '):
+                raise QueryExecutionException(s.split(': ', 1)[1], stackTrace)
             if s.startswith('java.lang.IllegalArgumentException: '):
                 raise IllegalArgumentException(s.split(': ', 1)[1], stackTrace)
             raise

http://git-wip-us.apache.org/repos/asf/spark/blob/ef7a5e0b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
index 21da55c..489a1c8 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
@@ -87,7 +87,7 @@ class InMemoryCatalog(hadoopConfig: Configuration = new 
Configuration) extends E
       db: String,
       table: String,
       specs: Seq[TablePartitionSpec]): Unit = {
-    specs foreach { s =>
+    specs.foreach { s =>
       if (!partitionExists(db, table, s)) {
         throw new NoSuchPartitionException(db = db, table = table, spec = s)
       }
@@ -98,7 +98,7 @@ class InMemoryCatalog(hadoopConfig: Configuration = new 
Configuration) extends E
       db: String,
       table: String,
       specs: Seq[TablePartitionSpec]): Unit = {
-    specs foreach { s =>
+    specs.foreach { s =>
       if (partitionExists(db, table, s)) {
         throw new PartitionAlreadyExistsException(db = db, table = table, spec 
= s)
       }

http://git-wip-us.apache.org/repos/asf/spark/blob/ef7a5e0b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 54b30d3..cf9286e 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -111,13 +111,13 @@ class SessionCatalog(
     fs.makeQualified(hadoopPath)
   }
 
-  protected[this] def requireDbExists(db: String): Unit = {
+  private def requireDbExists(db: String): Unit = {
     if (!databaseExists(db)) {
       throw new NoSuchDatabaseException(db)
     }
   }
 
-  protected[this] def requireTableExists(name: TableIdentifier): Unit = {
+  private def requireTableExists(name: TableIdentifier): Unit = {
     if (!tableExists(name)) {
       val db = name.database.getOrElse(currentDb)
       throw new NoSuchTableException(db = db, table = name.table)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to