Repository: spark Updated Branches: refs/heads/master 9308bf119 -> ef7a5e0bc
[SPARK-14603][SQL][FOLLOWUP] Verification of Metadata Operations by Session Catalog #### What changes were proposed in this pull request? This follow-up PR is to address the remaining comments in https://github.com/apache/spark/pull/12385 The major change in this PR is to issue better error messages in PySpark by using the mechanism that was proposed by davies in https://github.com/apache/spark/pull/7135 For example, in PySpark, if we input the following statement: ```python >>> l = [('Alice', 1)] >>> df = sqlContext.createDataFrame(l) >>> df.createTempView("people") >>> df.createTempView("people") ``` Before this PR, the exception we will get is like ``` Traceback (most recent call last): File "<stdin>", line 1, in <module> File "/Users/xiaoli/IdeaProjects/sparkDelivery/python/pyspark/sql/dataframe.py", line 152, in createTempView self._jdf.createTempView(name) File "/Users/xiaoli/IdeaProjects/sparkDelivery/python/lib/py4j-0.10.1-src.zip/py4j/java_gateway.py", line 933, in __call__ File "/Users/xiaoli/IdeaProjects/sparkDelivery/python/pyspark/sql/utils.py", line 63, in deco return f(*a, **kw) File "/Users/xiaoli/IdeaProjects/sparkDelivery/python/lib/py4j-0.10.1-src.zip/py4j/protocol.py", line 312, in get_return_value py4j.protocol.Py4JJavaError: An error occurred while calling o35.createTempView. : org.apache.spark.sql.catalyst.analysis.TempTableAlreadyExistsException: Temporary table 'people' already exists; at org.apache.spark.sql.catalyst.catalog.SessionCatalog.createTempView(SessionCatalog.scala:324) at org.apache.spark.sql.SparkSession.createTempView(SparkSession.scala:523) at org.apache.spark.sql.Dataset.createTempView(Dataset.scala:2328) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) at py4j.Gateway.invoke(Gateway.java:280) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:128) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:211) at java.lang.Thread.run(Thread.java:745) ``` After this PR, the exception we will get become cleaner: ``` Traceback (most recent call last): File "<stdin>", line 1, in <module> File "/Users/xiaoli/IdeaProjects/sparkDelivery/python/pyspark/sql/dataframe.py", line 152, in createTempView self._jdf.createTempView(name) File "/Users/xiaoli/IdeaProjects/sparkDelivery/python/lib/py4j-0.10.1-src.zip/py4j/java_gateway.py", line 933, in __call__ File "/Users/xiaoli/IdeaProjects/sparkDelivery/python/pyspark/sql/utils.py", line 75, in deco raise AnalysisException(s.split(': ', 1)[1], stackTrace) pyspark.sql.utils.AnalysisException: u"Temporary table 'people' already exists;" ``` #### How was this patch tested? Fixed an existing PySpark test case Author: gatorsmile <gatorsm...@gmail.com> Closes #13126 from gatorsmile/followup-14684. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ef7a5e0b Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ef7a5e0b Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ef7a5e0b Branch: refs/heads/master Commit: ef7a5e0bcaee45b907a10b73f11c838ef6e23614 Parents: 9308bf1 Author: gatorsmile <gatorsm...@gmail.com> Authored: Thu May 19 11:46:11 2016 -0700 Committer: Andrew Or <and...@databricks.com> Committed: Thu May 19 11:46:11 2016 -0700 ---------------------------------------------------------------------- python/pyspark/sql/dataframe.py | 3 +-- python/pyspark/sql/utils.py | 10 +++++++++- .../spark/sql/catalyst/catalog/InMemoryCatalog.scala | 4 ++-- .../spark/sql/catalyst/catalog/SessionCatalog.scala | 4 ++-- 4 files changed, 14 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/ef7a5e0b/python/pyspark/sql/dataframe.py ---------------------------------------------------------------------- diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index a0264ce..a68ef33 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -144,8 +144,7 @@ class DataFrame(object): >>> df.createTempView("people") # doctest: +IGNORE_EXCEPTION_DETAIL Traceback (most recent call last): ... - Py4JJavaError: ... - : org.apache.spark.sql.catalyst.analysis.TempTableAlreadyExistsException... + AnalysisException: u"Temporary table 'people' already exists;" >>> spark.catalog.dropTempView("people") """ http://git-wip-us.apache.org/repos/asf/spark/blob/ef7a5e0b/python/pyspark/sql/utils.py ---------------------------------------------------------------------- diff --git a/python/pyspark/sql/utils.py b/python/pyspark/sql/utils.py index 36c9322..8c8768f 100644 --- a/python/pyspark/sql/utils.py +++ b/python/pyspark/sql/utils.py @@ -51,6 +51,12 @@ class ContinuousQueryException(CapturedException): """ +class QueryExecutionException(CapturedException): + """ + Failed to execute a query. + """ + + def capture_sql_exception(f): def deco(*a, **kw): try: @@ -61,12 +67,14 @@ def capture_sql_exception(f): e.java_exception.getStackTrace())) if s.startswith('org.apache.spark.sql.AnalysisException: '): raise AnalysisException(s.split(': ', 1)[1], stackTrace) - if s.startswith('org.apache.spark.sql.catalyst.analysis.NoSuchTableException: '): + if s.startswith('org.apache.spark.sql.catalyst.analysis'): raise AnalysisException(s.split(': ', 1)[1], stackTrace) if s.startswith('org.apache.spark.sql.catalyst.parser.ParseException: '): raise ParseException(s.split(': ', 1)[1], stackTrace) if s.startswith('org.apache.spark.sql.ContinuousQueryException: '): raise ContinuousQueryException(s.split(': ', 1)[1], stackTrace) + if s.startswith('org.apache.spark.sql.execution.QueryExecutionException: '): + raise QueryExecutionException(s.split(': ', 1)[1], stackTrace) if s.startswith('java.lang.IllegalArgumentException: '): raise IllegalArgumentException(s.split(': ', 1)[1], stackTrace) raise http://git-wip-us.apache.org/repos/asf/spark/blob/ef7a5e0b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala index 21da55c..489a1c8 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala @@ -87,7 +87,7 @@ class InMemoryCatalog(hadoopConfig: Configuration = new Configuration) extends E db: String, table: String, specs: Seq[TablePartitionSpec]): Unit = { - specs foreach { s => + specs.foreach { s => if (!partitionExists(db, table, s)) { throw new NoSuchPartitionException(db = db, table = table, spec = s) } @@ -98,7 +98,7 @@ class InMemoryCatalog(hadoopConfig: Configuration = new Configuration) extends E db: String, table: String, specs: Seq[TablePartitionSpec]): Unit = { - specs foreach { s => + specs.foreach { s => if (partitionExists(db, table, s)) { throw new PartitionAlreadyExistsException(db = db, table = table, spec = s) } http://git-wip-us.apache.org/repos/asf/spark/blob/ef7a5e0b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala index 54b30d3..cf9286e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala @@ -111,13 +111,13 @@ class SessionCatalog( fs.makeQualified(hadoopPath) } - protected[this] def requireDbExists(db: String): Unit = { + private def requireDbExists(db: String): Unit = { if (!databaseExists(db)) { throw new NoSuchDatabaseException(db) } } - protected[this] def requireTableExists(name: TableIdentifier): Unit = { + private def requireTableExists(name: TableIdentifier): Unit = { if (!tableExists(name)) { val db = name.database.getOrElse(currentDb) throw new NoSuchTableException(db = db, table = name.table) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org